]> Sergey Matveev's repositories - btrtrc.git/blobdiff - peer-conn-msg-writer.go
fix: torrent file real time completed bytes
[btrtrc.git] / peer-conn-msg-writer.go
index 4dbf00e407d70df7c5b6944bb6010481ed288d47..1bacc59d188c59ceb726abf4911939092ac9f574 100644 (file)
@@ -12,7 +12,7 @@ import (
        pp "github.com/anacrolix/torrent/peer_protocol"
 )
 
-func (pc *PeerConn) startWriter() {
+func (pc *PeerConn) initMessageWriter() {
        w := &pc.messageWriter
        *w = peerConnMsgWriter{
                fillWriteBuffer: func() {
@@ -27,18 +27,24 @@ func (pc *PeerConn) startWriter() {
                logger: pc.logger,
                w:      pc.w,
                keepAlive: func() bool {
-                       pc.locker().Lock()
-                       defer pc.locker().Unlock()
+                       pc.locker().RLock()
+                       defer pc.locker().RUnlock()
                        return pc.useful()
                },
                writeBuffer: new(bytes.Buffer),
        }
-       go func() {
-               defer pc.locker().Unlock()
-               defer pc.close()
-               defer pc.locker().Lock()
-               pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout)
-       }()
+}
+
+func (pc *PeerConn) startMessageWriter() {
+       pc.initMessageWriter()
+       go pc.messageWriterRunner()
+}
+
+func (pc *PeerConn) messageWriterRunner() {
+       defer pc.locker().Unlock()
+       defer pc.close()
+       defer pc.locker().Lock()
+       pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout)
 }
 
 type peerConnMsgWriter struct {
@@ -60,6 +66,7 @@ type peerConnMsgWriter struct {
 // connection is writable.
 func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
        lastWrite := time.Now()
+       keepAliveTimer := time.NewTimer(keepAliveTimeout)
        frontBuf := new(bytes.Buffer)
        for {
                if cn.closed.IsSet() {
@@ -78,25 +85,35 @@ func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
                        select {
                        case <-cn.closed.Done():
                        case <-writeCond:
-                       case <-time.After(time.Until(lastWrite.Add(keepAliveTimeout))):
+                       case <-keepAliveTimer.C:
                        }
                        continue
                }
                // Flip the buffers.
                frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
                cn.mu.Unlock()
-               n, err := cn.w.Write(frontBuf.Bytes())
-               if n != 0 {
-                       lastWrite = time.Now()
+               if frontBuf.Len() == 0 {
+                       panic("expected non-empty front buffer")
+               }
+               var err error
+               for frontBuf.Len() != 0 {
+                       // Limit write size for WebRTC. See https://github.com/pion/datachannel/issues/59.
+                       next := frontBuf.Next(1<<16 - 1)
+                       var n int
+                       n, err = cn.w.Write(next)
+                       if err == nil && n != len(next) {
+                               panic("expected full write")
+                       }
+                       if err != nil {
+                               break
+                       }
                }
                if err != nil {
                        cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
                        return
                }
-               if n != frontBuf.Len() {
-                       panic("short write")
-               }
-               frontBuf.Reset()
+               lastWrite = time.Now()
+               keepAliveTimer.Reset(keepAliveTimeout)
        }
 }