]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Break out peerConnWriter
authorMatt Joiner <anacrolix@gmail.com>
Thu, 20 May 2021 08:51:08 +0000 (18:51 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 7 Jun 2021 03:01:40 +0000 (13:01 +1000)
client.go
go.mod
go.sum
internal/chansync/broadcast-cond.go.go [new file with mode: 0644]
internal/chansync/set-once.go [new file with mode: 0644]
peerconn.go
peerconn_test.go
pexconn_test.go

index 98bf0892465a160d17fa06250ac9b78bff1650bc..31dbf4ce3d7803bde3dba96247004870d0564ddd 100644 (file)
--- a/client.go
+++ b/client.go
@@ -960,7 +960,7 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
                return fmt.Errorf("adding connection: %w", err)
        }
        defer t.dropConnection(c)
-       go c.writer(time.Minute)
+       c.startWriter()
        cl.sendInitialMessages(c, t)
        err := c.mainReadLoop()
        if err != nil {
@@ -969,6 +969,32 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
        return nil
 }
 
+func (pc *PeerConn) startWriter() {
+       w := &pc.messageWriter
+       *w = peerConnWriter{
+               fillWriteBuffer: func() {
+                       pc.locker().Lock()
+                       defer pc.locker().Unlock()
+                       pc.fillWriteBuffer()
+               },
+               closed: &pc.closed,
+               logger: pc.logger,
+               w:      pc.w,
+               keepAlive: func() bool {
+                       pc.locker().Lock()
+                       defer pc.locker().Unlock()
+                       return pc.useful()
+               },
+               writeBuffer: new(bytes.Buffer),
+       }
+       go func() {
+               defer pc.locker().Unlock()
+               defer pc.close()
+               defer pc.locker().Lock()
+               pc.messageWriter.run(time.Minute)
+       }()
+}
+
 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
@@ -1409,13 +1435,11 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot
                        Network:    network,
                        callbacks:  &cl.config.Callbacks,
                },
-               connString:  connString,
-               conn:        nc,
-               writeBuffer: new(bytes.Buffer),
+               connString: connString,
+               conn:       nc,
        }
        c.peerImpl = c
        c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
