]> Sergey Matveev's repositories - btrtrc.git/blob - peer-conn-msg-writer.go
Avoid reallocating keep alive timer on each pass
[btrtrc.git] / peer-conn-msg-writer.go
1 package torrent
2
3 import (
4         "bytes"
5         "io"
6         "time"
7
8         "github.com/anacrolix/chansync"
9         "github.com/anacrolix/log"
10         "github.com/anacrolix/sync"
11
12         pp "github.com/anacrolix/torrent/peer_protocol"
13 )
14
15 func (pc *PeerConn) startWriter() {
16         w := &pc.messageWriter
17         *w = peerConnMsgWriter{
18                 fillWriteBuffer: func() {
19                         pc.locker().Lock()
20                         defer pc.locker().Unlock()
21                         if pc.closed.IsSet() {
22                                 return
23                         }
24                         pc.fillWriteBuffer()
25                 },
26                 closed: &pc.closed,
27                 logger: pc.logger,
28                 w:      pc.w,
29                 keepAlive: func() bool {
30                         pc.locker().Lock()
31                         defer pc.locker().Unlock()
32                         return pc.useful()
33                 },
34                 writeBuffer: new(bytes.Buffer),
35         }
36         go func() {
37                 defer pc.locker().Unlock()
38                 defer pc.close()
39                 defer pc.locker().Lock()
40                 pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout)
41         }()
42 }
43
44 type peerConnMsgWriter struct {
45         // Must not be called with the local mutex held, as it will call back into the write method.
46         fillWriteBuffer func()
47         closed          *chansync.SetOnce
48         logger          log.Logger
49         w               io.Writer
50         keepAlive       func() bool
51
52         mu        sync.Mutex
53         writeCond chansync.BroadcastCond
54         // Pointer so we can swap with the "front buffer".
55         writeBuffer *bytes.Buffer
56 }
57
58 // Routine that writes to the peer. Some of what to write is buffered by
59 // activity elsewhere in the Client, and some is determined locally when the
60 // connection is writable.
61 func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
62         lastWrite := time.Now()
63         keepAliveTimer := time.NewTimer(keepAliveTimeout)
64         frontBuf := new(bytes.Buffer)
65         for {
66                 if cn.closed.IsSet() {
67                         return
68                 }
69                 cn.fillWriteBuffer()
70                 keepAlive := cn.keepAlive()
71                 cn.mu.Lock()
72                 if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && keepAlive {
73                         cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
74                         torrent.Add("written keepalives", 1)
75                 }
76                 if cn.writeBuffer.Len() == 0 {
77                         writeCond := cn.writeCond.Signaled()
78                         cn.mu.Unlock()
79                         select {
80                         case <-cn.closed.Done():
81                         case <-writeCond:
82                         case <-keepAliveTimer.C:
83                         }
84                         continue
85                 }
86                 // Flip the buffers.
87                 frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
88                 cn.mu.Unlock()
89                 n, err := cn.w.Write(frontBuf.Bytes())
90                 if n != 0 {
91                         lastWrite = time.Now()
92                         keepAliveTimer.Reset(keepAliveTimeout)
93                 }
94                 if err != nil {
95                         cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
96                         return
97                 }
98                 if n != frontBuf.Len() {
99                         panic("short write")
100                 }
101                 frontBuf.Reset()
102         }
103 }
104
105 func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
106         cn.mu.Lock()
107         defer cn.mu.Unlock()
108         cn.writeBuffer.Write(msg.MustMarshalBinary())
109         cn.writeCond.Broadcast()
110         return !cn.writeBufferFull()
111 }
112
113 func (cn *peerConnMsgWriter) writeBufferFull() bool {
114         return cn.writeBuffer.Len() >= writeBufferHighWaterLen
115 }