From: Matt Joiner Date: Thu, 20 May 2021 08:51:08 +0000 (+1000) Subject: Break out peerConnWriter X-Git-Tag: v1.29.0~31^2~16 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=24ceed61dabd4f404963be9d4343d17422ebeab8;p=btrtrc.git Break out peerConnWriter --- diff --git a/client.go b/client.go index 98bf0892..31dbf4ce 100644 --- a/client.go +++ b/client.go @@ -960,7 +960,7 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error { return fmt.Errorf("adding connection: %w", err) } defer t.dropConnection(c) - go c.writer(time.Minute) + c.startWriter() cl.sendInitialMessages(c, t) err := c.mainReadLoop() if err != nil { @@ -969,6 +969,32 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error { return nil } +func (pc *PeerConn) startWriter() { + w := &pc.messageWriter + *w = peerConnWriter{ + fillWriteBuffer: func() { + pc.locker().Lock() + defer pc.locker().Unlock() + pc.fillWriteBuffer() + }, + closed: &pc.closed, + logger: pc.logger, + w: pc.w, + keepAlive: func() bool { + pc.locker().Lock() + defer pc.locker().Unlock() + return pc.useful() + }, + writeBuffer: new(bytes.Buffer), + } + go func() { + defer pc.locker().Unlock() + defer pc.close() + defer pc.locker().Lock() + pc.messageWriter.run(time.Minute) + }() +} + // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB // (1<<19) cached for sending, for 16KiB (1<<14) chunks. @@ -1409,13 +1435,11 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot Network: network, callbacks: &cl.config.Callbacks, }, - connString: connString, - conn: nc, - writeBuffer: new(bytes.Buffer), + connString: connString, + conn: nc, } c.peerImpl = c c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c) - c.writerCond.L = cl.locker() c.setRW(connStatsReadWriter{nc, c}) c.r = &rateLimitedReader{ l: cl.config.DownloadRateLimiter, diff --git a/go.mod b/go.mod index 7a1e19b3..88326bfe 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/anacrolix/missinggo/perf v1.0.0 github.com/anacrolix/missinggo/v2 v2.5.1-0.20210520011502-b3d95d6b1d02 github.com/anacrolix/multiless v0.1.1-0.20210520040635-10ee7b5f3cff - github.com/anacrolix/sync v0.2.0 + github.com/anacrolix/sync v0.3.0 github.com/anacrolix/tagflag v1.3.0 github.com/anacrolix/upnp v0.1.2-0.20200416075019-5e9378ed1425 github.com/anacrolix/utp v0.1.0 diff --git a/go.sum b/go.sum index 8c49d1e3..c12955bc 100644 --- a/go.sum +++ b/go.sum @@ -133,6 +133,10 @@ github.com/anacrolix/sync v0.0.0-20180611022320-3c4cb11f5a01/go.mod h1:+u91KiUuf github.com/anacrolix/sync v0.0.0-20180808010631-44578de4e778/go.mod h1:s735Etp3joe/voe2sdaXLcqDdJSay1O0OPnM0ystjqk= github.com/anacrolix/sync v0.2.0 h1:oRe22/ZB+v7v/5Mbc4d2zE0AXEZy0trKyKLjqYOt6tY= github.com/anacrolix/sync v0.2.0/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g= +github.com/anacrolix/sync v0.2.1-0.20210520084835-26aa6614542f h1:7KqmZoEOIXa0UbR2WQ/YPF4H+MPV6rhWk4E4tcv5eDg= +github.com/anacrolix/sync v0.2.1-0.20210520084835-26aa6614542f/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g= +github.com/anacrolix/sync v0.3.0 h1:ZPjTrkqQWEfnYVGTQHh5qNjokWaXnjsyXTJSMsKY0TA= +github.com/anacrolix/sync v0.3.0/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g= github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v0.0.0-20180605133421-f477c8c2f14c/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v0.0.0-20180803105420-3a8ff5428f76/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= @@ -428,6 +432,7 @@ github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI65Y= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= diff --git a/internal/chansync/broadcast-cond.go.go b/internal/chansync/broadcast-cond.go.go new file mode 100644 index 00000000..9b890692 --- /dev/null +++ b/internal/chansync/broadcast-cond.go.go @@ -0,0 +1,32 @@ +package chansync + +import ( + "github.com/anacrolix/sync" +) + +// Can be used as zero-value. Due to the caller needing to bring their own synchronization, an +// eqiuvalent to "sync".Cond.Signal is not provided. BroadcastCond is intended to be selected on +// with other channels. +type BroadcastCond struct { + mu sync.Mutex + ch chan struct{} +} + +func (me *BroadcastCond) Broadcast() { + me.mu.Lock() + defer me.mu.Unlock() + if me.ch != nil { + close(me.ch) + me.ch = nil + } +} + +// Should be called before releasing locks on resources that might trigger subsequent Broadcasts. +func (me *BroadcastCond) WaitChan() <-chan struct{} { + me.mu.Lock() + defer me.mu.Unlock() + if me.ch == nil { + me.ch = make(chan struct{}) + } + return me.ch +} diff --git a/internal/chansync/set-once.go b/internal/chansync/set-once.go new file mode 100644 index 00000000..523e5eaf --- /dev/null +++ b/internal/chansync/set-once.go @@ -0,0 +1,41 @@ +package chansync + +import "sync" + +// SetOnce is a boolean value that can only be flipped from false to true. +type SetOnce struct { + ch chan struct{} + initOnce sync.Once + closeOnce sync.Once +} + +func (me *SetOnce) Chan() <-chan struct{} { + me.init() + return me.ch +} + +func (me *SetOnce) init() { + me.initOnce.Do(func() { + me.ch = make(chan struct{}) + }) +} + +// Set only returns true the first time it is called. +func (me *SetOnce) Set() (first bool) { + me.closeOnce.Do(func() { + me.init() + first = true + close(me.ch) + }) + return +} + +func (me *SetOnce) IsSet() bool { + me.init() + select { + case <-me.ch: + return true + default: + return false + } +} diff --git a/peerconn.go b/peerconn.go index 8e3f5443..9cddf7d7 100644 --- a/peerconn.go +++ b/peerconn.go @@ -11,18 +11,18 @@ import ( "sort" "strconv" "strings" - "sync" "time" "github.com/anacrolix/log" - "github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo/iter" "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/missinggo/v2/prioritybitmap" "github.com/anacrolix/multiless" - "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/sync" "github.com/anacrolix/torrent/bencode" + "github.com/anacrolix/torrent/internal/chansync" + "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/mse" pp "github.com/anacrolix/torrent/peer_protocol" ) @@ -68,7 +68,7 @@ type Peer struct { cryptoMethod mse.CryptoMethod Discovery PeerSource trusted bool - closed missinggo.Event + closed chansync.SetOnce // Set true after we've added our ConnStats generated during handshake to // other ConnStat instances as determined when the *Torrent became known. reconciledHandshakeStats bool @@ -148,11 +148,10 @@ type PeerConn struct { w io.Writer r io.Reader - writeBuffer *bytes.Buffer - uploadTimer *time.Timer - writerCond sync.Cond + messageWriter peerConnWriter - pex pexConnState + uploadTimer *time.Timer + pex pexConnState } func (cn *PeerConn) connStatusString() string { @@ -429,17 +428,25 @@ const writeBufferHighWaterLen = 1 << 15 // done asynchronously, so it may be that we're not able to honour backpressure from this method. func (cn *PeerConn) write(msg pp.Message) bool { torrent.Add(fmt.Sprintf("messages written of type %s", msg.Type.String()), 1) - // We don't need to track bytes here because a connection.w Writer wrapper takes care of that - // (although there's some delay between us recording the message, and the connection writer + // We don't need to track bytes here because the connection's Writer has that behaviour injected + // (although there's some delay between us buffering the message, and the connection writer // flushing it out.). - cn.writeBuffer.Write(msg.MustMarshalBinary()) + notFull := cn.messageWriter.write(msg) // Last I checked only Piece messages affect stats, and we don't write those. cn.wroteMsg(&msg) cn.tickleWriter() + return notFull +} + +func (cn *peerConnWriter) write(msg pp.Message) bool { + cn.mu.Lock() + defer cn.mu.Unlock() + cn.writeBuffer.Write(msg.MustMarshalBinary()) + cn.writeCond.Broadcast() return !cn.writeBufferFull() } -func (cn *PeerConn) writeBufferFull() bool { +func (cn *peerConnWriter) writeBufferFull() bool { return cn.writeBuffer.Len() >= writeBufferHighWaterLen } @@ -636,25 +643,38 @@ func (cn *PeerConn) fillWriteBuffer() { cn.upload(cn.write) } +type peerConnWriter struct { + // Must not be called with the local mutex held, as it will call back into the write method. + fillWriteBuffer func() + closed *chansync.SetOnce + logger log.Logger + w io.Writer + keepAlive func() bool + + mu sync.Mutex + writeCond chansync.BroadcastCond + // Pointer so we can swap with the "front buffer". + writeBuffer *bytes.Buffer +} + // Routine that writes to the peer. Some of what to write is buffered by // activity elsewhere in the Client, and some is determined locally when the // connection is writable. -func (cn *PeerConn) writer(keepAliveTimeout time.Duration) { +func (cn *peerConnWriter) run(keepAliveTimeout time.Duration) { var ( lastWrite time.Time = time.Now() keepAliveTimer *time.Timer ) keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() { - cn.locker().Lock() - defer cn.locker().Unlock() + cn.mu.Lock() + defer cn.mu.Unlock() if time.Since(lastWrite) >= keepAliveTimeout { - cn.tickleWriter() + cn.writeCond.Broadcast() } keepAliveTimer.Reset(keepAliveTimeout) }) - cn.locker().Lock() - defer cn.locker().Unlock() - defer cn.close() + cn.mu.Lock() + defer cn.mu.Unlock() defer keepAliveTimer.Stop() frontBuf := new(bytes.Buffer) for { @@ -662,22 +682,31 @@ func (cn *PeerConn) writer(keepAliveTimeout time.Duration) { return } if cn.writeBuffer.Len() == 0 { - cn.fillWriteBuffer() + func() { + cn.mu.Unlock() + defer cn.mu.Lock() + cn.fillWriteBuffer() + }() } - if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && cn.useful() { + if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && cn.keepAlive() { cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary()) torrent.Add("written keepalives", 1) } if cn.writeBuffer.Len() == 0 { - // TODO: Minimize wakeups.... - cn.writerCond.Wait() + writeCond := cn.writeCond.WaitChan() + cn.mu.Unlock() + select { + case <-cn.closed.Chan(): + case <-writeCond: + } + cn.mu.Lock() continue } // Flip the buffers. frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf - cn.locker().Unlock() + cn.mu.Unlock() n, err := cn.w.Write(frontBuf.Bytes()) - cn.locker().Lock() + cn.mu.Lock() if n != 0 { lastWrite = time.Now() keepAliveTimer.Reset(keepAliveTimeout) @@ -1463,7 +1492,7 @@ func (c *PeerConn) uploadAllowed() bool { func (c *PeerConn) setRetryUploadTimer(delay time.Duration) { if c.uploadTimer == nil { - c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast) + c.uploadTimer = time.AfterFunc(delay, c.tickleWriter) } else { c.uploadTimer.Reset(delay) } @@ -1558,7 +1587,7 @@ func (c *Peer) deleteAllRequests() { // This is called when something has changed that should wake the writer, such as putting stuff into // the writeBuffer, or changing some state that the writer can act on. func (c *PeerConn) tickleWriter() { - c.writerCond.Broadcast() + c.messageWriter.writeCond.Broadcast() } func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) { diff --git a/peerconn_test.go b/peerconn_test.go index 18fc98ef..c28bc632 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -5,7 +5,6 @@ import ( "net" "sync" "testing" - "time" "github.com/anacrolix/missinggo/pubsub" "github.com/bradfitz/iter" @@ -32,7 +31,7 @@ func TestSendBitfieldThenHave(t *testing.T) { r, w := io.Pipe() //c.r = r c.w = w - go c.writer(time.Minute) + c.startWriter() c.locker().Lock() c.t._completedPieces.Add(1) c.postBitfield( /*[]bool{false, true, false}*/ ) diff --git a/pexconn_test.go b/pexconn_test.go index a3ff1d02..df9aa44e 100644 --- a/pexconn_test.go +++ b/pexconn_test.go @@ -21,7 +21,7 @@ func TestPexConnState(t *testing.T) { c := cl.newConnection(nil, false, addr, addr.Network(), "") c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber) c.PeerExtensionIDs[pp.ExtensionNamePex] = pexExtendedId - c.writerCond.L.Lock() + c.messageWriter.mu.Lock() c.setTorrent(torrent) torrent.addPeerConn(c) @@ -36,7 +36,7 @@ func TestPexConnState(t *testing.T) { out = m return true } - c.writerCond.Wait() + <-c.messageWriter.writeCond.WaitChan() c.pex.Share(testWriter) require.True(t, writerCalled) require.EqualValues(t, pp.Extended, out.Type)