From: Matt Joiner Date: Thu, 20 May 2021 08:55:23 +0000 (+1000) Subject: Move peerConnMsgWriter into its own file X-Git-Tag: v1.29.0~31^2~15 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=d37dea1f61a9c83c05f32a302e1e40da6d853505;p=btrtrc.git Move peerConnMsgWriter into its own file --- diff --git a/client.go b/client.go index 31dbf4ce..7ac6b264 100644 --- a/client.go +++ b/client.go @@ -2,7 +2,6 @@ package torrent import ( "bufio" - "bytes" "context" "crypto/rand" "encoding/binary" @@ -21,13 +20,11 @@ import ( "github.com/anacrolix/missinggo/perf" "github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/slices" + "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/missinggo/v2/conntrack" "github.com/anacrolix/missinggo/v2/pproffd" "github.com/anacrolix/sync" - "github.com/anacrolix/torrent/internal/limiter" - request_strategy "github.com/anacrolix/torrent/request-strategy" - "github.com/anacrolix/torrent/tracker" - "github.com/anacrolix/torrent/webtorrent" "github.com/davecgh/go-spew/spew" "github.com/dustin/go-humanize" "github.com/google/btree" @@ -35,15 +32,16 @@ import ( "golang.org/x/time/rate" "golang.org/x/xerrors" - "github.com/anacrolix/missinggo/v2" - "github.com/anacrolix/missinggo/v2/conntrack" - "github.com/anacrolix/torrent/bencode" + "github.com/anacrolix/torrent/internal/limiter" "github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/mse" pp "github.com/anacrolix/torrent/peer_protocol" + request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/anacrolix/torrent/storage" + "github.com/anacrolix/torrent/tracker" + "github.com/anacrolix/torrent/webtorrent" ) // Clients contain zero or more Torrents. A Client manages a blocklist, the @@ -969,32 +967,6 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error { return nil } -func (pc *PeerConn) startWriter() { - w := &pc.messageWriter - *w = peerConnWriter{ - fillWriteBuffer: func() { - pc.locker().Lock() - defer pc.locker().Unlock() - pc.fillWriteBuffer() - }, - closed: &pc.closed, - logger: pc.logger, - w: pc.w, - keepAlive: func() bool { - pc.locker().Lock() - defer pc.locker().Unlock() - return pc.useful() - }, - writeBuffer: new(bytes.Buffer), - } - go func() { - defer pc.locker().Unlock() - defer pc.close() - defer pc.locker().Lock() - pc.messageWriter.run(time.Minute) - }() -} - // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB // (1<<19) cached for sending, for 16KiB (1<<14) chunks. diff --git a/peerconn.go b/peerconn.go index 9cddf7d7..b2753d6a 100644 --- a/peerconn.go +++ b/peerconn.go @@ -18,7 +18,6 @@ import ( "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/missinggo/v2/prioritybitmap" "github.com/anacrolix/multiless" - "github.com/anacrolix/sync" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/internal/chansync" @@ -148,7 +147,7 @@ type PeerConn struct { w io.Writer r io.Reader - messageWriter peerConnWriter + messageWriter peerConnMsgWriter uploadTimer *time.Timer pex pexConnState @@ -438,7 +437,7 @@ func (cn *PeerConn) write(msg pp.Message) bool { return notFull } -func (cn *peerConnWriter) write(msg pp.Message) bool { +func (cn *peerConnMsgWriter) write(msg pp.Message) bool { cn.mu.Lock() defer cn.mu.Unlock() cn.writeBuffer.Write(msg.MustMarshalBinary()) @@ -446,7 +445,7 @@ func (cn *peerConnWriter) write(msg pp.Message) bool { return !cn.writeBufferFull() } -func (cn *peerConnWriter) writeBufferFull() bool { +func (cn *peerConnMsgWriter) writeBufferFull() bool { return cn.writeBuffer.Len() >= writeBufferHighWaterLen } @@ -643,85 +642,6 @@ func (cn *PeerConn) fillWriteBuffer() { cn.upload(cn.write) } -type peerConnWriter struct { - // Must not be called with the local mutex held, as it will call back into the write method. - fillWriteBuffer func() - closed *chansync.SetOnce - logger log.Logger - w io.Writer - keepAlive func() bool - - mu sync.Mutex - writeCond chansync.BroadcastCond - // Pointer so we can swap with the "front buffer". - writeBuffer *bytes.Buffer -} - -// Routine that writes to the peer. Some of what to write is buffered by -// activity elsewhere in the Client, and some is determined locally when the -// connection is writable. -func (cn *peerConnWriter) run(keepAliveTimeout time.Duration) { - var ( - lastWrite time.Time = time.Now() - keepAliveTimer *time.Timer - ) - keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() { - cn.mu.Lock() - defer cn.mu.Unlock() - if time.Since(lastWrite) >= keepAliveTimeout { - cn.writeCond.Broadcast() - } - keepAliveTimer.Reset(keepAliveTimeout) - }) - cn.mu.Lock() - defer cn.mu.Unlock() - defer keepAliveTimer.Stop() - frontBuf := new(bytes.Buffer) - for { - if cn.closed.IsSet() { - return - } - if cn.writeBuffer.Len() == 0 { - func() { - cn.mu.Unlock() - defer cn.mu.Lock() - cn.fillWriteBuffer() - }() - } - if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && cn.keepAlive() { - cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary()) - torrent.Add("written keepalives", 1) - } - if cn.writeBuffer.Len() == 0 { - writeCond := cn.writeCond.WaitChan() - cn.mu.Unlock() - select { - case <-cn.closed.Chan(): - case <-writeCond: - } - cn.mu.Lock() - continue - } - // Flip the buffers. - frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf - cn.mu.Unlock() - n, err := cn.w.Write(frontBuf.Bytes()) - cn.mu.Lock() - if n != 0 { - lastWrite = time.Now() - keepAliveTimer.Reset(keepAliveTimeout) - } - if err != nil { - cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err) - return - } - if n != frontBuf.Len() { - panic("short write") - } - frontBuf.Reset() - } -} - func (cn *PeerConn) have(piece pieceIndex) { if cn.sentHaves.Get(bitmap.BitIndex(piece)) { return