From: Sergey Matveev Date: Mon, 13 Mar 2023 15:30:25 +0000 (+0300) Subject: Simpler combined statuses update and heartbeating X-Git-Url: http://www.git.stargrave.org/?p=mmc.git;a=commitdiff_plain;h=052e7379d71080bb8e7edfa15f945c82e628acea Simpler combined statuses update and heartbeating --- diff --git a/cmd/mmc/main.go b/cmd/mmc/main.go index c383cff..1596747 100644 --- a/cmd/mmc/main.go +++ b/cmd/mmc/main.go @@ -32,6 +32,7 @@ import ( "path" "sort" "strings" + "sync" "syscall" "time" @@ -43,10 +44,9 @@ import ( const CmdFile = "/FILE " var ( - Newwin = flag.String("newwin", "cmd/newwin", "Path to newwin command") - DebugFifo = flag.String("debug", "", "Path to debug FIFO to be created") - DebugFd *os.File - UmaskCur int + Newwin = flag.String("newwin", "cmd/newwin", "Path to newwin command") + DebugFd *os.File + UmaskCur int ) func rewriteIfChanged(fn string, data string) { @@ -62,23 +62,24 @@ func main() { entrypoint := flag.String("entrypoint", mmc.GetEntrypoint(), "Entrypoint") notifyCmd := flag.String("notify", "cmd/notify", "Path to notification handler") heartbeatCh := flag.String("heartbeat-ch", "town-square", "Channel for heartbeating") - userStatusFifo := flag.String("user-status", "", "Path to FIFO for user statuses") flag.Parse() log.SetFlags(log.Lshortfile) log.SetOutput(os.Stdout) UmaskCur = syscall.Umask(0) syscall.Umask(UmaskCur) - var err error - if *DebugFifo != "" { - DebugFd, err = os.OpenFile( - *DebugFifo, os.O_WRONLY|os.O_APPEND, os.FileMode(0666), - ) - if err != nil { - log.Fatalln(err) - } - defer DebugFd.Close() + os.Remove("debug") + err := syscall.Mkfifo("debug", 0666) + if err != nil { + log.Fatalln(err) } + DebugFd, err = os.OpenFile( + "debug", os.O_WRONLY|os.O_APPEND, os.FileMode(0666), + ) + if err != nil { + log.Fatalln(err) + } + defer DebugFd.Close() login, password := mmc.FindInNetrc(*entrypoint) if login == "" || password == "" { @@ -112,8 +113,6 @@ func main() { Team := teams[0] var updateQueue []string - LastSent := time.Now() - Chans := make(map[string]*model.Channel) time.Sleep(mmc.SleepTime) page, resp, err := c.GetChannelsForTeamForUser(Team.Id, me.Id, false, "") @@ -208,7 +207,6 @@ func main() { if _, err = makePost(c, ch.Id, string(data)); err != nil { log.Println("makePost:", err) } - LastSent = time.Now() } }(ch) } @@ -219,9 +217,7 @@ func main() { } UsersDC := make(map[string]*model.Channel, len(Users)) - userIds := make([]string, 0, len(Users)) for _, u := range Users { - userIds = append(userIds, u.Id) pth := path.Join("users", strings.ReplaceAll(u.Username, ".", "_")) os.MkdirAll(pth, 0777) rewriteIfChanged( @@ -323,54 +319,38 @@ func main() { if _, err = makePost(c, dc.Id, string(data)); err != nil { log.Println("makePost:", err) } - LastSent = time.Now() } }(u) } UserStatus := make(map[string]string) - updateUserStatus := func() { - statuses, resp, err := c.GetUsersStatusesByIds(userIds) - if err != nil { - if DebugFd != nil { - spew.Fdump(DebugFd, resp) - } + var UserStatusM sync.RWMutex + go func() { + pth := path.Join("users", "status") + os.Remove(pth) + if err := syscall.Mkfifo(pth, 0666); err != nil { log.Fatalln(err) } - if DebugFd != nil { - spew.Fdump(DebugFd, teams) - } - for _, s := range statuses { - UserStatus[Users[s.UserId].Username] = s.Status - } - } - if *userStatusFifo != "" { - updateUserStatus() - go func() { - for { - time.Sleep(mmc.SleepTime) - fd, err := os.OpenFile( - *userStatusFifo, os.O_WRONLY|os.O_APPEND, os.FileMode(0666), - ) - if err != nil { - log.Println("OpenFile:", *userStatusFifo, err) - continue - } - statuses := make(map[string][]string) - for name, status := range UserStatus { - statuses[status] = append(statuses[status], name) - } - for status := range statuses { - if status == "offline" { - continue - } - sort.Strings(statuses[status]) - fmt.Fprintln(fd, status+":", strings.Join(statuses[status], " ")) - } - fd.Close() + for { + time.Sleep(mmc.SleepTime) + fd, err := os.OpenFile(pth, os.O_WRONLY|os.O_APPEND, os.FileMode(0666)) + if err != nil { + log.Println("OpenFile:", pth, err) + continue } - }() - } + statuses := make(map[string][]string) + UserStatusM.RLock() + for name, status := range UserStatus { + statuses[status] = append(statuses[status], name) + } + UserStatusM.RUnlock() + for status := range statuses { + sort.Strings(statuses[status]) + fmt.Fprintln(fd, status+":", strings.Join(statuses[status], " ")) + } + fd.Close() + } + }() log.Println("syncing", len(updateQueue)/2, "rooms") for len(updateQueue) > 0 { @@ -476,15 +456,17 @@ func main() { } go func() { wc.Listen() + wc.GetStatuses() t := time.NewTicker(time.Minute) for { select { case <-t.C: - updateUserStatus() - if time.Now().Before(LastSent.Add(time.Minute)) { - continue + if wc.ListenError != nil { + log.Println("ListenError:", wc.ListenError) + needsShutdown <- syscall.SIGTERM + return } - wc.SendMessage("ping", nil) + wc.GetStatuses() if *heartbeatCh != "" { if _, _, err = c.ViewChannel( me.Id, @@ -496,6 +478,7 @@ func main() { case <-wc.PingTimeoutChannel: log.Println("PING timeout") needsShutdown <- syscall.SIGTERM + return case e := <-wc.EventChannel: if e == nil || !e.IsValid() { continue @@ -555,7 +538,9 @@ func main() { } case model.WebsocketEventStatusChange: status := data["status"].(string) + UserStatusM.Lock() UserStatus[user.Username] = status + UserStatusM.Unlock() if *notifyCmd != "" { exec.Command(*notifyCmd, fmt.Sprintf( "status: %s -> %s", user.Username, status, @@ -573,8 +558,27 @@ func main() { if DebugFd != nil { spew.Fdump(DebugFd, resp) } - if text, ok := resp.Data["text"].(string); ok && text == "pong" { - LastSent = time.Now() + statuses := make(map[string]string) + for userId, status := range resp.Data { + status, ok := status.(string) + if !ok { + continue + } + user := Users[userId] + if user == nil { + continue + } + statuses[user.Username] = status + } + if len(statuses) > 0 { + UserStatusM.Lock() + for u := range UserStatus { + delete(UserStatus, u) + } + for u, status := range statuses { + UserStatus[u] = status + } + UserStatusM.Unlock() } } } diff --git a/cmd/start b/cmd/start index 2de7122..64648f0 100755 --- a/cmd/start +++ b/cmd/start @@ -4,14 +4,12 @@ cmd="$(dirname "$(realpath -- "$0")")" unset TMUX TMUX="tmux -S tmux.sock" $TMUX has-session -t mmc 2>/dev/null && exit -mkdir -p users -rm -f debug users/status -mkfifo debug users/status +find . -type p -delete [ -s tmux.conf ] || sed \ -e "s#NEWWIN#$cmd/newwin#" \ -e "s#CATFILE#$cmd/catfile#" \ -e "s#DLPANE#$cmd/dlpane#" < "$cmd"/tmux.conf > tmux.conf -$TMUX -f tmux.conf new-session -d -n ROOT -s mmc "cat debug | tai64n | tai64nlocal ; read foo" -$TMUX split-window -h 'while : ; do cat users/status | spc -e grn,"^online:.*" -e cya,"^away:.*" ; sleep 60 ; clear ; done' -$TMUX split-window -v "while : ; do $cmd/mmc/mmc -debug debug -user-status users/status -newwin $cmd/newwin -notify $cmd/notify | tai64n | tai64nlocal ; printf \"\\a\" ; sleep 1 ; done" +$TMUX -f tmux.conf new-session -d -n ROOT -s mmc "while : ; do sleep 1 ; cat debug | tai64n | tai64nlocal ; done" +$TMUX split-window -h 'while : ; do sleep 10 ; clear ; cat users/status | spc -e grn,"^online:.*" -e cya,"^away:.*" ; done' +$TMUX split-window -v "while : ; do $cmd/mmc/mmc -newwin $cmd/newwin -notify $cmd/notify | tai64n | tai64nlocal ; printf \"\\a\" ; sleep 1 ; done" $TMUX attach-session