Sergey Matveev's repositories - mmc.git/blob - cmd/mmc/main.go
Simpler combined statuses update and heartbeating
[mmc.git] / cmd / mmc / main.go
1 // mmc -- Mattermost client
2 // Copyright (C) 2023 Sergey Matveev <stargrave@stargrave.org>
3 //
4 // This program is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU Affero General Public License as
6 // published by the Free Software Foundation, either version 3 of the
7 // License.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 // GNU Affero General Public License for more details.
13 //
14 // You should have received a copy of the GNU Affero General Public License
15 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 package main
18
19 import (
20         "archive/tar"
21         "bytes"
22         "encoding/json"
23         "errors"
24         "flag"
25         "fmt"
26         "io"
27         "io/fs"
28         "log"
29         "os"
30         "os/exec"
31         "os/signal"
32         "path"
33         "sort"
34         "strings"
35         "sync"
36         "syscall"
37         "time"
38
39         "github.com/davecgh/go-spew/spew"
40         "github.com/mattermost/mattermost-server/v6/model"
41         "go.stargrave.org/mmc"
42 )
43
// CmdFile is the "/FILE " command prefix (note the trailing space).
// NOTE(review): it is not referenced in this file; presumably it is
// recognised by the post-sending code elsewhere in the package -- confirm.
const CmdFile = "/FILE "

