X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=peer-conn-msg-writer.go;h=1bacc59d188c59ceb726abf4911939092ac9f574;hb=HEAD;hp=809d85fcada6ee2f42ecc7ac13ad8e7e6fe6d1af;hpb=906db030f3c9aa8cdcaf14975f5d875d5d62579c;p=btrtrc.git diff --git a/peer-conn-msg-writer.go b/peer-conn-msg-writer.go index 809d85fc..1bacc59d 100644 --- a/peer-conn-msg-writer.go +++ b/peer-conn-msg-writer.go @@ -5,14 +5,14 @@ import ( "io" "time" + "github.com/anacrolix/chansync" "github.com/anacrolix/log" "github.com/anacrolix/sync" - "github.com/anacrolix/torrent/internal/chansync" pp "github.com/anacrolix/torrent/peer_protocol" ) -func (pc *PeerConn) startWriter() { +func (pc *PeerConn) initMessageWriter() { w := &pc.messageWriter *w = peerConnMsgWriter{ fillWriteBuffer: func() { @@ -27,18 +27,24 @@ func (pc *PeerConn) startWriter() { logger: pc.logger, w: pc.w, keepAlive: func() bool { - pc.locker().Lock() - defer pc.locker().Unlock() + pc.locker().RLock() + defer pc.locker().RUnlock() return pc.useful() }, writeBuffer: new(bytes.Buffer), } - go func() { - defer pc.locker().Unlock() - defer pc.close() - defer pc.locker().Lock() - pc.messageWriter.run(time.Minute) - }() +} + +func (pc *PeerConn) startMessageWriter() { + pc.initMessageWriter() + go pc.messageWriterRunner() +} + +func (pc *PeerConn) messageWriterRunner() { + defer pc.locker().Unlock() + defer pc.close() + defer pc.locker().Lock() + pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout) } type peerConnMsgWriter struct { @@ -59,34 +65,17 @@ type peerConnMsgWriter struct { // activity elsewhere in the Client, and some is determined locally when the // connection is writable. func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) { - var ( - lastWrite time.Time = time.Now() - keepAliveTimer *time.Timer - ) - keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() { - cn.mu.Lock() - defer cn.mu.Unlock() - if time.Since(lastWrite) >= keepAliveTimeout { - cn.writeCond.Broadcast() - } - keepAliveTimer.Reset(keepAliveTimeout) - }) - cn.mu.Lock() - defer cn.mu.Unlock() - defer keepAliveTimer.Stop() + lastWrite := time.Now() + keepAliveTimer := time.NewTimer(keepAliveTimeout) frontBuf := new(bytes.Buffer) for { if cn.closed.IsSet() { return } - if cn.writeBuffer.Len() == 0 { - func() { - cn.mu.Unlock() - defer cn.mu.Lock() - cn.fillWriteBuffer() - }() - } - if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && cn.keepAlive() { + cn.fillWriteBuffer() + keepAlive := cn.keepAlive() + cn.mu.Lock() + if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && keepAlive { cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary()) torrent.Add("written keepalives", 1) } @@ -96,27 +85,35 @@ func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) { select { case <-cn.closed.Done(): case <-writeCond: + case <-keepAliveTimer.C: } - cn.mu.Lock() continue } // Flip the buffers. frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf cn.mu.Unlock() - n, err := cn.w.Write(frontBuf.Bytes()) - cn.mu.Lock() - if n != 0 { - lastWrite = time.Now() - keepAliveTimer.Reset(keepAliveTimeout) + if frontBuf.Len() == 0 { + panic("expected non-empty front buffer") + } + var err error + for frontBuf.Len() != 0 { + // Limit write size for WebRTC. See https://github.com/pion/datachannel/issues/59. + next := frontBuf.Next(1<<16 - 1) + var n int + n, err = cn.w.Write(next) + if err == nil && n != len(next) { + panic("expected full write") + } + if err != nil { + break + } } if err != nil { cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err) return } - if n != frontBuf.Len() { - panic("short write") - } - frontBuf.Reset() + lastWrite = time.Now() + keepAliveTimer.Reset(keepAliveTimeout) } }