Sergey Matveev's repositories - btrtrc.git/blob - peer-conn-msg-writer.go
Extract chansync to github.com/anacrolix/chansync
[btrtrc.git] / peer-conn-msg-writer.go
1 package torrent
2
3 import (
4         "bytes"
5         "io"
6         "time"
7
8         "github.com/anacrolix/chansync"
9         "github.com/anacrolix/log"
10         "github.com/anacrolix/sync"
11
12         pp "github.com/anacrolix/torrent/peer_protocol"
13 )
14
15 func (pc *PeerConn) startWriter() {
16         w := &pc.messageWriter
17         *w = peerConnMsgWriter{
18                 fillWriteBuffer: func() {
19                         pc.locker().Lock()
20                         defer pc.locker().Unlock()
21                         if pc.closed.IsSet() {
22                                 return
23                         }
24                         pc.fillWriteBuffer()
25                 },
26                 closed: &pc.closed,
27                 logger: pc.logger,
28                 w:      pc.w,
29                 keepAlive: func() bool {
30                         pc.locker().Lock()
31                         defer pc.locker().Unlock()
32                         return pc.useful()
33                 },
34                 writeBuffer: new(bytes.Buffer),
35         }
36         go func() {
37                 defer pc.locker().Unlock()
38                 defer pc.close()
39                 defer pc.locker().Lock()
40                 pc.messageWriter.run(time.Minute)
41         }()
42 }
43
44 type peerConnMsgWriter struct {
45         // Must not be called with the local mutex held, as it will call back into the write method.
46         fillWriteBuffer func()
47         closed          *chansync.SetOnce
48         logger          log.Logger
49         w               io.Writer
50         keepAlive       func() bool
51
52         mu        sync.Mutex
53         writeCond chansync.BroadcastCond
54         // Pointer so we can swap with the "front buffer".
55         writeBuffer *bytes.Buffer
56 }
57
58 // Routine that writes to the peer. Some of what to write is buffered by
59 // activity elsewhere in the Client, and some is determined locally when the
60 // connection is writable.
61 func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
62         var (
63                 lastWrite      time.Time = time.Now()
64                 keepAliveTimer *time.Timer
65         )
66         keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
67                 cn.mu.Lock()
68                 defer cn.mu.Unlock()
69                 if time.Since(lastWrite) >= keepAliveTimeout {
70                         cn.writeCond.Broadcast()
71                 }
72                 keepAliveTimer.Reset(keepAliveTimeout)
73         })
74         cn.mu.Lock()
75         defer cn.mu.Unlock()
76         defer keepAliveTimer.Stop()
77         frontBuf := new(bytes.Buffer)
78         for {
79                 if cn.closed.IsSet() {
80                         return
81                 }
82                 if cn.writeBuffer.Len() == 0 {
83                         func() {
84                                 cn.mu.Unlock()
85                                 defer cn.mu.Lock()
86                                 cn.fillWriteBuffer()
87                         }()
88                 }
89                 if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && cn.keepAlive() {
90                         cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
91                         torrent.Add("written keepalives", 1)
92                 }
93                 if cn.writeBuffer.Len() == 0 {
94                         writeCond := cn.writeCond.Signaled()
95                         cn.mu.Unlock()
96                         select {
97                         case <-cn.closed.Done():
98                         case <-writeCond:
99                         }
100                         cn.mu.Lock()
101                         continue
102                 }
103                 // Flip the buffers.
104                 frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
105                 cn.mu.Unlock()
106                 n, err := cn.w.Write(frontBuf.Bytes())
107                 cn.mu.Lock()
108                 if n != 0 {
109                         lastWrite = time.Now()
110                         keepAliveTimer.Reset(keepAliveTimeout)
111                 }
112                 if err != nil {
113                         cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
114                         return
115                 }
116                 if n != frontBuf.Len() {
117                         panic("short write")
118                 }
119                 frontBuf.Reset()
120         }
121 }
122
123 func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
124         cn.mu.Lock()
125         defer cn.mu.Unlock()
126         cn.writeBuffer.Write(msg.MustMarshalBinary())
127         cn.writeCond.Broadcast()
128         return !cn.writeBufferFull()
129 }
130
131 func (cn *peerConnMsgWriter) writeBufferFull() bool {
132         return cn.writeBuffer.Len() >= writeBufferHighWaterLen
133 }