var (
	// Newwin is the value of the -newwin flag: the path to the newwin command.
	Newwin   = flag.String("newwin", "cmd/newwin", "Path to newwin command")
	// DebugFd is the write end of the "debug" FIFO that main creates and
	// opens; API responses and events are dumped to it with spew.
	DebugFd  *os.File
	// UmaskCur is the process umask as read by main at startup.
	UmaskCur int
)
51
// rewriteIfChanged makes sure the file fn holds exactly data.
//
// If fn can be read and its contents already equal data, the file is left
// untouched, so its modification time is preserved. Otherwise (missing,
// unreadable or different) it is (re)written with mode 0o666 before umask.
// A write failure terminates the program through log.Fatalln.
func rewriteIfChanged(fn string, data string) {
	if their, err := os.ReadFile(fn); err == nil && string(their) == data {
		return
	}
	if err := os.WriteFile(fn, []byte(data), 0o666); err != nil {
		log.Fatalln(err)
	}
}
60
61 func main() {
62         entrypoint := flag.String("entrypoint", mmc.GetEntrypoint(), "Entrypoint")
63         notifyCmd := flag.String("notify", "cmd/notify", "Path to notification handler")
64         heartbeatCh := flag.String("heartbeat-ch", "town-square", "Channel for heartbeating")
65         flag.Parse()
66         log.SetFlags(log.Lshortfile)
67         log.SetOutput(os.Stdout)
68         UmaskCur = syscall.Umask(0)
69         syscall.Umask(UmaskCur)
70
71         os.Remove("debug")
72         err := syscall.Mkfifo("debug", 0666)
73         if err != nil {
74                 log.Fatalln(err)
75         }
76         DebugFd, err = os.OpenFile(
77                 "debug", os.O_WRONLY|os.O_APPEND, os.FileMode(0666),
78         )
79         if err != nil {
80                 log.Fatalln(err)
81         }
82         defer DebugFd.Close()
83
84         login, password := mmc.FindInNetrc(*entrypoint)
85         if login == "" || password == "" {
86                 log.Fatalln("no credentials found for:", *entrypoint)
87         }
88         c := model.NewAPIv4Client("https://" + *entrypoint)
89         c.Login(login, password)
90         me, resp, err := c.GetMe("")
91         if err != nil {
92                 if DebugFd != nil {
93                         spew.Fdump(DebugFd, resp)
94                 }
95                 log.Fatalln(err)
96         }
97         if DebugFd != nil {
98                 spew.Fdump(DebugFd, me)
99         }
100         log.Println("logged in")
101
102         time.Sleep(mmc.SleepTime)
103         teams, resp, err := c.GetTeamsForUser(me.Id, "")
104         if err != nil {
105                 if DebugFd != nil {
106                         spew.Fdump(DebugFd, resp)
107                 }
108                 log.Fatalln(err)
109         }
110         if DebugFd != nil {
111                 spew.Fdump(DebugFd, teams)
112         }
113         Team := teams[0]
114
115         var updateQueue []string
116         Chans := make(map[string]*model.Channel)
117         time.Sleep(mmc.SleepTime)
118         page, resp, err := c.GetChannelsForTeamForUser(Team.Id, me.Id, false, "")
119         if err != nil {
120                 if DebugFd != nil {
121                         spew.Fdump(DebugFd, resp)
122                 }
123                 log.Fatalln(err)
124         }
125         if DebugFd != nil {
126                 spew.Fdump(DebugFd, page)
127         }
128         for _, ch := range page {
129                 if ch.Type == "D" {
130                         continue
131                 }
132                 Chans[ch.Name] = ch
133                 pth := path.Join("chans", strings.ReplaceAll(ch.Name, ".", "_"))
134                 updateQueue = append(updateQueue, pth, ch.Id)
135                 os.MkdirAll(pth, 0777)
136                 rewriteIfChanged(path.Join(pth, "id"), ch.Id+"\n")
137                 rewriteIfChanged(path.Join(pth, "info"), fmt.Sprintf(
138                         "%s\n%s\n%s\n",
139                         ch.DisplayName,
140                         ch.Header,
141                         ch.Purpose,
142                 ))
143                 if _, err := os.Stat(path.Join(pth, mmc.OutRec)); err != nil &&
144                         errors.Is(err, fs.ErrNotExist) {
145                         if _, err = os.OpenFile(
146                                 path.Join(pth, mmc.OutRec), os.O_WRONLY|os.O_CREATE, 0o666,
147                         ); err != nil {
148                                 log.Fatalln(err)
149                         }
150                 }
151
152                 usersPth := path.Join(pth, "users")
153                 os.Remove(usersPth)
154                 if err := syscall.Mkfifo(usersPth, 0666); err != nil {
155                         log.Fatalln(err)
156                 }
157                 go func(ch *model.Channel) {
158                         for {
159                                 time.Sleep(mmc.SleepTime)
160                                 fd, err := os.OpenFile(
161                                         usersPth, os.O_WRONLY|os.O_APPEND, os.FileMode(0666),
162                                 )
163                                 if err != nil {
164                                         log.Println("OpenFile:", usersPth, err)
165                                         continue
166                                 }
167                                 for n := 0; ; n++ {
168                                         users, resp, err := c.GetUsersInChannel(ch.Id, n, mmc.PerPage, "")
169                                         if err != nil {
170                                                 if DebugFd != nil {
171                                                         spew.Fdump(DebugFd, resp)
172                                                 }
173                                                 log.Println("GetUsersInChannel:", err)
174                                                 fd.Close()
175                                                 continue
176                                         }
177                                         if DebugFd != nil {
178                                                 spew.Fdump(DebugFd, users)
179                                         }
180                                         for _, u := range users {
181                                                 fmt.Fprintf(fd, "%s\n", u.Username)
182                                         }
183                                         if len(users) < mmc.PerPage {
184                                                 break
185                                         }
186                                 }
187                                 fd.Close()
188                         }
189                 }(ch)
190
191                 pth = path.Join(pth, "in")
192                 os.Remove(pth)
193                 if err := syscall.Mkfifo(pth, 0666); err != nil {
194                         log.Fatalln(err)
195                 }
196                 go func(ch *model.Channel) {
197                         for {
198                                 fd, err := os.OpenFile(pth, os.O_RDONLY, os.FileMode(0666))
199                                 if err != nil {
200                                         continue
201                                 }
202                                 data, err := io.ReadAll(fd)
203                                 fd.Close()
204                                 if err != nil {
205                                         continue
206                                 }
207                                 if _, err = makePost(c, ch.Id, string(data)); err != nil {
208                                         log.Println("makePost:", err)
209                                 }
210                         }
211                 }(ch)
212         }
213
214         Users, err := mmc.GetUsers(c, DebugFd)
215         if err != nil {
216                 log.Fatalln(err)
217         }
218
219         UsersDC := make(map[string]*model.Channel, len(Users))
220         for _, u := range Users {
221                 pth := path.Join("users", strings.ReplaceAll(u.Username, ".", "_"))
222                 os.MkdirAll(pth, 0777)
223                 rewriteIfChanged(
224                         path.Join(pth, "name"),
225                         fmt.Sprintf("%s %s\n", u.FirstName, u.LastName),
226                 )
227                 rewriteIfChanged(path.Join(pth, "email"), u.Email+"\n")
228                 rewriteIfChanged(path.Join(pth, "id"), u.Id+"\n")
229                 if _, err := os.Stat(path.Join(pth, mmc.OutRec)); err != nil &&
230                         errors.Is(err, fs.ErrNotExist) {
231                         if _, err = os.OpenFile(
232                                 path.Join(pth, mmc.OutRec), os.O_WRONLY|os.O_CREATE, 0o666,
233                         ); err != nil {
234                                 log.Fatalln(err)
235                         }
236                 }
237
238                 if fi, err := os.Stat(path.Join(pth, mmc.OutRec)); err == nil && fi.Size() > 0 {
239                         time.Sleep(mmc.SleepTime)
240                         dc, resp, err := c.CreateDirectChannel(me.Id, u.Id)
241                         if err != nil {
242                                 if DebugFd != nil {
243                                         spew.Fdump(DebugFd, resp)
244                                 }
245                                 log.Println("CreateDirectChannel:", err)
246                                 continue
247                         }
248                         if DebugFd != nil {
249                                 spew.Fdump(DebugFd, dc)
250                         }
251                         UsersDC[u.Id] = dc
252                         updateQueue = append(updateQueue, pth, dc.Id)
253                 }
254
255                 statusPth := path.Join(pth, "status")
256                 os.Remove(statusPth)
257                 if err := syscall.Mkfifo(statusPth, 0666); err != nil {
258                         log.Fatalln(err)
259                 }
260                 go func(u *model.User) {
261                         for {
262                                 time.Sleep(mmc.SleepTime)
263                                 fd, err := os.OpenFile(
264                                         statusPth, os.O_WRONLY|os.O_APPEND, os.FileMode(0666),
265                                 )
266                                 if err != nil {
267                                         log.Println("OpenFile:", statusPth, err)
268                                         continue
269                                 }
270                                 status, resp, err := c.GetUserStatus(u.Id, "")
271                                 if err != nil {
272                                         if DebugFd != nil {
273                                                 spew.Fdump(DebugFd, resp)
274                                         }
275                                         log.Println("GetUserStatus:", err)
276                                         fd.Close()
277                                         continue
278                                 }
279                                 if DebugFd != nil {
280                                         spew.Fdump(DebugFd, status)
281                                 }
282                                 fmt.Fprintf(fd, "%s\n", status.Status)
283                                 fd.Close()
284                         }
285                 }(u)
286
287                 pth = path.Join(pth, "in")
288                 os.Remove(pth)
289                 if err := syscall.Mkfifo(pth, 0666); err != nil {
290                         log.Fatalln(err)
291                 }
292                 go func(u *model.User) {
293                         var dc *model.Channel
294                         for {
295                                 fd, err := os.OpenFile(pth, os.O_RDONLY, os.FileMode(0666))
296                                 if err != nil {
297                                         continue
298                                 }
299                                 data, err := io.ReadAll(fd)
300                                 fd.Close()
301                                 if err != nil {
302                                         continue
303                                 }
304                                 dc = UsersDC[u.Id]
305                                 if dc == nil {
306                                         dc, resp, err = c.CreateDirectChannel(me.Id, u.Id)
307                                         if err != nil {
308                                                 if DebugFd != nil {
309                                                         spew.Fdump(DebugFd, resp)
310                                                 }
311                                                 log.Println("CreateDirectChannel:", err)
312                                                 continue
313                                         }
314                                         UsersDC[u.Id] = dc
315                                         if DebugFd != nil {
316                                                 spew.Fdump(DebugFd, dc)
317                                         }
318                                 }
319                                 if _, err = makePost(c, dc.Id, string(data)); err != nil {
320                                         log.Println("makePost:", err)
321                                 }
322                         }
323                 }(u)
324         }
325
326         UserStatus := make(map[string]string)
327         var UserStatusM sync.RWMutex
328         go func() {
329                 pth := path.Join("users", "status")
330                 os.Remove(pth)
331                 if err := syscall.Mkfifo(pth, 0666); err != nil {
332                         log.Fatalln(err)
333                 }
334                 for {
335                         time.Sleep(mmc.SleepTime)
336                         fd, err := os.OpenFile(pth, os.O_WRONLY|os.O_APPEND, os.FileMode(0666))
337                         if err != nil {
338                                 log.Println("OpenFile:", pth, err)
339                                 continue
340                         }
341                         statuses := make(map[string][]string)
342                         UserStatusM.RLock()
343                         for name, status := range UserStatus {
344                                 statuses[status] = append(statuses[status], name)
345                         }
346                         UserStatusM.RUnlock()
347                         for status := range statuses {
348                                 sort.Strings(statuses[status])
349                                 fmt.Fprintln(fd, status+":", strings.Join(statuses[status], " "))
350                         }
351                         fd.Close()
352                 }
353         }()
354
355         log.Println("syncing", len(updateQueue)/2, "rooms")
356         for len(updateQueue) > 0 {
357                 err := updatePosts(c, Users, updateQueue[0], updateQueue[1])
358                 if err != nil {
359                         log.Println("updatePosts:", err)
360                 }
361                 updateQueue = updateQueue[2:]
362         }
363         log.Println("sync done")
364         if *notifyCmd != "" {
365                 exec.Command(*notifyCmd, "sync done").Run()
366         }
367
368         go func() {
369                 os.MkdirAll("file", 0777)
370                 pthGet := path.Join("file", "get")
371                 os.Remove(pthGet)
372                 if err := syscall.Mkfifo(pthGet, 0666); err != nil {
373                         log.Fatalln(err)
374                 }
375                 pthOut := path.Join("file", "out")
376                 os.Remove(pthOut)
377                 if err := syscall.Mkfifo(pthOut, 0666); err != nil {
378                         log.Fatalln(err)
379                 }
380                 for {
381                         time.Sleep(mmc.SleepTime)
382                         fd, err := os.OpenFile(pthGet, os.O_RDONLY, os.FileMode(0666))
383                         if err != nil {
384                                 continue
385                         }
386                         data, err := io.ReadAll(fd)
387                         fd.Close()
388                         if err != nil {
389                                 continue
390                         }
391                         fileId := strings.TrimRight(string(data), " \n")
392                         if len(fileId) == 0 {
393                                 continue
394                         }
395                         fi, resp, err := c.GetFileInfo(fileId)
396                         if DebugFd != nil {
397                                 spew.Fdump(DebugFd, fi, resp)
398                         }
399                         if err != nil {
400                                 log.Println(err)
401                                 continue
402                         }
403                         if fi == nil {
404                                 fmt.Println(resp)
405                                 continue
406                         }
407                         data, resp, err = c.GetFile(fileId)
408                         if err != nil {
409                                 if DebugFd != nil {
410                                         spew.Fdump(DebugFd, resp)
411                                 }
412                                 log.Println(err)
413                                 continue
414                         }
415                         fd, err = os.OpenFile(
416                                 pthOut, os.O_WRONLY|os.O_APPEND, os.FileMode(0666),
417                         )
418                         if err != nil {
419                                 log.Println(err)
420                                 continue
421                         }
422                         w := tar.NewWriter(fd)
423                         if err = w.WriteHeader(&tar.Header{
424                                 Format:   tar.FormatPAX,
425                                 Typeflag: tar.TypeReg,
426                                 Name:     fi.Name,
427                                 Size:     int64(len(data)),
428                                 Mode:     0o666,
429                                 ModTime:  time.Unix(fi.CreateAt/1000, 0),
430                                 PAXRecords: map[string]string{
431                                         "MM.FileId":   fi.Id,
432                                         "MM.MIMEType": fi.MimeType,
433                                 },
434                         }); err != nil {
435                                 log.Println(err)
436                                 fd.Close()
437                                 continue
438                         }
439                         if _, err = io.Copy(w, bytes.NewReader(data)); err != nil {
440                                 log.Println(err)
441                                 fd.Close()
442                                 continue
443                         }
444                         err = w.Close()
445                         fd.Close()
446                         if err != nil {
447                                 log.Println(err)
448                         }
449                 }
450         }()
451
452         needsShutdown := make(chan os.Signal)
453         wc, err := model.NewWebSocketClient4("wss://"+*entrypoint, c.AuthToken)
454         if err != nil {
455                 log.Fatalln(err)
456         }
457         go func() {
458                 wc.Listen()
459                 wc.GetStatuses()
460                 t := time.NewTicker(time.Minute)
461                 for {
462                         select {
463                         case <-t.C:
464                                 if wc.ListenError != nil {
465                                         log.Println("ListenError:", wc.ListenError)
466                                         needsShutdown <- syscall.SIGTERM
467                                         return
468                                 }
469                                 wc.GetStatuses()
470                                 if *heartbeatCh != "" {
471                                         if _, _, err = c.ViewChannel(
472                                                 me.Id,
473                                                 &model.ChannelView{ChannelId: Chans[*heartbeatCh].Id},
474                                         ); err != nil {
475                                                 log.Println("ChannelView:", err)
476                                         }
477                                 }
478                         case <-wc.PingTimeoutChannel:
479                                 log.Println("PING timeout")
480                                 needsShutdown <- syscall.SIGTERM
481                                 return
482                         case e := <-wc.EventChannel:
483                                 if e == nil || !e.IsValid() {
484                                         continue
485                                 }
486                                 if DebugFd != nil {
487                                         spew.Fdump(DebugFd, e)
488                                 }
489                                 data := e.GetData()
490                                 var user *model.User
491                                 if userId, ok := data["user_id"]; ok && userId.(string) != "" {
492                                         user = Users[userId.(string)]
493                                         if user == nil {
494                                                 log.Println("unknown user for event:", e)
495                                                 continue
496                                         }
497                                 }
498                                 switch eventType := e.EventType(); eventType {
499                                 case model.WebsocketEventTyping:
500                                         if *notifyCmd != "" {
501                                                 exec.Command(*notifyCmd, "typing: "+user.Username).Run()
502                                         }
503                                 case model.WebsocketEventPostEdited,
504                                         model.WebsocketEventPostDeleted,
505                                         model.WebsocketEventPosted:
506                                         chName, ok := data["channel_name"].(string)
507                                         if !ok {
508                                                 continue
509                                         }
510                                         var post model.Post
511                                         if err = json.NewDecoder(
512                                                 strings.NewReader(data["post"].(string)),
513                                         ).Decode(&post); err != nil {
514                                                 log.Fatalln(err)
515                                         }
516                                         var recipient string
517                                         switch model.ChannelType(data["channel_type"].(string)) {
518                                         case model.ChannelTypeDirect:
519                                                 userId := strings.TrimPrefix(chName, me.Id+"__")
520                                                 userId = strings.TrimSuffix(userId, "__"+me.Id)
521                                                 user := Users[userId]
522                                                 if user == nil {
523                                                         log.Println("unknown user:", post)
524                                                         continue
525                                                 }
526                                                 recipient = path.Join("users", user.Username)
527                                         case model.ChannelTypeOpen:
528                                                 fallthrough
529                                         case model.ChannelTypePrivate:
530                                                 fallthrough
531                                         case model.ChannelTypeGroup:
532                                                 recipient = path.Join("chans", chName)
533                                         }
534                                         if err = writePosts(
535                                                 recipient, Users, []mmc.Post{{P: &post, E: eventType}},
536                                         ); err != nil {
537                                                 log.Fatalln(err)
538                                         }
539                                 case model.WebsocketEventStatusChange:
540                                         status := data["status"].(string)
541                                         UserStatusM.Lock()
542                                         UserStatus[user.Username] = status
543                                         UserStatusM.Unlock()
544                                         if *notifyCmd != "" {
545                                                 exec.Command(*notifyCmd, fmt.Sprintf(
546                                                         "status: %s -> %s", user.Username, status,
547                                                 )).Run()
548                                         }
549                                 case model.WebsocketEventHello:
550                                 case model.WebsocketEventChannelViewed:
551                                 default:
552                                         log.Println(e)
553                                 }
554                         case resp := <-wc.ResponseChannel:
555                                 if resp == nil || !resp.IsValid() {
556                                         continue
557                                 }
558                                 if DebugFd != nil {
559                                         spew.Fdump(DebugFd, resp)
560                                 }
561                                 statuses := make(map[string]string)
562                                 for userId, status := range resp.Data {
563                                         status, ok := status.(string)
564                                         if !ok {
565                                                 continue
566                                         }
567                                         user := Users[userId]
568                                         if user == nil {
569                                                 continue
570                                         }
571                                         statuses[user.Username] = status
572                                 }
573                                 if len(statuses) > 0 {
574                                         UserStatusM.Lock()
575                                         for u := range UserStatus {
576                                                 delete(UserStatus, u)
577                                         }
578                                         for u, status := range statuses {
579                                                 UserStatus[u] = status
580                                         }
581                                         UserStatusM.Unlock()
582                                 }
583                         }
584                 }
585         }()
586
587         signal.Notify(needsShutdown, syscall.SIGTERM, syscall.SIGINT)
588         <-needsShutdown
589         c.Logout()
590         log.Println("finished")
591 }