X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=peer-conn-msg-writer.go;h=1bacc59d188c59ceb726abf4911939092ac9f574;hb=HEAD;hp=3948eb64263f9bebcdbd6e36578e78d7e8ac9d91;hpb=9b6e191b0a92aa5c67a1e90fbed5001e6ed273b4;p=btrtrc.git diff --git a/peer-conn-msg-writer.go b/peer-conn-msg-writer.go index 3948eb64..1bacc59d 100644 --- a/peer-conn-msg-writer.go +++ b/peer-conn-msg-writer.go @@ -12,7 +12,7 @@ import ( pp "github.com/anacrolix/torrent/peer_protocol" ) -func (pc *PeerConn) startWriter() { +func (pc *PeerConn) initMessageWriter() { w := &pc.messageWriter *w = peerConnMsgWriter{ fillWriteBuffer: func() { @@ -27,18 +27,24 @@ func (pc *PeerConn) startWriter() { logger: pc.logger, w: pc.w, keepAlive: func() bool { - pc.locker().Lock() - defer pc.locker().Unlock() + pc.locker().RLock() + defer pc.locker().RUnlock() return pc.useful() }, writeBuffer: new(bytes.Buffer), } - go func() { - defer pc.locker().Unlock() - defer pc.close() - defer pc.locker().Lock() - pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout) - }() +} + +func (pc *PeerConn) startMessageWriter() { + pc.initMessageWriter() + go pc.messageWriterRunner() +} + +func (pc *PeerConn) messageWriterRunner() { + defer pc.locker().Unlock() + defer pc.close() + defer pc.locker().Lock() + pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout) } type peerConnMsgWriter struct { @@ -86,19 +92,28 @@ func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) { // Flip the buffers. frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf cn.mu.Unlock() - n, err := cn.w.Write(frontBuf.Bytes()) - if n != 0 { - lastWrite = time.Now() - keepAliveTimer.Reset(keepAliveTimeout) + if frontBuf.Len() == 0 { + panic("expected non-empty front buffer") + } + var err error + for frontBuf.Len() != 0 { + // Limit write size for WebRTC. See https://github.com/pion/datachannel/issues/59. + next := frontBuf.Next(1<<16 - 1) + var n int + n, err = cn.w.Write(next) + if err == nil && n != len(next) { + panic("expected full write") + } + if err != nil { + break + } } if err != nil { cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err) return } - if n != frontBuf.Len() { - panic("short write") - } - frontBuf.Reset() + lastWrite = time.Now() + keepAliveTimer.Reset(keepAliveTimeout) } }