8 "github.com/anacrolix/chansync"
9 "github.com/anacrolix/log"
10 "github.com/anacrolix/sync"
12 pp "github.com/anacrolix/torrent/peer_protocol"
// initMessageWriter overwrites pc.messageWriter with a newly constructed
// peerConnMsgWriter whose callbacks close over pc.
// NOTE(review): several lines of this function are missing from this excerpt;
// the comments below describe only the statements that are visible.
15 func (pc *PeerConn) initMessageWriter() {
16 w := &pc.messageWriter
17 *w = peerConnMsgWriter{
// Callback that unlocks pc.locker() when it returns and checks whether the
// connection is closed. The matching Lock call and the rest of the body are
// not visible here.
18 fillWriteBuffer: func() {
20 defer pc.locker().Unlock()
21 if pc.closed.IsSet() {
// Callback returning a bool; it releases a read lock (RUnlock) when it
// returns. run combines its result with the idle time to decide whether to
// queue a keepalive message. How the result is computed is not visible here.
29 keepAlive: func() bool {
31 defer pc.locker().RUnlock()
// Start with a freshly allocated, empty buffer.
34 writeBuffer: new(bytes.Buffer),
// startMessageWriter initialises pc.messageWriter via initMessageWriter and
// then starts messageWriterRunner on its own goroutine.
38 func (pc *PeerConn) startMessageWriter() {
39 pc.initMessageWriter()
40 go pc.messageWriterRunner()
// messageWriterRunner runs the message writer loop with the client's
// configured KeepAliveTimeout.
// Deferred calls execute in reverse order, so once run returns pc.locker() is
// locked first and unlocked last.
// NOTE(review): a line between the two visible defers is missing from this
// excerpt; presumably it is work that must happen while the lock is held —
// confirm against the full file.
43 func (pc *PeerConn) messageWriterRunner() {
44 defer pc.locker().Unlock()
46 defer pc.locker().Lock()
47 pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout)
// peerConnMsgWriter holds the state used by run and write to buffer outgoing
// messages and write them out.
// NOTE(review): other fields referenced by the methods (e.g. keepAlive, w,
// logger) are not visible in this excerpt.
50 type peerConnMsgWriter struct {
51 // Must not be called with the local mutex held, as it will call back into the write method.
52 fillWriteBuffer func()
// Closed flag; run checks it with IsSet and also selects on its Done channel.
53 closed *chansync.SetOnce
// Broadcast by write after a message is buffered; run obtains its Signaled
// channel when the write buffer is empty.
59 writeCond chansync.BroadcastCond
60 // Pointer so we can swap with the "front buffer".
61 writeBuffer *bytes.Buffer
64 // Routine that writes to the peer. Some of what to write is buffered by
65 // activity elsewhere in the Client, and some is determined locally when the
66 // connection is writable.
//
// keepAliveTimeout is both the idle period after which a keepalive message may
// be queued and the duration the keepalive timer is reset to after each
// successful flush.
// NOTE(review): the loop header, locking, select statement and return
// statements are missing from this excerpt; the comments below cover only the
// visible statements.
67 func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
// Time of the most recent write; refreshed after each successful flush below.
68 lastWrite := time.Now()
69 keepAliveTimer := time.NewTimer(keepAliveTimeout)
// Second buffer that is exchanged with cn.writeBuffer before writing.
70 frontBuf := new(bytes.Buffer)
// Check whether the writer has been closed (the action taken is not visible
// here — presumably the routine returns).
72 if cn.closed.IsSet() {
76 keepAlive := cn.keepAlive()
// Nothing is queued, at least keepAliveTimeout has elapsed since the last
// write, and the keepAlive callback returned true: queue a keepalive message.
78 if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && keepAlive {
79 cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
80 torrent.Add("written keepalives", 1)
// Still nothing to send: take the write-condition signal channel. The visible
// cases below receive from the closed channel and the keepalive timer
// (presumably inside a select that also waits on writeCond — confirm).
82 if cn.writeBuffer.Len() == 0 {
83 writeCond := cn.writeCond.Signaled()
86 case <-cn.closed.Done():
88 case <-keepAliveTimer.C:
// Swap the buffers so the pending bytes are in frontBuf and cn.writeBuffer
// refers to the other buffer.
93 frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
// Invariant: the empty-buffer case was handled above, so there must be data.
95 if frontBuf.Len() == 0 {
96 panic("expected non-empty front buffer")
// Drain frontBuf in chunks of at most 1<<16 - 1 bytes.
99 for frontBuf.Len() != 0 {
100 // Limit write size for WebRTC. See https://github.com/pion/datachannel/issues/59.
101 next := frontBuf.Next(1<<16 - 1)
103 n, err = cn.w.Write(next)
// A short write reported without an error is treated as an invariant violation.
104 if err == nil && n != len(next) {
105 panic("expected full write")
// Write errors are logged with a default level of Debug.
109 cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
// Record the write time and re-arm the keepalive timer.
112 lastWrite = time.Now()
113 keepAliveTimer.Reset(keepAliveTimeout)
// write appends the marshalled form of msg to cn.writeBuffer, broadcasts on
// cn.writeCond, and returns true while writeBufferFull still reports false
// (i.e. the buffer is below the high-water mark).
// NOTE(review): lines between the signature and the buffer write are missing
// from this excerpt (possibly locking) — confirm against the full file.
117 func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
120 cn.writeBuffer.Write(msg.MustMarshalBinary())
121 cn.writeCond.Broadcast()
122 return !cn.writeBufferFull()
// writeBufferFull reports whether cn.writeBuffer currently holds at least
// writeBufferHighWaterLen bytes.
125 func (cn *peerConnMsgWriter) writeBufferFull() bool {
126 return cn.writeBuffer.Len() >= writeBufferHighWaterLen