-       c.writerCond.L = cl.locker()
        c.setRW(connStatsReadWriter{nc, c})
        c.r = &rateLimitedReader{
                l: cl.config.DownloadRateLimiter,
diff --git a/go.mod b/go.mod
index 7a1e19b34fc99ba94339ef119140e2040f0b4a03..88326bfea5a71bba9810f49181248b0b9e394675 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -14,7 +14,7 @@ require (
        github.com/anacrolix/missinggo/perf v1.0.0
        github.com/anacrolix/missinggo/v2 v2.5.1-0.20210520011502-b3d95d6b1d02
        github.com/anacrolix/multiless v0.1.1-0.20210520040635-10ee7b5f3cff
-       github.com/anacrolix/sync v0.2.0
+       github.com/anacrolix/sync v0.3.0
        github.com/anacrolix/tagflag v1.3.0
        github.com/anacrolix/upnp v0.1.2-0.20200416075019-5e9378ed1425
        github.com/anacrolix/utp v0.1.0
diff --git a/go.sum b/go.sum
index 8c49d1e30cfa2839722075a49835419f328c396a..c12955bc83396437adeededfd84e2828daad306d 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -133,6 +133,10 @@ github.com/anacrolix/sync v0.0.0-20180611022320-3c4cb11f5a01/go.mod h1:+u91KiUuf
 github.com/anacrolix/sync v0.0.0-20180808010631-44578de4e778/go.mod h1:s735Etp3joe/voe2sdaXLcqDdJSay1O0OPnM0ystjqk=
 github.com/anacrolix/sync v0.2.0 h1:oRe22/ZB+v7v/5Mbc4d2zE0AXEZy0trKyKLjqYOt6tY=
 github.com/anacrolix/sync v0.2.0/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g=
+github.com/anacrolix/sync v0.2.1-0.20210520084835-26aa6614542f h1:7KqmZoEOIXa0UbR2WQ/YPF4H+MPV6rhWk4E4tcv5eDg=
+github.com/anacrolix/sync v0.2.1-0.20210520084835-26aa6614542f/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g=
+github.com/anacrolix/sync v0.3.0 h1:ZPjTrkqQWEfnYVGTQHh5qNjokWaXnjsyXTJSMsKY0TA=
+github.com/anacrolix/sync v0.3.0/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g=
 github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
 github.com/anacrolix/tagflag v0.0.0-20180605133421-f477c8c2f14c/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
 github.com/anacrolix/tagflag v0.0.0-20180803105420-3a8ff5428f76/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
@@ -428,6 +432,7 @@ github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA
 github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
 github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
 github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
 github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI65Y=
 github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
diff --git a/internal/chansync/broadcast-cond.go.go b/internal/chansync/broadcast-cond.go.go
new file mode 100644 (file)
index 0000000..9b89069
--- /dev/null
@@ -0,0 +1,32 @@
+package chansync
+
+import (
+       "github.com/anacrolix/sync"
+)
+
+// Can be used as zero-value. Due to the caller needing to bring their own synchronization, an
+// eqiuvalent to "sync".Cond.Signal is not provided. BroadcastCond is intended to be selected on
+// with other channels.
+type BroadcastCond struct {
+       mu sync.Mutex
+       ch chan struct{}
+}
+
+func (me *BroadcastCond) Broadcast() {
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       if me.ch != nil {
+               close(me.ch)
+               me.ch = nil
+       }
+}
+
+// Should be called before releasing locks on resources that might trigger subsequent Broadcasts.
+func (me *BroadcastCond) WaitChan() <-chan struct{} {
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       if me.ch == nil {
+               me.ch = make(chan struct{})
+       }
+       return me.ch
+}
diff --git a/internal/chansync/set-once.go b/internal/chansync/set-once.go
new file mode 100644 (file)
index 0000000..523e5ea
--- /dev/null
@@ -0,0 +1,41 @@
+package chansync
+
+import "sync"
+
+// SetOnce is a boolean value that can only be flipped from false to true.
+type SetOnce struct {
+       ch        chan struct{}
+       initOnce  sync.Once
+       closeOnce sync.Once
+}
+
+func (me *SetOnce) Chan() <-chan struct{} {
+       me.init()
+       return me.ch
+}
+
+func (me *SetOnce) init() {
+       me.initOnce.Do(func() {
+               me.ch = make(chan struct{})
+       })
+}
+
+// Set only returns true the first time it is called.
+func (me *SetOnce) Set() (first bool) {
+       me.closeOnce.Do(func() {
+               me.init()
+               first = true
+               close(me.ch)
+       })
+       return
+}
+
+func (me *SetOnce) IsSet() bool {
+       me.init()
+       select {
+       case <-me.ch:
+               return true
+       default:
+               return false
+       }
+}
index 8e3f5443e464e1197b1dd096b6f89df0f0144a25..9cddf7d77590a4a0a07d995a6a0e750d176aa1e1 100644 (file)
@@ -11,18 +11,18 @@ import (
        "sort"
        "strconv"
        "strings"
-       "sync"
        "time"
 
        "github.com/anacrolix/log"
-       "github.com/anacrolix/missinggo"
        "github.com/anacrolix/missinggo/iter"
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/missinggo/v2/prioritybitmap"
        "github.com/anacrolix/multiless"
-       "github.com/anacrolix/torrent/metainfo"
+       "github.com/anacrolix/sync"
 
        "github.com/anacrolix/torrent/bencode"
+       "github.com/anacrolix/torrent/internal/chansync"
+       "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/mse"
        pp "github.com/anacrolix/torrent/peer_protocol"
 )
@@ -68,7 +68,7 @@ type Peer struct {
        cryptoMethod    mse.CryptoMethod
        Discovery       PeerSource
        trusted         bool
-       closed          missinggo.Event
+       closed          chansync.SetOnce
        // Set true after we've added our ConnStats generated during handshake to
        // other ConnStat instances as determined when the *Torrent became known.
        reconciledHandshakeStats bool
@@ -148,11 +148,10 @@ type PeerConn struct {
        w io.Writer
        r io.Reader
 
-       writeBuffer *bytes.Buffer
-       uploadTimer *time.Timer
-       writerCond  sync.Cond
+       messageWriter peerConnWriter
 
-       pex pexConnState
+       uploadTimer *time.Timer
+       pex         pexConnState
 }
 
 func (cn *PeerConn) connStatusString() string {
@@ -429,17 +428,25 @@ const writeBufferHighWaterLen = 1 << 15
 // done asynchronously, so it may be that we're not able to honour backpressure from this method.
 func (cn *PeerConn) write(msg pp.Message) bool {
        torrent.Add(fmt.Sprintf("messages written of type %s", msg.Type.String()), 1)
-       // We don't need to track bytes here because a connection.w Writer wrapper takes care of that
-       // (although there's some delay between us recording the message, and the connection writer
+       // We don't need to track bytes here because the connection's Writer has that behaviour injected
+       // (although there's some delay between us buffering the message, and the connection writer
        // flushing it out.).
-       cn.writeBuffer.Write(msg.MustMarshalBinary())
+       notFull := cn.messageWriter.write(msg)
        // Last I checked only Piece messages affect stats, and we don't write those.
        cn.wroteMsg(&msg)
        cn.tickleWriter()
+       return notFull
+}
+
+func (cn *peerConnWriter) write(msg pp.Message) bool {
+       cn.mu.Lock()
+       defer cn.mu.Unlock()
+       cn.writeBuffer.Write(msg.MustMarshalBinary())
+       cn.writeCond.Broadcast()
        return !cn.writeBufferFull()
 }
 
-func (cn *PeerConn) writeBufferFull() bool {
+func (cn *peerConnWriter) writeBufferFull() bool {
        return cn.writeBuffer.Len() >= writeBufferHighWaterLen
 }
 
@@ -636,25 +643,38 @@ func (cn *PeerConn) fillWriteBuffer() {
        cn.upload(cn.write)
 }
 
+type peerConnWriter struct {
+       // Must not be called with the local mutex held, as it will call back into the write method.
+       fillWriteBuffer func()
+       closed          *chansync.SetOnce
+       logger          log.Logger
+       w               io.Writer
+       keepAlive       func() bool
+
+       mu        sync.Mutex
+       writeCond chansync.BroadcastCond
+       // Pointer so we can swap with the "front buffer".
+       writeBuffer *bytes.Buffer
+}
+
 // Routine that writes to the peer. Some of what to write is buffered by
 // activity elsewhere in the Client, and some is determined locally when the
 // connection is writable.
-func (cn *PeerConn) writer(keepAliveTimeout time.Duration) {
+func (cn *peerConnWriter) run(keepAliveTimeout time.Duration) {
        var (
                lastWrite      time.Time = time.Now()
                keepAliveTimer *time.Timer
        )
        keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
-               cn.locker().Lock()
-               defer cn.locker().Unlock()
+               cn.mu.Lock()
+               defer cn.mu.Unlock()
                if time.Since(lastWrite) >= keepAliveTimeout {
-                       cn.tickleWriter()
+                       cn.writeCond.Broadcast()
                }
                keepAliveTimer.Reset(keepAliveTimeout)
        })
-       cn.locker().Lock()
-       defer cn.locker().Unlock()
-       defer cn.close()
+       cn.mu.Lock()
+       defer cn.mu.Unlock()
        defer keepAliveTimer.Stop()
        frontBuf := new(bytes.Buffer)
        for {
@@ -662,22 +682,31 @@ func (cn *PeerConn) writer(keepAliveTimeout time.Duration) {
                        return
                }
                if cn.writeBuffer.Len() == 0 {
-                       cn.fillWriteBuffer()
+                       func() {
+                               cn.mu.Unlock()
+                               defer cn.mu.Lock()
+                               cn.fillWriteBuffer()
+                       }()
                }
-               if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && cn.useful() {
+               if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && cn.keepAlive() {
                        cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
                        torrent.Add("written keepalives", 1)
                }
                if cn.writeBuffer.Len() == 0 {
-                       // TODO: Minimize wakeups....
-                       cn.writerCond.Wait()
+                       writeCond := cn.writeCond.WaitChan()
+                       cn.mu.Unlock()
+                       select {
+                       case <-cn.closed.Chan():
+                       case <-writeCond:
+                       }
+                       cn.mu.Lock()
                        continue
                }
                // Flip the buffers.
                frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
-               cn.locker().Unlock()
+               cn.mu.Unlock()
                n, err := cn.w.Write(frontBuf.Bytes())
-               cn.locker().Lock()
+               cn.mu.Lock()
                if n != 0 {
                        lastWrite = time.Now()
                        keepAliveTimer.Reset(keepAliveTimeout)
@@ -1463,7 +1492,7 @@ func (c *PeerConn) uploadAllowed() bool {
 
 func (c *PeerConn) setRetryUploadTimer(delay time.Duration) {
        if c.uploadTimer == nil {
-               c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
+               c.uploadTimer = time.AfterFunc(delay, c.tickleWriter)
        } else {
                c.uploadTimer.Reset(delay)
        }
@@ -1558,7 +1587,7 @@ func (c *Peer) deleteAllRequests() {
 // This is called when something has changed that should wake the writer, such as putting stuff into
 // the writeBuffer, or changing some state that the writer can act on.
 func (c *PeerConn) tickleWriter() {
-       c.writerCond.Broadcast()
+       c.messageWriter.writeCond.Broadcast()
 }
 
 func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
index 18fc98ef52925b91147bab7114851a3113cf3205..c28bc6328bb0244d8a17846ae5bac1c782bdcd51 100644 (file)
@@ -5,7 +5,6 @@ import (
        "net"
        "sync"
        "testing"
-       "time"
 
        "github.com/anacrolix/missinggo/pubsub"
        "github.com/bradfitz/iter"
@@ -32,7 +31,7 @@ func TestSendBitfieldThenHave(t *testing.T) {
        r, w := io.Pipe()
        //c.r = r
        c.w = w
-       go c.writer(time.Minute)
+       c.startWriter()
        c.locker().Lock()
        c.t._completedPieces.Add(1)
        c.postBitfield( /*[]bool{false, true, false}*/ )
index a3ff1d02dbbe3b36ea772b18183f96a94abe2b82..df9aa44ed670d45c362853ca93def22d2e19454a 100644 (file)
@@ -21,7 +21,7 @@ func TestPexConnState(t *testing.T) {
        c := cl.newConnection(nil, false, addr, addr.Network(), "")
        c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber)
        c.PeerExtensionIDs[pp.ExtensionNamePex] = pexExtendedId
-       c.writerCond.L.Lock()
+       c.messageWriter.mu.Lock()
        c.setTorrent(torrent)
        torrent.addPeerConn(c)
 
@@ -36,7 +36,7 @@ func TestPexConnState(t *testing.T) {
                out = m
                return true
        }
-       c.writerCond.Wait()
+       <-c.messageWriter.writeCond.WaitChan()
        c.pex.Share(testWriter)
        require.True(t, writerCalled)
        require.EqualValues(t, pp.Extended, out.Type)