// mmc -- Mattermost client
-// Copyright (C) 2023 Sergey Matveev <stargrave@stargrave.org>
+// Copyright (C) 2023-2024 Sergey Matveev <stargrave@stargrave.org>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
"io"
"io/fs"
"log"
+ "net/url"
"os"
"os/exec"
"os/signal"
"path"
"sort"
"strings"
+ "sync"
"syscall"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/mattermost/mattermost-server/v6/model"
+ "go.cypherpunks.ru/netrc"
"go.stargrave.org/mmc"
)
-const CmdFile = "/FILE "
-
var (
- Newwin = flag.String("newwin", "cmd/newwin", "Path to newwin command")
- DebugFifo = flag.String("debug", "", "Path to debug FIFO to be created")
- DebugFd *os.File
- UmaskCur int
+ Newwin = flag.String("newwin", "cmd/newwin", "Path to newwin command")
+ DebugFd *os.File
+ UmaskCur int
)
func rewriteIfChanged(fn string, data string) {
if their, err := os.ReadFile(fn); err != nil ||
- bytes.Compare([]byte(data), their) != 0 {
+ !bytes.Equal([]byte(data), their) {
if err = os.WriteFile(fn, []byte(data), 0o666); err != nil {
log.Fatalln(err)
}
}
}
+func mkFifo(pth string) {
+ if _, err := os.Stat(pth); err == nil {
+ return
+ }
+ if err := syscall.Mkfifo(pth, 0666); err != nil {
+ log.Fatalln(err)
+ }
+}
+
func main() {
entrypoint := flag.String("entrypoint", mmc.GetEntrypoint(), "Entrypoint")
notifyCmd := flag.String("notify", "cmd/notify", "Path to notification handler")
heartbeatCh := flag.String("heartbeat-ch", "town-square", "Channel for heartbeating")
- userStatusFifo := flag.String("user-status", "", "Path to FIFO for user statuses")
flag.Parse()
log.SetFlags(log.Lshortfile)
log.SetOutput(os.Stdout)
UmaskCur = syscall.Umask(0)
syscall.Umask(UmaskCur)
+ mkFifo("debug")
var err error
- if *DebugFifo != "" {
- DebugFd, err = os.OpenFile(
- *DebugFifo, os.O_WRONLY|os.O_APPEND, os.FileMode(0666),
- )
- if err != nil {
- log.Fatalln(err)
- }
- defer DebugFd.Close()
+ DebugFd, err = os.OpenFile(
+ "debug", os.O_WRONLY|os.O_APPEND, os.FileMode(0666),
+ )
+ if err != nil {
+ log.Fatalln(err)
}
+ defer DebugFd.Close()
- login, password := mmc.FindInNetrc(*entrypoint)
+ entrypointURL, err := url.Parse(*entrypoint)
+ if err != nil {
+ log.Fatalln(err)
+ }
+ login, password := netrc.Find(entrypointURL.Hostname())
if login == "" || password == "" {
- log.Fatalln("no credentials found for:", *entrypoint)
+ log.Fatalln("no credentials found for:", entrypointURL.Hostname())
}
- c := model.NewAPIv4Client("https://" + *entrypoint)
+ c := model.NewAPIv4Client(*entrypoint)
c.Login(login, password)
me, resp, err := c.GetMe("")
if err != nil {
Team := teams[0]
var updateQueue []string
- LastSent := time.Now()
+ Chans := make(map[string]*model.Channel)
+ time.Sleep(mmc.SleepTime)
+ page, resp, err := c.GetChannelsForTeamForUser(Team.Id, me.Id, false, "")
+ if err != nil {
+ if DebugFd != nil {
+ spew.Fdump(DebugFd, resp)
+ }
+ log.Fatalln(err)
+ }
+ if DebugFd != nil {
+ spew.Fdump(DebugFd, page)
+ }
+ for _, ch := range page {
+ if ch.Type == "D" {
+ continue
+ }
+ Chans[ch.Name] = ch
+ pth := path.Join("chans", strings.ReplaceAll(ch.Name, ".", "_"))
+ updateQueue = append(updateQueue, pth, ch.Id)
+ os.MkdirAll(pth, 0777)
+ rewriteIfChanged(path.Join(pth, "id"), ch.Id+"\n")
+ rewriteIfChanged(path.Join(pth, "info"), fmt.Sprintf(
+ "%s\n%s\n%s\n",
+ ch.DisplayName,
+ ch.Header,
+ ch.Purpose,
+ ))
+ if _, err := os.Stat(path.Join(pth, mmc.OutRec)); err != nil &&
+ errors.Is(err, fs.ErrNotExist) {
+ if _, err = os.OpenFile(
+ path.Join(pth, mmc.OutRec), os.O_WRONLY|os.O_CREATE, 0o666,
+ ); err != nil {
+ log.Fatalln(err)
+ }
+ }
+
+ usersPth := path.Join(pth, "users")
+ mkFifo(usersPth)
+ go func(ch *model.Channel) {
+ for {
+ time.Sleep(mmc.SleepTime)
+ fd, err := os.OpenFile(
+ usersPth, os.O_WRONLY|os.O_APPEND, os.FileMode(0666),
+ )
+ if err != nil {
+ log.Println("OpenFile:", usersPth, err)
+ continue
+ }
+ for n := 0; ; n++ {
+ users, resp, err := c.GetUsersInChannel(ch.Id, n, mmc.PerPage, "")
+ if err != nil {
+ if DebugFd != nil {
+ spew.Fdump(DebugFd, resp)
+ }
+ log.Println("GetUsersInChannel:", err)
+ fd.Close()
+ continue
+ }
+ if DebugFd != nil {
+ spew.Fdump(DebugFd, users)
+ }
+ for _, u := range users {
+ fmt.Fprintf(fd, "%s\n", u.Username)
+ }
+ if len(users) < mmc.PerPage {
+ break
+ }
+ }
+ fd.Close()
+ }
+ }(ch)
+
+ pth = path.Join(pth, "in")
+ mkFifo(pth)
+ go func(ch *model.Channel) {
+ for {
+ fd, err := os.OpenFile(pth, os.O_RDONLY, os.FileMode(0666))
+ if err != nil {
+ continue
+ }
+ data, err := io.ReadAll(fd)
+ fd.Close()
+ if err != nil {
+ continue
+ }
+ if _, err = makePost(c, ch.Id, string(data)); err != nil {
+ log.Println("makePost:", err)
+ }
+ }
+ }(ch)
+ }
Users, err := mmc.GetUsers(c, DebugFd)
if err != nil {
}
UsersDC := make(map[string]*model.Channel, len(Users))
- userIds := make([]string, 0, len(Users))
for _, u := range Users {
- userIds = append(userIds, u.Id)
pth := path.Join("users", strings.ReplaceAll(u.Username, ".", "_"))
os.MkdirAll(pth, 0777)
rewriteIfChanged(
}
statusPth := path.Join(pth, "status")
- os.Remove(statusPth)
- if err := syscall.Mkfifo(statusPth, 0666); err != nil {
- log.Fatalln(err)
- }
+ mkFifo(statusPth)
go func(u *model.User) {
for {
time.Sleep(mmc.SleepTime)
}(u)
pth = path.Join(pth, "in")
- os.Remove(pth)
- if err := syscall.Mkfifo(pth, 0666); err != nil {
- log.Fatalln(err)
- }
+ mkFifo(pth)
go func(u *model.User) {
var dc *model.Channel
for {
if _, err = makePost(c, dc.Id, string(data)); err != nil {
log.Println("makePost:", err)
}
- LastSent = time.Now()
}
}(u)
}
UserStatus := make(map[string]string)
- updateUserStatus := func() {
- statuses, resp, err := c.GetUsersStatusesByIds(userIds)
- if err != nil {
- if DebugFd != nil {
- spew.Fdump(DebugFd, resp)
+ var UserStatusM sync.RWMutex
+ go func() {
+ pth := path.Join("users", "status")
+ mkFifo(pth)
+ for {
+ time.Sleep(mmc.SleepTime)
+ fd, err := os.OpenFile(pth, os.O_WRONLY|os.O_APPEND, os.FileMode(0666))
+ if err != nil {
+ log.Println("OpenFile:", pth, err)
+ continue
}
- log.Fatalln(err)
- }
- if DebugFd != nil {
- spew.Fdump(DebugFd, teams)
- }
- for _, s := range statuses {
- UserStatus[Users[s.UserId].Username] = s.Status
- }
- }
- if *userStatusFifo != "" {
- updateUserStatus()
- go func() {
- for {
- time.Sleep(mmc.SleepTime)
- fd, err := os.OpenFile(
- *userStatusFifo, os.O_WRONLY|os.O_APPEND, os.FileMode(0666),
- )
- if err != nil {
- log.Println("OpenFile:", *userStatusFifo, err)
- continue
- }
- statuses := make(map[string][]string)
- for name, status := range UserStatus {
- statuses[status] = append(statuses[status], name)
- }
- for status := range statuses {
- if status == "offline" {
- continue
- }
- sort.Strings(statuses[status])
- fmt.Fprintln(fd, status+":", strings.Join(statuses[status], " "))
- }
- fd.Close()
+ agg := make(map[string][]string)
+ UserStatusM.RLock()
+ for name, status := range UserStatus {
+ agg[status] = append(agg[status], name)
}
- }()
- }
-
- Chans := make(map[string]*model.Channel)
- time.Sleep(mmc.SleepTime)
- page, resp, err := c.GetChannelsForTeamForUser(Team.Id, me.Id, false, "")
- if err != nil {
- if DebugFd != nil {
- spew.Fdump(DebugFd, resp)
- }
- log.Fatalln(err)
- }
- if DebugFd != nil {
- spew.Fdump(DebugFd, page)
- }
- for _, ch := range page {
- if ch.Type == "D" {
- continue
- }
- Chans[ch.Name] = ch
- pth := path.Join("chans", strings.ReplaceAll(ch.Name, ".", "_"))
- updateQueue = append(updateQueue, pth, ch.Id)
- os.MkdirAll(pth, 0777)
- rewriteIfChanged(path.Join(pth, "id"), ch.Id+"\n")
- rewriteIfChanged(path.Join(pth, "info"), fmt.Sprintf(
- "%s\n%s\n%s\n",
- ch.DisplayName,
- ch.Header,
- ch.Purpose,
- ))
- if _, err := os.Stat(path.Join(pth, mmc.OutRec)); err != nil &&
- errors.Is(err, fs.ErrNotExist) {
- if _, err = os.OpenFile(
- path.Join(pth, mmc.OutRec), os.O_WRONLY|os.O_CREATE, 0o666,
- ); err != nil {
- log.Fatalln(err)
+ UserStatusM.RUnlock()
+ statuses := make([]string, 0, len(agg))
+ for status := range agg {
+ sort.Strings(agg[status])
+ statuses = append(statuses, status)
}
- }
-
- usersPth := path.Join(pth, "users")
- os.Remove(usersPth)
- if err := syscall.Mkfifo(usersPth, 0666); err != nil {
- log.Fatalln(err)
- }
- go func(ch *model.Channel) {
- for {
- time.Sleep(mmc.SleepTime)
- fd, err := os.OpenFile(
- usersPth, os.O_WRONLY|os.O_APPEND, os.FileMode(0666),
- )
- if err != nil {
- log.Println("OpenFile:", usersPth, err)
- continue
- }
- for n := 0; ; n++ {
- users, resp, err := c.GetUsersInChannel(ch.Id, n, mmc.PerPage, "")
- if err != nil {
- if DebugFd != nil {
- spew.Fdump(DebugFd, resp)
- }
- log.Println("GetUsersInChannel:", err)
- fd.Close()
- continue
- }
- if DebugFd != nil {
- spew.Fdump(DebugFd, users)
- }
- for _, u := range users {
- fmt.Fprintf(fd, "%s\n", u.Username)
- }
- if len(users) < mmc.PerPage {
- break
- }
- }
- fd.Close()
+ sort.Strings(statuses)
+ for _, status := range statuses {
+ fmt.Fprintln(fd, status+":", strings.Join(agg[status], " "))
}
- }(ch)
-
- pth = path.Join(pth, "in")
- os.Remove(pth)
- if err := syscall.Mkfifo(pth, 0666); err != nil {
- log.Fatalln(err)
+ fd.Close()
}
- go func(ch *model.Channel) {
- for {
- fd, err := os.OpenFile(pth, os.O_RDONLY, os.FileMode(0666))
- if err != nil {
- continue
- }
- data, err := io.ReadAll(fd)
- fd.Close()
- if err != nil {
- continue
- }
- if _, err = makePost(c, ch.Id, string(data)); err != nil {
- log.Println("makePost:", err)
- }
- LastSent = time.Now()
- }
- }(ch)
- }
+ }()
log.Println("syncing", len(updateQueue)/2, "rooms")
for len(updateQueue) > 0 {
go func() {
os.MkdirAll("file", 0777)
pthGet := path.Join("file", "get")
- os.Remove(pthGet)
- if err := syscall.Mkfifo(pthGet, 0666); err != nil {
- log.Fatalln(err)
- }
+ mkFifo(pthGet)
pthOut := path.Join("file", "out")
- os.Remove(pthOut)
- if err := syscall.Mkfifo(pthOut, 0666); err != nil {
- log.Fatalln(err)
- }
+ mkFifo(pthOut)
for {
time.Sleep(mmc.SleepTime)
fd, err := os.OpenFile(pthGet, os.O_RDONLY, os.FileMode(0666))
}()
needsShutdown := make(chan os.Signal)
- wc, err := model.NewWebSocketClient4("wss://"+*entrypoint, c.AuthToken)
+ switch entrypointURL.Scheme {
+ case "http":
+ entrypointURL.Scheme = "ws"
+ case "https":
+ entrypointURL.Scheme = "wss"
+ default:
+ log.Println("unhandled scheme:", entrypointURL.Scheme)
+ }
+ wc, err := model.NewWebSocketClient4(entrypointURL.String(), c.AuthToken)
if err != nil {
log.Fatalln(err)
}
go func() {
wc.Listen()
+ wc.GetStatuses()
t := time.NewTicker(time.Minute)
for {
select {
case <-t.C:
- updateUserStatus()
- if time.Now().Before(LastSent.Add(time.Minute)) {
- continue
+ if wc.ListenError != nil {
+ log.Println("ListenError:", wc.ListenError)
+ needsShutdown <- syscall.SIGTERM
+ return
}
- wc.SendMessage("ping", nil)
+ wc.GetStatuses()
if *heartbeatCh != "" {
if _, _, err = c.ViewChannel(
me.Id,
case <-wc.PingTimeoutChannel:
log.Println("PING timeout")
needsShutdown <- syscall.SIGTERM
+ return
case e := <-wc.EventChannel:
if e == nil || !e.IsValid() {
continue
userId = strings.TrimSuffix(userId, "__"+me.Id)
user := Users[userId]
if user == nil {
- log.Println("unknown user:", post)
+ log.Println("unknown user:", userId)
continue
}
recipient = path.Join("users", user.Username)
}
case model.WebsocketEventStatusChange:
status := data["status"].(string)
+ UserStatusM.Lock()
UserStatus[user.Username] = status
+ UserStatusM.Unlock()
if *notifyCmd != "" {
exec.Command(*notifyCmd, fmt.Sprintf(
"status: %s -> %s", user.Username, status,
if DebugFd != nil {
spew.Fdump(DebugFd, resp)
}
- if text, ok := resp.Data["text"].(string); ok && text == "pong" {
- LastSent = time.Now()
+ statuses := make(map[string]string)
+ for userId, status := range resp.Data {
+ status, ok := status.(string)
+ if !ok {
+ continue
+ }
+ user := Users[userId]
+ if user == nil {
+ continue
+ }
+ statuses[user.Username] = status
+ }
+ if len(statuses) > 0 {
+ UserStatusM.Lock()
+ for u := range UserStatus {
+ delete(UserStatus, u)
+ }
+ for u, status := range statuses {
+ UserStatus[u] = status
+ }
+ UserStatusM.Unlock()
}
}
}