8 "github.com/anacrolix/chansync"
9 "github.com/anacrolix/log"
10 "github.com/anacrolix/sync"
12 pp "github.com/anacrolix/torrent/peer_protocol"
// startWriter overwrites pc.messageWriter in place with a newly built
// peerConnMsgWriter whose callbacks close over pc, and invokes its run method
// with a one-minute keep-alive timeout.
// NOTE(review): only part of this function is visible in this view; the
// statements between the lines shown should be confirmed against the full file.
15 func (pc *PeerConn) startWriter() {
// Alias the writer stored on pc so the assignment below initializes it in place.
16 w := &pc.messageWriter
17 *w = peerConnMsgWriter{
// Callback stored in the writer's fillWriteBuffer field.
18 fillWriteBuffer: func() {
// Releases the lock from pc.locker() when the callback returns.
// NOTE(review): the matching Lock is presumably just above — confirm.
20 defer pc.locker().Unlock()
// Tests whether the connection has already been closed.
// NOTE(review): presumably bails out early in that case — confirm.
21 if pc.closed.IsSet() {
// Callback stored in the writer's keepAlive field; run consults its boolean
// result before queueing a keep-alive message.
29 keepAlive: func() bool {
// Releases the lock from pc.locker() when the callback returns.
31 defer pc.locker().Unlock()
// Initial write buffer for the writer.
34 writeBuffer: new(bytes.Buffer),
// Deferred calls run in reverse order of registration, so this Unlock is the
// last of the deferred calls to execute.
37 defer pc.locker().Unlock()
// Registered last among the visible defers, so this Lock is the first deferred
// call to execute once run returns.
// NOTE(review): presumably anything deferred between the two executes with
// the lock held — confirm.
39 defer pc.locker().Lock()
// Drive the writer with a keep-alive timeout of one minute.
40 pc.messageWriter.run(time.Minute)
// peerConnMsgWriter holds the state used by its run and write methods to
// buffer outgoing messages and write them out.
// NOTE(review): some fields of this struct are not visible in this view.
44 type peerConnMsgWriter struct {
45 // Must not be called with the local mutex held, as it will call back into the write method.
46 fillWriteBuffer func()
// Consulted by run through IsSet and Done.
47 closed *chansync.SetOnce
// Broadcast by write and by the keep-alive timer callback in run; run obtains
// its Signaled channel when the write buffer is empty.
53 writeCond chansync.BroadcastCond
54 // Pointer so we can swap with the "front buffer".
55 writeBuffer *bytes.Buffer
58 // Routine that writes to the peer. Some of what to write is buffered by
59 // activity elsewhere in the Client, and some is determined locally when the
60 // connection is writable.
// keepAliveTimeout is the idle interval compared against the time since the
// last write, both in the timer callback and before queueing a keep-alive
// message.
// NOTE(review): the loop and select headers of this function are not visible
// in this view; confirm the surrounding control flow against the full file.
61 func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
// Time of the most recent write; initialized to the moment run starts.
63 lastWrite time.Time = time.Now()
64 keepAliveTimer *time.Timer
// The timer fires its callback once keepAliveTimeout has elapsed.
66 keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
// Nothing has been written for at least a full timeout: broadcast on
// writeCond.
69 if time.Since(lastWrite) >= keepAliveTimeout {
70 cn.writeCond.Broadcast()
// Re-arm the timer for another interval.
72 keepAliveTimer.Reset(keepAliveTimeout)
// Stop the timer when run returns.
76 defer keepAliveTimer.Stop()
// Second buffer that gets swapped with cn.writeBuffer before each write.
77 frontBuf := new(bytes.Buffer)
// NOTE(review): presumably run returns once the writer is closed — confirm.
79 if cn.closed.IsSet() {
// Nothing is buffered at this point.
// NOTE(review): presumably fillWriteBuffer is invoked here — confirm.
82 if cn.writeBuffer.Len() == 0 {
// Still nothing to send, the idle interval has elapsed, and the keepAlive
// callback returns true: queue a keep-alive message.
89 if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && cn.keepAlive() {
90 cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
// Bump the "written keepalives" counter.
91 torrent.Add("written keepalives", 1)
// The buffer is still empty: obtain the channel from writeCond.Signaled().
93 if cn.writeBuffer.Len() == 0 {
94 writeCond := cn.writeCond.Signaled()
// Receives once the writer's closed Done channel is ready.
97 case <-cn.closed.Done():
// Swap the buffers: frontBuf now refers to the pending bytes and
// cn.writeBuffer to the previous front buffer.
104 frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
// Write the pending bytes to cn.w.
106 n, err := cn.w.Write(frontBuf.Bytes())
// Record the time of this write and push the keep-alive timer back by a full
// interval.
109 lastWrite = time.Now()
110 keepAliveTimer.Reset(keepAliveTimeout)
// Write errors are logged with a default level of Debug.
113 cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
// Compare the number of bytes written with the length of the front buffer.
// NOTE(review): how a short write is handled is not visible here — confirm.
116 if n != frontBuf.Len() {
// write appends the binary encoding of msg to the write buffer and broadcasts
// on writeCond. It returns the negation of writeBufferFull, i.e. true while
// the buffer is still below the high-water length.
// NOTE(review): the lines between the signature and the buffer write are not
// visible in this view; confirm any locking done there.
123 func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
// NOTE(review): MustMarshalBinary presumably panics if encoding fails — confirm.
126 cn.writeBuffer.Write(msg.MustMarshalBinary())
127 cn.writeCond.Broadcast()
128 return !cn.writeBufferFull()
// writeBufferFull reports whether the write buffer currently holds at least
// writeBufferHighWaterLen bytes.
131 func (cn *peerConnMsgWriter) writeBufferFull() bool {
132 return cn.writeBuffer.Len() >= writeBufferHighWaterLen