Fix unique defers for unary functions
author    Matt Joiner <anacrolix@gmail.com>
          Tue, 3 Jun 2025 11:03:12 +0000 (21:03 +1000)
committer Matt Joiner <anacrolix@gmail.com>
          Tue, 3 Jun 2025 11:03:12 +0000 (21:03 +1000)
deferrwl.go
deferrwl_test.go [new file with mode: 0644]
peer.go
piece.go
torrent.go

diff --git a/deferrwl.go b/deferrwl.go
index 61292f5771bad7d9881739f53355fd9640444b68..0af2e19675b2e67a3c579cb742c93be7e939d9e9 100644
--- a/deferrwl.go
+++ b/deferrwl.go
@@ -14,7 +14,7 @@ import (
 type lockWithDeferreds struct {
        internal      sync.RWMutex
        unlockActions []func()
-       m             map[uintptr]struct{}
+       uniqueActions map[any]struct{}
        // Currently unlocking, defers should not occur?
        unlocking bool
 }
@@ -24,18 +24,19 @@ func (me *lockWithDeferreds) Lock() {
 }
 
 func (me *lockWithDeferreds) Unlock() {
-       defer me.internal.Unlock()
        me.unlocking = true
        startLen := len(me.unlockActions)
-       for i := range startLen {
+       var i int
+       for i = 0; i < len(me.unlockActions); i++ {
                me.unlockActions[i]()
        }
-       if len(me.unlockActions) != startLen {
+       if i != len(me.unlockActions) {
                panic(fmt.Sprintf("num deferred changed while running: %v -> %v", startLen, len(me.unlockActions)))
        }
        me.unlockActions = me.unlockActions[:0]
-       clear(me.unlockActions)
+       clear(me.uniqueActions)
        me.unlocking = false
+       me.internal.Unlock()
 }
 
 func (me *lockWithDeferreds) RLock() {
@@ -46,19 +47,43 @@ func (me *lockWithDeferreds) RUnlock() {
        me.internal.RUnlock()
 }
 
+// Not allowed after unlock has started.
 func (me *lockWithDeferreds) Defer(action func()) {
        if me.unlocking {
                panic("defer called while unlocking")
        }
+       me.deferInner(action)
+}
+
+// Already guarded.
+func (me *lockWithDeferreds) deferInner(action func()) {
        me.unlockActions = append(me.unlockActions, action)
 }
 
-func (me *lockWithDeferreds) DeferOnce(action func()) {
-       g.MakeMapIfNil(&me.m)
-       key := reflect.ValueOf(action).Pointer()
-       if g.MapContains(me.m, key) {
+// Protected from looping by once filter.
+func (me *lockWithDeferreds) deferOnceInner(key any, action func()) {
+       g.MakeMapIfNil(&me.uniqueActions)
+       if g.MapContains(me.uniqueActions, key) {
                return
        }
-       me.m[key] = struct{}{}
-       me.Defer(action)
+       me.uniqueActions[key] = struct{}{}
+       me.deferInner(action)
+}
+
+// Protected from looping by once filter. Note that if arg is the receiver of action, it should
+// match the receiver type (like being a pointer if the method takes a pointer receiver).
+func (me *lockWithDeferreds) DeferUniqueUnaryFunc(arg any, action func()) {
+       me.deferOnceInner(unaryFuncKey(action, arg), action)
+}
+
+func unaryFuncKey(f func(), key any) funcAndArgKey {
+       return funcAndArgKey{
+               funcStr: reflect.ValueOf(f).String(),
+               key:     key,
+       }
+}
+
+type funcAndArgKey struct {
+       funcStr string
+       key     any
 }
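
For orientation, here is a minimal, self-contained sketch of the pattern the new DeferUniqueUnaryFunc enables: queueing a zero-arity method value at most once per receiver, with the receiver doubling as the dedup key, and running everything when the lock is released. The names (deferredLock, counter, bump) are illustrative only and not part of the repository, and the sketch keys purely on the argument, whereas the real code also folds in the function's reflected value. Running it prints "1 1": the two queued bumps for &a collapse into one call.

package main

import (
	"fmt"
	"sync"
)

// deferredLock is a simplified stand-in for lockWithDeferreds: actions queued
// with DeferUnique run once, in order, when Unlock is called.
type deferredLock struct {
	mu      sync.Mutex
	actions []func()
	seen    map[any]struct{}
}

func (l *deferredLock) Lock() { l.mu.Lock() }

// DeferUnique queues action unless something was already queued under key.
func (l *deferredLock) DeferUnique(key any, action func()) {
	if l.seen == nil {
		l.seen = make(map[any]struct{})
	}
	if _, ok := l.seen[key]; ok {
		return
	}
	l.seen[key] = struct{}{}
	l.actions = append(l.actions, action)
}

// Unlock runs the queued actions, including any that the actions themselves
// queue, then resets the queue and releases the underlying mutex.
func (l *deferredLock) Unlock() {
	for i := 0; i < len(l.actions); i++ {
		l.actions[i]()
	}
	l.actions = l.actions[:0]
	clear(l.seen) // requires Go 1.21+
	l.mu.Unlock()
}

type counter struct{ n int }

// bump is zero-arity, like Piece.publishStateChange, so its bound method
// value can be deferred without wrapping it in a closure.
func (c *counter) bump() { c.n++ }

func main() {
	var a, b counter
	var l deferredLock
	l.Lock()
	l.DeferUnique(&a, a.bump) // queued
	l.DeferUnique(&a, a.bump) // dropped: &a already seen
	l.DeferUnique(&b, b.bump) // queued: different receiver
	l.Unlock()
	fmt.Println(a.n, b.n) // 1 1
}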
diff --git a/deferrwl_test.go b/deferrwl_test.go
new file mode 100644
index 0000000..8d08ede
--- /dev/null
+++ b/deferrwl_test.go
@@ -0,0 +1,17 @@
+package torrent
+
+import (
+       "testing"
+
+       "github.com/go-quicktest/qt"
+)
+
+func TestUniqueDeferOnce(t *testing.T) {
+       var p1, p2 Piece
+       var mu lockWithDeferreds
+       mu.DeferUniqueUnaryFunc(&p1, p1.publishStateChange)
+       mu.DeferUniqueUnaryFunc(&p1, p1.publishStateChange)
+       qt.Assert(t, qt.HasLen(mu.unlockActions, 1))
+       mu.DeferUniqueUnaryFunc(&p2, p2.publishStateChange)
+       qt.Assert(t, qt.HasLen(mu.unlockActions, 2))
+}
diff --git a/peer.go b/peer.go
index a7e94ae7669cca7e3797b9a754d0af9641a0f0f2..fb66d631d9eba75a9f987dda86bedc76ad6ad605 100644
--- a/peer.go
+++ b/peer.go
@@ -512,7 +512,7 @@ func (cn *Peer) onNeedUpdateRequests(reason updateRequestReason) {
        }
        cn.needRequestUpdate = reason
        // Run this before the Client lock is released.
-       cn.locker().Defer(cn.handleOnNeedUpdateRequests)
+       cn.locker().DeferUniqueUnaryFunc(cn, cn.handleOnNeedUpdateRequests)
 }
 
 // Emits the indices in the Bitmaps bms in order, never repeating any index.
@@ -754,7 +754,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
 
        cl.event.Broadcast()
        // We do this because we've written a chunk, and may change PieceState.Partial.
-       t.publishPieceStateChange(pieceIndex(ppReq.Index))
+       t.deferPublishPieceStateChange(pieceIndex(ppReq.Index))
 
        return nil
 }
diff --git a/piece.go b/piece.go
index b7c78be15b149b14a2f3008af664432899d9dc93..37bb588bc396bc0cb9a4d53977bdad5ee9effe49 100644
--- a/piece.go
+++ b/piece.go
@@ -410,3 +410,16 @@ func (p *Piece) nextNovelHashCount() (ret pieceVerifyCount) {
        }
        return
 }
+
+// Here so it's zero-arity and we can use it in DeferUniqueUnaryFunc.
+func (p *Piece) publishStateChange() {
+       t := p.t
+       cur := t.pieceState(p.index)
+       if cur != p.publicPieceState {
+               p.publicPieceState = cur
+               t.pieceStateChanges.Publish(PieceStateChange{
+                       p.index,
+                       cur,
+               })
+       }
+}
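
A brief aside on why passing p as the key works: p.publishStateChange is a Go method value, which evaluates and binds its receiver at the point it is written, so each *Piece yields a distinct zero-arity func. The snippet below only illustrates that language behaviour; the piece type and report method are made up for the example.

package main

import "fmt"

type piece struct{ index int }

// report is zero-arity, so a bound method value can be stored and
// invoked later with no arguments, mirroring publishStateChange.
func (p *piece) report() { fmt.Println("piece", p.index) }

func main() {
	p1, p2 := &piece{1}, &piece{2}
	// Each method value captures its own receiver when it is created.
	deferred := []func(){p1.report, p2.report}
	for _, f := range deferred {
		f() // prints "piece 1", then "piece 2"
	}
}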
diff --git a/torrent.go b/torrent.go
index 09a9b81ba9d6ce2ec0d494ac75b8eb05bffbac11..dc9571bac119130af4f3afba1b1a94d64c03b432 100644
--- a/torrent.go
+++ b/torrent.go
@@ -1406,18 +1406,9 @@ type PieceStateChange struct {
        PieceState
 }
 
-func (t *Torrent) publishPieceStateChange(piece pieceIndex) {
-       t.cl._mu.Defer(func() {
-               cur := t.pieceState(piece)
-               p := &t.pieces[piece]
-               if cur != p.publicPieceState {
-                       p.publicPieceState = cur
-                       t.pieceStateChanges.Publish(PieceStateChange{
-                               piece,
-                               cur,
-                       })
-               }
-       })
+func (t *Torrent) deferPublishPieceStateChange(piece pieceIndex) {
+       p := t.piece(piece)
+       t.cl.locker().DeferUniqueUnaryFunc(p, p.publishStateChange)
 }
 
 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
@@ -1491,7 +1482,7 @@ func (t *Torrent) updatePeerRequestsForPiece(piece pieceIndex, reason updateRequ
 // Stuff we don't want to run when the pending pieces change while benchmarking.
 func (t *Torrent) onPiecePendingTriggers(piece pieceIndex) {
        t.maybeNewConns()
-       t.publishPieceStateChange(piece)
+       t.deferPublishPieceStateChange(piece)
 }
 
 // Pending pieces is an old bitmap of stuff we want. I think it's more nuanced than that now with
@@ -2488,10 +2479,10 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
        }
 
        p.marking = true
-       t.publishPieceStateChange(piece)
+       t.deferPublishPieceStateChange(piece)
        defer func() {
                p.marking = false
-               t.publishPieceStateChange(piece)
+               t.deferPublishPieceStateChange(piece)
        }()
 
        if passed {
@@ -2678,7 +2669,7 @@ func (t *Torrent) startHash(pi pieceIndex) {
        t.piecesQueuedForHash.Remove(pi)
        t.deferUpdateComplete()
        p.hashing = true
-       t.publishPieceStateChange(pi)
+       t.deferPublishPieceStateChange(pi)
        t.updatePiecePriority(pi, "Torrent.startPieceHasher")
        t.storageLock.RLock()
        t.activePieceHashes++
@@ -2792,7 +2783,7 @@ func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) (targetVerifies pieceVe
        }
        t.piecesQueuedForHash.Add(pieceIndex)
        t.deferUpdateComplete()
-       t.publishPieceStateChange(pieceIndex)
+       t.deferPublishPieceStateChange(pieceIndex)
        t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
        err = t.startPieceHashers()
        return
@@ -3114,7 +3105,7 @@ func (t *Torrent) pieceRequestIndexBegin(piece pieceIndex) RequestIndex {
 
 // Run complete validation when lock is released.
 func (t *Torrent) deferUpdateComplete() {
-       t.cl._mu.DeferOnce(t.updateComplete)
+       t.cl._mu.DeferUniqueUnaryFunc(t, t.updateComplete)
 }
 
 func (t *Torrent) updateComplete() {