]> Sergey Matveev's repositories - btrtrc.git/blob - peer-conn-msg-writer.go
Drop support for go 1.20
[btrtrc.git] / peer-conn-msg-writer.go
1 package torrent
2
3 import (
4         "bytes"
5         "io"
6         "time"
7
8         "github.com/anacrolix/chansync"
9         "github.com/anacrolix/log"
10         "github.com/anacrolix/sync"
11
12         pp "github.com/anacrolix/torrent/peer_protocol"
13 )
14
15 func (pc *PeerConn) initMessageWriter() {
16         w := &pc.messageWriter
17         *w = peerConnMsgWriter{
18                 fillWriteBuffer: func() {
19                         pc.locker().Lock()
20                         defer pc.locker().Unlock()
21                         if pc.closed.IsSet() {
22                                 return
23                         }
24                         pc.fillWriteBuffer()
25                 },
26                 closed: &pc.closed,
27                 logger: pc.logger,
28                 w:      pc.w,
29                 keepAlive: func() bool {
30                         pc.locker().RLock()
31                         defer pc.locker().RUnlock()
32                         return pc.useful()
33                 },
34                 writeBuffer: new(bytes.Buffer),
35         }
36 }
37
38 func (pc *PeerConn) startMessageWriter() {
39         pc.initMessageWriter()
40         go pc.messageWriterRunner()
41 }
42
43 func (pc *PeerConn) messageWriterRunner() {
44         defer pc.locker().Unlock()
45         defer pc.close()
46         defer pc.locker().Lock()
47         pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout)
48 }
49
50 type peerConnMsgWriter struct {
51         // Must not be called with the local mutex held, as it will call back into the write method.
52         fillWriteBuffer func()
53         closed          *chansync.SetOnce
54         logger          log.Logger
55         w               io.Writer
56         keepAlive       func() bool
57
58         mu        sync.Mutex
59         writeCond chansync.BroadcastCond
60         // Pointer so we can swap with the "front buffer".
61         writeBuffer *bytes.Buffer
62 }
63
64 // Routine that writes to the peer. Some of what to write is buffered by
65 // activity elsewhere in the Client, and some is determined locally when the
66 // connection is writable.
67 func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
68         lastWrite := time.Now()
69         keepAliveTimer := time.NewTimer(keepAliveTimeout)
70         frontBuf := new(bytes.Buffer)
71         for {
72                 if cn.closed.IsSet() {
73                         return
74                 }
75                 cn.fillWriteBuffer()
76                 keepAlive := cn.keepAlive()
77                 cn.mu.Lock()
78                 if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && keepAlive {
79                         cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
80                         torrent.Add("written keepalives", 1)
81                 }
82                 if cn.writeBuffer.Len() == 0 {
83                         writeCond := cn.writeCond.Signaled()
84                         cn.mu.Unlock()
85                         select {
86                         case <-cn.closed.Done():
87                         case <-writeCond:
88                         case <-keepAliveTimer.C:
89                         }
90                         continue
91                 }
92                 // Flip the buffers.
93                 frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
94                 cn.mu.Unlock()
95                 if frontBuf.Len() == 0 {
96                         panic("expected non-empty front buffer")
97                 }
98                 var err error
99                 for frontBuf.Len() != 0 {
100                         // Limit write size for WebRTC. See https://github.com/pion/datachannel/issues/59.
101                         next := frontBuf.Next(1<<16 - 1)
102                         var n int
103                         n, err = cn.w.Write(next)
104                         if err == nil && n != len(next) {
105                                 panic("expected full write")
106                         }
107                         if err != nil {
108                                 break
109                         }
110                 }
111                 if err != nil {
112                         cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
113                         return
114                 }
115                 lastWrite = time.Now()
116                 keepAliveTimer.Reset(keepAliveTimeout)
117         }
118 }
119
120 func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
121         cn.mu.Lock()
122         defer cn.mu.Unlock()
123         cn.writeBuffer.Write(msg.MustMarshalBinary())
124         cn.writeCond.Broadcast()
125         return !cn.writeBufferFull()
126 }
127
128 func (cn *peerConnMsgWriter) writeBufferFull() bool {
129         return cn.writeBuffer.Len() >= writeBufferHighWaterLen
130 }