Sergey Matveev's repositories - btrtrc.git/blob - peer-conn-msg-writer.go
Fix deadlock when checking whether to send keep alive
[btrtrc.git] / peer-conn-msg-writer.go
1 package torrent
2
3 import (
4         "bytes"
5         "io"
6         "time"
7
8         "github.com/anacrolix/chansync"
9         "github.com/anacrolix/log"
10         "github.com/anacrolix/sync"
11
12         pp "github.com/anacrolix/torrent/peer_protocol"
13 )
14
15 func (pc *PeerConn) startWriter() {
16         w := &pc.messageWriter
17         *w = peerConnMsgWriter{
18                 fillWriteBuffer: func() {
19                         pc.locker().Lock()
20                         defer pc.locker().Unlock()
21                         if pc.closed.IsSet() {
22                                 return
23                         }
24                         pc.fillWriteBuffer()
25                 },
26                 closed: &pc.closed,
27                 logger: pc.logger,
28                 w:      pc.w,
29                 keepAlive: func() bool {
30                         pc.locker().Lock()
31                         defer pc.locker().Unlock()
32                         return pc.useful()
33                 },
34                 writeBuffer: new(bytes.Buffer),
35         }
36         go func() {
37                 defer pc.locker().Unlock()
38                 defer pc.close()
39                 defer pc.locker().Lock()
40                 pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout)
41         }()
42 }
43
44 type peerConnMsgWriter struct {
45         // Must not be called with the local mutex held, as it will call back into the write method.
46         fillWriteBuffer func()
47         closed          *chansync.SetOnce
48         logger          log.Logger
49         w               io.Writer
50         keepAlive       func() bool
51
52         mu        sync.Mutex
53         writeCond chansync.BroadcastCond
54         // Pointer so we can swap with the "front buffer".
55         writeBuffer *bytes.Buffer
56 }
57
58 // Routine that writes to the peer. Some of what to write is buffered by
59 // activity elsewhere in the Client, and some is determined locally when the
60 // connection is writable.
61 func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
62         var (
63                 lastWrite      time.Time = time.Now()
64                 keepAliveTimer *time.Timer
65         )
66         cn.mu.Lock()
67         defer cn.mu.Unlock()
68         keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
69                 cn.mu.Lock()
70                 defer cn.mu.Unlock()
71                 if time.Since(lastWrite) >= keepAliveTimeout {
72                         cn.writeCond.Broadcast()
73                 }
74                 keepAliveTimer.Reset(keepAliveTimeout)
75         })
76         defer keepAliveTimer.Stop()
77         frontBuf := new(bytes.Buffer)
78         for {
79                 if cn.closed.IsSet() {
80                         return
81                 }
82                 keepAlive := false
83                 if cn.writeBuffer.Len() == 0 {
84                         func() {
85                                 cn.mu.Unlock()
86                                 defer cn.mu.Lock()
87                                 cn.fillWriteBuffer()
88                                 keepAlive = cn.keepAlive()
89                         }()
90                 }
91                 if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && keepAlive {
92                         cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
93                         torrent.Add("written keepalives", 1)
94                 }
95                 if cn.writeBuffer.Len() == 0 {
96                         writeCond := cn.writeCond.Signaled()
97                         cn.mu.Unlock()
98                         select {
99                         case <-cn.closed.Done():
100                         case <-writeCond:
101                         }
102                         cn.mu.Lock()
103                         continue
104                 }
105                 // Flip the buffers.
106                 frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
107                 cn.mu.Unlock()
108                 n, err := cn.w.Write(frontBuf.Bytes())
109                 cn.mu.Lock()
110                 if n != 0 {
111                         lastWrite = time.Now()
112                         keepAliveTimer.Reset(keepAliveTimeout)
113                 }
114                 if err != nil {
115                         cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
116                         return
117                 }
118                 if n != frontBuf.Len() {
119                         panic("short write")
120                 }
121                 frontBuf.Reset()
122         }
123 }
124
125 func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
126         cn.mu.Lock()
127         defer cn.mu.Unlock()
128         cn.writeBuffer.Write(msg.MustMarshalBinary())
129         cn.writeCond.Broadcast()
130         return !cn.writeBufferFull()
131 }
132
133 func (cn *peerConnMsgWriter) writeBufferFull() bool {
134         return cn.writeBuffer.Len() >= writeBufferHighWaterLen
135 }