]> Sergey Matveev's repositories - mmc.git/commitdiff
Simpler combined statuses update and heartbeating
authorSergey Matveev <stargrave@stargrave.org>
Mon, 13 Mar 2023 15:30:25 +0000 (18:30 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Mon, 13 Mar 2023 16:05:44 +0000 (19:05 +0300)
cmd/mmc/main.go
cmd/start

index c383cff2c1c92f0ea645440989a8bb8b850316f6..159674709388922f2030b490877657ed2e744991 100644 (file)
@@ -32,6 +32,7 @@ import (
        "path"
        "sort"
        "strings"
+       "sync"
        "syscall"
        "time"
 
@@ -43,10 +44,9 @@ import (
 const CmdFile = "/FILE "
 
 var (
-       Newwin    = flag.String("newwin", "cmd/newwin", "Path to newwin command")
-       DebugFifo = flag.String("debug", "", "Path to debug FIFO to be created")
-       DebugFd   *os.File
-       UmaskCur  int
+       Newwin   = flag.String("newwin", "cmd/newwin", "Path to newwin command")
+       DebugFd  *os.File
+       UmaskCur int
 )
 
 func rewriteIfChanged(fn string, data string) {
@@ -62,23 +62,24 @@ func main() {
        entrypoint := flag.String("entrypoint", mmc.GetEntrypoint(), "Entrypoint")
        notifyCmd := flag.String("notify", "cmd/notify", "Path to notification handler")
        heartbeatCh := flag.String("heartbeat-ch", "town-square", "Channel for heartbeating")
-       userStatusFifo := flag.String("user-status", "", "Path to FIFO for user statuses")
        flag.Parse()
        log.SetFlags(log.Lshortfile)
        log.SetOutput(os.Stdout)
        UmaskCur = syscall.Umask(0)
        syscall.Umask(UmaskCur)
 
-       var err error
-       if *DebugFifo != "" {
-               DebugFd, err = os.OpenFile(
-                       *DebugFifo, os.O_WRONLY|os.O_APPEND, os.FileMode(0666),
-               )
-               if err != nil {
-                       log.Fatalln(err)
-               }
-               defer DebugFd.Close()
+       os.Remove("debug")
+       err := syscall.Mkfifo("debug", 0666)
+       if err != nil {
+               log.Fatalln(err)
        }
+       DebugFd, err = os.OpenFile(
+               "debug", os.O_WRONLY|os.O_APPEND, os.FileMode(0666),
+       )
+       if err != nil {
+               log.Fatalln(err)
+       }
+       defer DebugFd.Close()
 
        login, password := mmc.FindInNetrc(*entrypoint)
        if login == "" || password == "" {
@@ -112,8 +113,6 @@ func main() {
        Team := teams[0]
 
        var updateQueue []string
-       LastSent := time.Now()
-
        Chans := make(map[string]*model.Channel)
        time.Sleep(mmc.SleepTime)
        page, resp, err := c.GetChannelsForTeamForUser(Team.Id, me.Id, false, "")
@@ -208,7 +207,6 @@ func main() {
                                if _, err = makePost(c, ch.Id, string(data)); err != nil {
                                        log.Println("makePost:", err)
                                }
-                               LastSent = time.Now()
                        }
                }(ch)
        }
@@ -219,9 +217,7 @@ func main() {
        }
 
        UsersDC := make(map[string]*model.Channel, len(Users))
-       userIds := make([]string, 0, len(Users))
        for _, u := range Users {
-               userIds = append(userIds, u.Id)
                pth := path.Join("users", strings.ReplaceAll(u.Username, ".", "_"))
                os.MkdirAll(pth, 0777)
                rewriteIfChanged(
@@ -323,54 +319,38 @@ func main() {
                                if _, err = makePost(c, dc.Id, string(data)); err != nil {
                                        log.Println("makePost:", err)
                                }
-                               LastSent = time.Now()
                        }
                }(u)
        }
 
        UserStatus := make(map[string]string)
-       updateUserStatus := func() {
-               statuses, resp, err := c.GetUsersStatusesByIds(userIds)
-               if err != nil {
-                       if DebugFd != nil {
-                               spew.Fdump(DebugFd, resp)
-                       }
+       var UserStatusM sync.RWMutex
+       go func() {
+               pth := path.Join("users", "status")
+               os.Remove(pth)
+               if err := syscall.Mkfifo(pth, 0666); err != nil {
                        log.Fatalln(err)
                }
-               if DebugFd != nil {
-                       spew.Fdump(DebugFd, teams)
-               }
-               for _, s := range statuses {
-                       UserStatus[Users[s.UserId].Username] = s.Status
-               }
-       }
-       if *userStatusFifo != "" {
-               updateUserStatus()
-               go func() {
-                       for {
-                               time.Sleep(mmc.SleepTime)
-                               fd, err := os.OpenFile(
-                                       *userStatusFifo, os.O_WRONLY|os.O_APPEND, os.FileMode(0666),
-                               )
-                               if err != nil {
-                                       log.Println("OpenFile:", *userStatusFifo, err)
-                                       continue
-                               }
-                               statuses := make(map[string][]string)
-                               for name, status := range UserStatus {
-                                       statuses[status] = append(statuses[status], name)
-                               }
-                               for status := range statuses {
-                                       if status == "offline" {
-                                               continue
-                                       }
-                                       sort.Strings(statuses[status])
-                                       fmt.Fprintln(fd, status+":", strings.Join(statuses[status], " "))
-                               }
-                               fd.Close()
+               for {
+                       time.Sleep(mmc.SleepTime)
+                       fd, err := os.OpenFile(pth, os.O_WRONLY|os.O_APPEND, os.FileMode(0666))
+                       if err != nil {
+                               log.Println("OpenFile:", pth, err)
+                               continue
                        }
-               }()
-       }
+                       statuses := make(map[string][]string)
+                       UserStatusM.RLock()
+                       for name, status := range UserStatus {
+                               statuses[status] = append(statuses[status], name)
+                       }
+                       UserStatusM.RUnlock()
+                       for status := range statuses {
+                               sort.Strings(statuses[status])
+                               fmt.Fprintln(fd, status+":", strings.Join(statuses[status], " "))
+                       }
+                       fd.Close()
+               }
+       }()
 
        log.Println("syncing", len(updateQueue)/2, "rooms")
        for len(updateQueue) > 0 {
@@ -476,15 +456,17 @@ func main() {
        }
        go func() {
                wc.Listen()
+               wc.GetStatuses()
                t := time.NewTicker(time.Minute)
                for {
                        select {
                        case <-t.C:
-                               updateUserStatus()
-                               if time.Now().Before(LastSent.Add(time.Minute)) {
-                                       continue
+                               if wc.ListenError != nil {
+                                       log.Println("ListenError:", wc.ListenError)
+                                       needsShutdown <- syscall.SIGTERM
+                                       return
                                }
-                               wc.SendMessage("ping", nil)
+                               wc.GetStatuses()
                                if *heartbeatCh != "" {
                                        if _, _, err = c.ViewChannel(
                                                me.Id,
@@ -496,6 +478,7 @@ func main() {
                        case <-wc.PingTimeoutChannel:
                                log.Println("PING timeout")
                                needsShutdown <- syscall.SIGTERM
+                               return
                        case e := <-wc.EventChannel:
                                if e == nil || !e.IsValid() {
                                        continue
@@ -555,7 +538,9 @@ func main() {
                                        }
                                case model.WebsocketEventStatusChange:
                                        status := data["status"].(string)
+                                       UserStatusM.Lock()
                                        UserStatus[user.Username] = status
+                                       UserStatusM.Unlock()
                                        if *notifyCmd != "" {
                                                exec.Command(*notifyCmd, fmt.Sprintf(
                                                        "status: %s -> %s", user.Username, status,
@@ -573,8 +558,27 @@ func main() {
                                if DebugFd != nil {
                                        spew.Fdump(DebugFd, resp)
                                }
-                               if text, ok := resp.Data["text"].(string); ok && text == "pong" {
-                                       LastSent = time.Now()
+                               statuses := make(map[string]string)
+                               for userId, status := range resp.Data {
+                                       status, ok := status.(string)
+                                       if !ok {
+                                               continue
+                                       }
+                                       user := Users[userId]
+                                       if user == nil {
+                                               continue
+                                       }
+                                       statuses[user.Username] = status
+                               }
+                               if len(statuses) > 0 {
+                                       UserStatusM.Lock()
+                                       for u := range UserStatus {
+                                               delete(UserStatus, u)
+                                       }
+                                       for u, status := range statuses {
+                                               UserStatus[u] = status
+                                       }
+                                       UserStatusM.Unlock()
                                }
                        }
                }
index 2de7122ad17cfe8cc4f563bfc6760e3f6b104f59..64648f0abcb6aa0df1fb8e4aa589a06425db47da 100755 (executable)
--- a/cmd/start
+++ b/cmd/start
@@ -4,14 +4,12 @@ cmd="$(dirname "$(realpath -- "$0")")"
 unset TMUX
 TMUX="tmux -S tmux.sock"
 $TMUX has-session -t mmc 2>/dev/null && exit
-mkdir -p users
-rm -f debug users/status
-mkfifo debug users/status
+find . -type p -delete
 [ -s tmux.conf ] || sed \
     -e "s#NEWWIN#$cmd/newwin#" \
     -e "s#CATFILE#$cmd/catfile#" \
     -e "s#DLPANE#$cmd/dlpane#" < "$cmd"/tmux.conf > tmux.conf
-$TMUX -f tmux.conf new-session -d -n ROOT -s mmc "cat debug | tai64n | tai64nlocal ; read foo"
-$TMUX split-window -h 'while : ; do cat users/status | spc -e grn,"^online:.*" -e cya,"^away:.*" ; sleep 60 ; clear ; done'
-$TMUX split-window -v "while : ; do $cmd/mmc/mmc -debug debug -user-status users/status -newwin $cmd/newwin -notify $cmd/notify | tai64n | tai64nlocal ; printf \"\\a\" ; sleep 1 ; done"
+$TMUX -f tmux.conf new-session -d -n ROOT -s mmc "while : ; do sleep 1 ; cat debug | tai64n | tai64nlocal ; done"
+$TMUX split-window -h 'while : ; do sleep 10 ; clear ; cat users/status | spc -e grn,"^online:.*" -e cya,"^away:.*" ; done'
+$TMUX split-window -v "while : ; do $cmd/mmc/mmc -newwin $cmd/newwin -notify $cmd/notify | tai64n | tai64nlocal ; printf \"\\a\" ; sleep 1 ; done"
 $TMUX attach-session