]> Sergey Matveev's repositories - btrtrc.git/blob - peer-conn-msg-writer.go
Rewrite peerConnMsgWriter.run
[btrtrc.git] / peer-conn-msg-writer.go
1 package torrent
2
3 import (
4         "bytes"
5         "io"
6         "time"
7
8         "github.com/anacrolix/chansync"
9         "github.com/anacrolix/log"
10         "github.com/anacrolix/sync"
11
12         pp "github.com/anacrolix/torrent/peer_protocol"
13 )
14
15 func (pc *PeerConn) startWriter() {
16         w := &pc.messageWriter
17         *w = peerConnMsgWriter{
18                 fillWriteBuffer: func() {
19                         pc.locker().Lock()
20                         defer pc.locker().Unlock()
21                         if pc.closed.IsSet() {
22                                 return
23                         }
24                         pc.fillWriteBuffer()
25                 },
26                 closed: &pc.closed,
27                 logger: pc.logger,
28                 w:      pc.w,
29                 keepAlive: func() bool {
30                         pc.locker().Lock()
31                         defer pc.locker().Unlock()
32                         return pc.useful()
33                 },
34                 writeBuffer: new(bytes.Buffer),
35         }
36         go func() {
37                 defer pc.locker().Unlock()
38                 defer pc.close()
39                 defer pc.locker().Lock()
40                 pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout)
41         }()
42 }
43
44 type peerConnMsgWriter struct {
45         // Must not be called with the local mutex held, as it will call back into the write method.
46         fillWriteBuffer func()
47         closed          *chansync.SetOnce
48         logger          log.Logger
49         w               io.Writer
50         keepAlive       func() bool
51
52         mu        sync.Mutex
53         writeCond chansync.BroadcastCond
54         // Pointer so we can swap with the "front buffer".
55         writeBuffer *bytes.Buffer
56 }
57
58 // Routine that writes to the peer. Some of what to write is buffered by
59 // activity elsewhere in the Client, and some is determined locally when the
60 // connection is writable.
61 func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
62         lastWrite := time.Now()
63         frontBuf := new(bytes.Buffer)
64         for {
65                 if cn.closed.IsSet() {
66                         return
67                 }
68                 cn.fillWriteBuffer()
69                 keepAlive := cn.keepAlive()
70                 cn.mu.Lock()
71                 if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && keepAlive {
72                         cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
73                         torrent.Add("written keepalives", 1)
74                 }
75                 if cn.writeBuffer.Len() == 0 {
76                         writeCond := cn.writeCond.Signaled()
77                         cn.mu.Unlock()
78                         select {
79                         case <-cn.closed.Done():
80                         case <-writeCond:
81                         case <-time.After(time.Until(lastWrite.Add(keepAliveTimeout))):
82                         }
83                         continue
84                 }
85                 // Flip the buffers.
86                 frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
87                 cn.mu.Unlock()
88                 n, err := cn.w.Write(frontBuf.Bytes())
89                 if n != 0 {
90                         lastWrite = time.Now()
91                 }
92                 if err != nil {
93                         cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
94                         return
95                 }
96                 if n != frontBuf.Len() {
97                         panic("short write")
98                 }
99                 frontBuf.Reset()
100         }
101 }
102
103 func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
104         cn.mu.Lock()
105         defer cn.mu.Unlock()
106         cn.writeBuffer.Write(msg.MustMarshalBinary())
107         cn.writeCond.Broadcast()
108         return !cn.writeBufferFull()
109 }
110
111 func (cn *peerConnMsgWriter) writeBufferFull() bool {
112         return cn.writeBuffer.Len() >= writeBufferHighWaterLen
113 }