From: Matt Joiner Date: Fri, 13 Dec 2019 04:55:56 +0000 (+1100) Subject: Coalesce piece state change notifications on client unlock X-Git-Tag: v1.10.0^0 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=f448f55e88ced8652c09a66201e5579fcb7054dc;p=btrtrc.git Coalesce piece state change notifications on client unlock Reported by Craig Campbell . --- diff --git a/client.go b/client.go index 78f9bb3d..05e9fdfd 100644 --- a/client.go +++ b/client.go @@ -48,7 +48,7 @@ type Client struct { // 64-bit alignment of fields. See #262. stats ConnStats - _mu sync.RWMutex + _mu lockWithDeferreds event sync.Cond closed missinggo.Event diff --git a/client_test.go b/client_test.go index 5878c3cc..0ad01eec 100644 --- a/client_test.go +++ b/client_test.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "io/ioutil" + "log" "os" "path/filepath" "reflect" @@ -288,6 +289,16 @@ type testClientTransferParams struct { LeecherDownloadRateLimiter *rate.Limiter } +func logPieceStateChanges(t *Torrent) { + sub := t.SubscribePieceStateChanges() + go func() { + defer sub.Close() + for e := range sub.Values { + log.Printf("%p %#v", t, e) + } + }() +} + // Creates a seeder and a leecher, and ensures the data transfers when a read // is attempted on the leecher. func testClientTransfer(t *testing.T, ps testClientTransferParams) { @@ -344,6 +355,10 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) { }()) require.NoError(t, err) assert.True(t, new) + + //// This was used when observing coalescing of piece state changes. + //logPieceStateChanges(leecherTorrent) + // Now do some things with leecher and seeder. leecherTorrent.AddClientPeer(seeder) // The Torrent should not be interested in obtaining peers, so the one we diff --git a/deferrwl.go b/deferrwl.go new file mode 100644 index 00000000..3b2ad7a3 --- /dev/null +++ b/deferrwl.go @@ -0,0 +1,35 @@ +package torrent + +import "sync" + +// Runs deferred actions on Unlock. Note that actions are assumed to be the results of changes that +// would only occur with a write lock at present. The race detector should catch instances of defers +// without the write lock being held. +type lockWithDeferreds struct { + internal sync.RWMutex + unlockActions []func() +} + +func (me *lockWithDeferreds) Lock() { + me.internal.Lock() +} + +func (me *lockWithDeferreds) Unlock() { + for _, a := range me.unlockActions { + a() + } + me.unlockActions = me.unlockActions[:0] + me.internal.Unlock() +} + +func (me *lockWithDeferreds) RLock() { + me.internal.RLock() +} + +func (me *lockWithDeferreds) RUnlock() { + me.internal.RUnlock() +} + +func (me *lockWithDeferreds) Defer(action func()) { + me.unlockActions = append(me.unlockActions, action) +} diff --git a/torrent.go b/torrent.go index 50e56ece..d0e17a20 100644 --- a/torrent.go +++ b/torrent.go @@ -849,15 +849,17 @@ type PieceStateChange struct { } func (t *Torrent) publishPieceChange(piece pieceIndex) { - cur := t.pieceState(piece) - p := &t.pieces[piece] - if cur != p.publicPieceState { - p.publicPieceState = cur - t.pieceStateChanges.Publish(PieceStateChange{ - int(piece), - cur, - }) - } + t.cl._mu.Defer(func() { + cur := t.pieceState(piece) + p := &t.pieces[piece] + if cur != p.publicPieceState { + p.publicPieceState = cur + t.pieceStateChanges.Publish(PieceStateChange{ + int(piece), + cur, + }) + } + }) } func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer { @@ -1682,7 +1684,6 @@ func (t *Torrent) connsAsSlice() (ret []*connection) { return } -// Currently doesn't really queue, but should in the future. func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) { piece := t.piece(pieceIndex) if piece.queuedForHash() {