]> Sergey Matveev's repositories - mmc.git/blobdiff - cmd/mmc/main.go
Simpler combined statuses update and heartbeating
[mmc.git] / cmd / mmc / main.go
index 20c8da45cc6426f3216834888d1ab655f049686d..159674709388922f2030b490877657ed2e744991 100644 (file)
@@ -30,7 +30,9 @@ import (
        "os/exec"
        "os/signal"
        "path"
+       "sort"
        "strings"
+       "sync"
        "syscall"
        "time"
 
@@ -42,10 +44,9 @@ import (
 const CmdFile = "/FILE "
 
 var (
-       Newwin    = flag.String("newwin", "cmd/newwin", "Path to newwin command")
-       DebugFifo = flag.String("debug", "debug", "Path to debug FIFO to be created")
-       DebugFd   *os.File
-       UmaskCur  int
+       Newwin   = flag.String("newwin", "cmd/newwin", "Path to newwin command")
+       DebugFd  *os.File
+       UmaskCur int
 )
 
 func rewriteIfChanged(fn string, data string) {
@@ -58,7 +59,7 @@ func rewriteIfChanged(fn string, data string) {
 }
 
 func main() {
-       entrypoint := flag.String("entrypoint", "mm.rnd.stcnet.ru", "Entrypoint")
+       entrypoint := flag.String("entrypoint", mmc.GetEntrypoint(), "Entrypoint")
        notifyCmd := flag.String("notify", "cmd/notify", "Path to notification handler")
        heartbeatCh := flag.String("heartbeat-ch", "town-square", "Channel for heartbeating")
        flag.Parse()
@@ -67,16 +68,18 @@ func main() {
        UmaskCur = syscall.Umask(0)
        syscall.Umask(UmaskCur)
 
-       var err error
-       if *DebugFifo != "" {
-               DebugFd, err = os.OpenFile(
-                       *DebugFifo, os.O_WRONLY|os.O_APPEND, os.FileMode(0666),
-               )
-               if err != nil {
-                       log.Fatalln(err)
-               }
-               defer DebugFd.Close()
+       os.Remove("debug")
+       err := syscall.Mkfifo("debug", 0666)
+       if err != nil {
+               log.Fatalln(err)
        }
+       DebugFd, err = os.OpenFile(
+               "debug", os.O_WRONLY|os.O_APPEND, os.FileMode(0666),
+       )
+       if err != nil {
+               log.Fatalln(err)
+       }
+       defer DebugFd.Close()
 
        login, password := mmc.FindInNetrc(*entrypoint)
        if login == "" || password == "" {
@@ -110,104 +113,6 @@ func main() {
        Team := teams[0]
 
        var updateQueue []string
-       LastSent := time.Now()
-
-       Users, err := mmc.GetUsers(c, DebugFd)
-       if err != nil {
-               log.Fatalln(err)
-       }
-       for _, u := range Users {
-               pth := path.Join("users", strings.ReplaceAll(u.Username, ".", "_"))
-               os.MkdirAll(pth, 0777)
-               rewriteIfChanged(
-                       path.Join(pth, "name"),
-                       fmt.Sprintf("%s %s\n", u.FirstName, u.LastName),
-               )
-               rewriteIfChanged(path.Join(pth, "email"), u.Email+"\n")
-               rewriteIfChanged(path.Join(pth, "id"), u.Id+"\n")
-               if _, err := os.Stat(path.Join(pth, mmc.OutRec)); err != nil &&
-                       errors.Is(err, fs.ErrNotExist) {
-                       if _, err = os.OpenFile(
-                               path.Join(pth, mmc.OutRec), os.O_WRONLY|os.O_CREATE, 0o666,
-                       ); err != nil {
-                               log.Fatalln(err)
-                       }
-               }
-               updateQueue = append(updateQueue,
-                       pth, u.Id+"__"+me.Id,
-                       pth, me.Id+"__"+u.Id,
-               )
-
-               statusPth := path.Join(pth, "status")
-               os.Remove(statusPth)
-               if err := syscall.Mkfifo(statusPth, 0666); err != nil {
-                       log.Fatalln(err)
-               }
-               go func(u *model.User) {
-                       for {
-                               time.Sleep(mmc.SleepTime)
-                               fd, err := os.OpenFile(
-                                       statusPth, os.O_WRONLY|os.O_APPEND, os.FileMode(0666),
-                               )
-                               if err != nil {
-                                       log.Println("OpenFile:", statusPth, err)
-                                       continue
-                               }
-                               status, resp, err := c.GetUserStatus(u.Id, "")
-                               if err != nil {
-                                       if DebugFd != nil {
-                                               spew.Fdump(DebugFd, resp)
-                                       }
-                                       log.Println("GetUserStatus:", err)
-                                       fd.Close()
-                                       continue
-                               }
-                               if DebugFd != nil {
-                                       spew.Fdump(DebugFd, status)
-                               }
-                               fmt.Fprintf(fd, "%s\n", status.Status)
-                               fd.Close()
-                       }
-               }(u)
-
-               pth = path.Join(pth, "in")
-               os.Remove(pth)
-               if err := syscall.Mkfifo(pth, 0666); err != nil {
-                       log.Fatalln(err)
-               }
-               go func(u *model.User) {
-                       var dc *model.Channel
-                       for {
-                               fd, err := os.OpenFile(pth, os.O_RDONLY, os.FileMode(0666))
-                               if err != nil {
-                                       continue
-                               }
-                               data, err := io.ReadAll(fd)
-                               fd.Close()
-                               if err != nil {
-                                       continue
-                               }
-                               if dc == nil {
-                                       dc, resp, err = c.CreateDirectChannel(me.Id, u.Id)
-                                       if err != nil {
-                                               if DebugFd != nil {
-                                                       spew.Fdump(DebugFd, resp)
-                                               }
-                                               log.Println("CreateDirectChannel:", err)
-                                               continue
-                                       }
-                                       if DebugFd != nil {
-                                               spew.Fdump(DebugFd, dc)
-                                       }
-                               }
-                               if _, err = makePost(c, dc.Id, string(data)); err != nil {
-                                       log.Println("makePost:", err)
-                               }
-                               LastSent = time.Now()
-                       }
-               }(u)
-       }
-
        Chans := make(map[string]*model.Channel)
        time.Sleep(mmc.SleepTime)
        page, resp, err := c.GetChannelsForTeamForUser(Team.Id, me.Id, false, "")
@@ -302,23 +207,164 @@ func main() {
                                if _, err = makePost(c, ch.Id, string(data)); err != nil {
                                        log.Println("makePost:", err)
                                }
-                               LastSent = time.Now()
                        }
                }(ch)
        }
 
+       Users, err := mmc.GetUsers(c, DebugFd)
+       if err != nil {
+               log.Fatalln(err)
+       }
+
+       UsersDC := make(map[string]*model.Channel, len(Users))
+       for _, u := range Users {
+               pth := path.Join("users", strings.ReplaceAll(u.Username, ".", "_"))
+               os.MkdirAll(pth, 0777)
+               rewriteIfChanged(
+                       path.Join(pth, "name"),
+                       fmt.Sprintf("%s %s\n", u.FirstName, u.LastName),
+               )
+               rewriteIfChanged(path.Join(pth, "email"), u.Email+"\n")
+               rewriteIfChanged(path.Join(pth, "id"), u.Id+"\n")
+               if _, err := os.Stat(path.Join(pth, mmc.OutRec)); err != nil &&
+                       errors.Is(err, fs.ErrNotExist) {
+                       if _, err = os.OpenFile(
+                               path.Join(pth, mmc.OutRec), os.O_WRONLY|os.O_CREATE, 0o666,
+                       ); err != nil {
+                               log.Fatalln(err)
+                       }
+               }
+
+               if fi, err := os.Stat(path.Join(pth, mmc.OutRec)); err == nil && fi.Size() > 0 {
+                       time.Sleep(mmc.SleepTime)
+                       dc, resp, err := c.CreateDirectChannel(me.Id, u.Id)
+                       if err != nil {
+                               if DebugFd != nil {
+                                       spew.Fdump(DebugFd, resp)
+                               }
+                               log.Println("CreateDirectChannel:", err)
+                               continue
+                       }
+                       if DebugFd != nil {
+                               spew.Fdump(DebugFd, dc)
+                       }
+                       UsersDC[u.Id] = dc
+                       updateQueue = append(updateQueue, pth, dc.Id)
+               }
+
+               statusPth := path.Join(pth, "status")
+               os.Remove(statusPth)
+               if err := syscall.Mkfifo(statusPth, 0666); err != nil {
+                       log.Fatalln(err)
+               }
+               go func(u *model.User) {
+                       for {
+                               time.Sleep(mmc.SleepTime)
+                               fd, err := os.OpenFile(
+                                       statusPth, os.O_WRONLY|os.O_APPEND, os.FileMode(0666),
+                               )
+                               if err != nil {
+                                       log.Println("OpenFile:", statusPth, err)
+                                       continue
+                               }
+                               status, resp, err := c.GetUserStatus(u.Id, "")
+                               if err != nil {
+                                       if DebugFd != nil {
+                                               spew.Fdump(DebugFd, resp)
+                                       }
+                                       log.Println("GetUserStatus:", err)
+                                       fd.Close()
+                                       continue
+                               }
+                               if DebugFd != nil {
+                                       spew.Fdump(DebugFd, status)
+                               }
+                               fmt.Fprintf(fd, "%s\n", status.Status)
+                               fd.Close()
+                       }
+               }(u)
+
+               pth = path.Join(pth, "in")
+               os.Remove(pth)
+               if err := syscall.Mkfifo(pth, 0666); err != nil {
+                       log.Fatalln(err)
+               }
+               go func(u *model.User) {
+                       var dc *model.Channel
+                       for {
+                               fd, err := os.OpenFile(pth, os.O_RDONLY, os.FileMode(0666))
+                               if err != nil {
+                                       continue
+                               }
+                               data, err := io.ReadAll(fd)
+                               fd.Close()
+                               if err != nil {
+                                       continue
+                               }
+                               dc = UsersDC[u.Id]
+                               if dc == nil {
+                                       dc, resp, err = c.CreateDirectChannel(me.Id, u.Id)
+                                       if err != nil {
+                                               if DebugFd != nil {
+                                                       spew.Fdump(DebugFd, resp)
+                                               }
+                                               log.Println("CreateDirectChannel:", err)
+                                               continue
+                                       }
+                                       UsersDC[u.Id] = dc
+                                       if DebugFd != nil {
+                                               spew.Fdump(DebugFd, dc)
+                                       }
+                               }
+                               if _, err = makePost(c, dc.Id, string(data)); err != nil {
+                                       log.Println("makePost:", err)
+                               }
+                       }
+               }(u)
+       }
+
+       UserStatus := make(map[string]string)
+       var UserStatusM sync.RWMutex
        go func() {
-               log.Println("syncing", len(Chans)+len(Users), "rooms")
-               for len(updateQueue) > 0 {
-                       err := updatePosts(c, Users, updateQueue[0], updateQueue[1])
+               pth := path.Join("users", "status")
+               os.Remove(pth)
+               if err := syscall.Mkfifo(pth, 0666); err != nil {
+                       log.Fatalln(err)
+               }
+               for {
+                       time.Sleep(mmc.SleepTime)
+                       fd, err := os.OpenFile(pth, os.O_WRONLY|os.O_APPEND, os.FileMode(0666))
                        if err != nil {
-                               log.Println("updatePosts:", err)
+                               log.Println("OpenFile:", pth, err)
+                               continue
                        }
-                       updateQueue = updateQueue[2:]
+                       statuses := make(map[string][]string)
+                       UserStatusM.RLock()
+                       for name, status := range UserStatus {
+                               statuses[status] = append(statuses[status], name)
+                       }
+                       UserStatusM.RUnlock()
+                       for status := range statuses {
+                               sort.Strings(statuses[status])
+                               fmt.Fprintln(fd, status+":", strings.Join(statuses[status], " "))
+                       }
+                       fd.Close()
                }
-               log.Println("sync done")
        }()
 
+       log.Println("syncing", len(updateQueue)/2, "rooms")
+       for len(updateQueue) > 0 {
+               err := updatePosts(c, Users, updateQueue[0], updateQueue[1])
+               if err != nil {
+                       log.Println("updatePosts:", err)
+               }
+               updateQueue = updateQueue[2:]
+       }
+       log.Println("sync done")
+       if *notifyCmd != "" {
+               exec.Command(*notifyCmd, "sync done").Run()
+       }
+
        go func() {
                os.MkdirAll("file", 0777)
                pthGet := path.Join("file", "get")
@@ -410,14 +456,17 @@ func main() {
        }
        go func() {
                wc.Listen()
+               wc.GetStatuses()
                t := time.NewTicker(time.Minute)
                for {
                        select {
                        case <-t.C:
-                               if time.Now().Before(LastSent.Add(time.Minute)) {
-                                       continue
+                               if wc.ListenError != nil {
+                                       log.Println("ListenError:", wc.ListenError)
+                                       needsShutdown <- syscall.SIGTERM
+                                       return
                                }
-                               wc.SendMessage("ping", nil)
+                               wc.GetStatuses()
                                if *heartbeatCh != "" {
                                        if _, _, err = c.ViewChannel(
                                                me.Id,
@@ -429,14 +478,15 @@ func main() {
                        case <-wc.PingTimeoutChannel:
                                log.Println("PING timeout")
                                needsShutdown <- syscall.SIGTERM
+                               return
                        case e := <-wc.EventChannel:
                                if e == nil || !e.IsValid() {
                                        continue
                                }
-                               data := e.GetData()
                                if DebugFd != nil {
-                                       spew.Fdump(DebugFd, e.EventType(), data)
+                                       spew.Fdump(DebugFd, e)
                                }
+                               data := e.GetData()
                                var user *model.User
                                if userId, ok := data["user_id"]; ok && userId.(string) != "" {
                                        user = Users[userId.(string)]
@@ -453,7 +503,10 @@ func main() {
                                case model.WebsocketEventPostEdited,
                                        model.WebsocketEventPostDeleted,
                                        model.WebsocketEventPosted:
-                                       chName := data["channel_name"].(string)
+                                       chName, ok := data["channel_name"].(string)
+                                       if !ok {
+                                               continue
+                                       }
                                        var post model.Post
                                        if err = json.NewDecoder(
                                                strings.NewReader(data["post"].(string)),
@@ -484,11 +537,13 @@ func main() {
                                                log.Fatalln(err)
                                        }
                                case model.WebsocketEventStatusChange:
-                                       log.Println("status change:", user.Username, "->", data["status"].(string))
+                                       status := data["status"].(string)
+                                       UserStatusM.Lock()
+                                       UserStatus[user.Username] = status
+                                       UserStatusM.Unlock()
                                        if *notifyCmd != "" {
                                                exec.Command(*notifyCmd, fmt.Sprintf(
-                                                       "status: %s -> %s",
-                                                       user.Username, data["status"].(string),
+                                                       "status: %s -> %s", user.Username, status,
                                                )).Run()
                                        }
                                case model.WebsocketEventHello:
@@ -503,8 +558,27 @@ func main() {
                                if DebugFd != nil {
                                        spew.Fdump(DebugFd, resp)
                                }
-                               if text, ok := resp.Data["text"].(string); ok && text == "pong" {
-                                       LastSent = time.Now()
+                               statuses := make(map[string]string)
+                               for userId, status := range resp.Data {
+                                       status, ok := status.(string)
+                                       if !ok {
+                                               continue
+                                       }
+                                       user := Users[userId]
+                                       if user == nil {
+                                               continue
+                                       }
+                                       statuses[user.Username] = status
+                               }
+                               if len(statuses) > 0 {
+                                       UserStatusM.Lock()
+                                       for u := range UserStatus {
+                                               delete(UserStatus, u)
+                                       }
+                                       for u, status := range statuses {
+                                               UserStatus[u] = status
+                                       }
+                                       UserStatusM.Unlock()
                                }
                        }
                }