]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Coalesce piece state change notifications on client unlock v1.10.0
authorMatt Joiner <anacrolix@gmail.com>
Fri, 13 Dec 2019 04:55:56 +0000 (15:55 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 13 Dec 2019 04:55:56 +0000 (15:55 +1100)
Reported by Craig Campbell <iamcraigcampbell@gmail.com>.

client.go
client_test.go
deferrwl.go [new file with mode: 0644]
torrent.go

index 78f9bb3db5deb354d729ff11094915d645e44c1a..05e9fdfd6d0a4a2ed1e059c08cdad475b2053e4e 100644 (file)
--- a/client.go
+++ b/client.go
@@ -48,7 +48,7 @@ type Client struct {
        // 64-bit alignment of fields. See #262.
        stats ConnStats
 
-       _mu    sync.RWMutex
+       _mu    lockWithDeferreds
        event  sync.Cond
        closed missinggo.Event
 
index 5878c3ccd769e59e6509896e1abf2c519ce93ca9..0ad01eecdc592659139c71fe4006f6a91045b58c 100644 (file)
@@ -5,6 +5,7 @@ import (
        "fmt"
        "io"
        "io/ioutil"
+       "log"
        "os"
        "path/filepath"
        "reflect"
@@ -288,6 +289,16 @@ type testClientTransferParams struct {
        LeecherDownloadRateLimiter *rate.Limiter
 }
 
+func logPieceStateChanges(t *Torrent) {
+       sub := t.SubscribePieceStateChanges()
+       go func() {
+               defer sub.Close()
+               for e := range sub.Values {
+                       log.Printf("%p %#v", t, e)
+               }
+       }()
+}
+
 // Creates a seeder and a leecher, and ensures the data transfers when a read
 // is attempted on the leecher.
 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
@@ -344,6 +355,10 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
        }())
        require.NoError(t, err)
        assert.True(t, new)
+
+       //// This was used when observing coalescing of piece state changes.
+       //logPieceStateChanges(leecherTorrent)
+
        // Now do some things with leecher and seeder.
        leecherTorrent.AddClientPeer(seeder)
        // The Torrent should not be interested in obtaining peers, so the one we
diff --git a/deferrwl.go b/deferrwl.go
new file mode 100644 (file)
index 0000000..3b2ad7a
--- /dev/null
@@ -0,0 +1,35 @@
+package torrent
+
+import "sync"
+
+// Runs deferred actions on Unlock. Note that actions are assumed to be the results of changes that
+// would only occur with a write lock at present. The race detector should catch instances of defers
+// without the write lock being held.
+type lockWithDeferreds struct {
+       internal      sync.RWMutex
+       unlockActions []func()
+}
+
+func (me *lockWithDeferreds) Lock() {
+       me.internal.Lock()
+}
+
+func (me *lockWithDeferreds) Unlock() {
+       for _, a := range me.unlockActions {
+               a()
+       }
+       me.unlockActions = me.unlockActions[:0]
+       me.internal.Unlock()
+}
+
+func (me *lockWithDeferreds) RLock() {
+       me.internal.RLock()
+}
+
+func (me *lockWithDeferreds) RUnlock() {
+       me.internal.RUnlock()
+}
+
+func (me *lockWithDeferreds) Defer(action func()) {
+       me.unlockActions = append(me.unlockActions, action)
+}
index 50e56eced5fd76e7fa09f86461ce50e9c6d1cab7..d0e17a20d05c7bdc9c5bb69e51b487e5f0e68881 100644 (file)
@@ -849,15 +849,17 @@ type PieceStateChange struct {
 }
 
 func (t *Torrent) publishPieceChange(piece pieceIndex) {
-       cur := t.pieceState(piece)
-       p := &t.pieces[piece]
-       if cur != p.publicPieceState {
-               p.publicPieceState = cur
-               t.pieceStateChanges.Publish(PieceStateChange{
-                       int(piece),
-                       cur,
-               })
-       }
+       t.cl._mu.Defer(func() {
+               cur := t.pieceState(piece)
+               p := &t.pieces[piece]
+               if cur != p.publicPieceState {
+                       p.publicPieceState = cur
+                       t.pieceStateChanges.Publish(PieceStateChange{
+                               int(piece),
+                               cur,
+                       })
+               }
+       })
 }
 
 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
@@ -1682,7 +1684,6 @@ func (t *Torrent) connsAsSlice() (ret []*connection) {
        return
 }
 
-// Currently doesn't really queue, but should in the future.
 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
        piece := t.piece(pieceIndex)
        if piece.queuedForHash() {