From: Matt Joiner
Date: Tue, 3 Jun 2025 11:03:12 +0000 (+1000)
Subject: Fix unique defers for unary functions
X-Git-Tag: v1.59.0~83
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=4c22312e90d5e4138f41382fe61dcb4260e85b81;p=btrtrc.git

Fix unique defers for unary functions
---

diff --git a/deferrwl.go b/deferrwl.go
index 61292f57..0af2e196 100644
--- a/deferrwl.go
+++ b/deferrwl.go
@@ -14,7 +14,7 @@ import (
 type lockWithDeferreds struct {
 	internal sync.RWMutex
 	unlockActions []func()
-	m map[uintptr]struct{}
+	uniqueActions map[any]struct{}
 	// Currently unlocking, defers should not occur?
 	unlocking bool
 }
@@ -24,18 +24,19 @@ func (me *lockWithDeferreds) Lock() {
 }
 
 func (me *lockWithDeferreds) Unlock() {
-	defer me.internal.Unlock()
 	me.unlocking = true
 	startLen := len(me.unlockActions)
-	for i := range startLen {
+	var i int
+	for i = 0; i < len(me.unlockActions); i++ {
 		me.unlockActions[i]()
 	}
-	if len(me.unlockActions) != startLen {
+	if i != len(me.unlockActions) {
 		panic(fmt.Sprintf("num deferred changed while running: %v -> %v", startLen, len(me.unlockActions)))
 	}
 	me.unlockActions = me.unlockActions[:0]
-	clear(me.unlockActions)
+	clear(me.uniqueActions)
 	me.unlocking = false
+	me.internal.Unlock()
 }
 
 func (me *lockWithDeferreds) RLock() {
@@ -46,19 +47,43 @@ func (me *lockWithDeferreds) RUnlock() {
 	me.internal.RUnlock()
 }
 
+// Not allowed after unlock has started.
 func (me *lockWithDeferreds) Defer(action func()) {
 	if me.unlocking {
 		panic("defer called while unlocking")
 	}
+	me.deferInner(action)
+}
+
+// Already guarded.
+func (me *lockWithDeferreds) deferInner(action func()) {
 	me.unlockActions = append(me.unlockActions, action)
 }
 
-func (me *lockWithDeferreds) DeferOnce(action func()) {
-	g.MakeMapIfNil(&me.m)
-	key := reflect.ValueOf(action).Pointer()
-	if g.MapContains(me.m, key) {
+// Protected from looping by once filter.
+func (me *lockWithDeferreds) deferOnceInner(key any, action func()) {
+	g.MakeMapIfNil(&me.uniqueActions)
+	if g.MapContains(me.uniqueActions, key) {
 		return
 	}
-	me.m[key] = struct{}{}
-	me.Defer(action)
+	me.uniqueActions[key] = struct{}{}
+	me.deferInner(action)
+}
+
+// Protected from looping by once filter. Note that if arg is the receiver of action, it should
+// match the receiver type (like being a pointer if the method takes a pointer receiver).
+func (me *lockWithDeferreds) DeferUniqueUnaryFunc(arg any, action func()) {
+	me.deferOnceInner(unaryFuncKey(action, arg), action)
+}
+
+func unaryFuncKey(f func(), key any) funcAndArgKey {
+	return funcAndArgKey{
+		funcStr: reflect.ValueOf(f).String(),
+		key:     key,
+	}
+}
+
+type funcAndArgKey struct {
+	funcStr string
+	key     any
 }
diff --git a/deferrwl_test.go b/deferrwl_test.go
new file mode 100644
index 00000000..8d08ede6
--- /dev/null
+++ b/deferrwl_test.go
@@ -0,0 +1,17 @@
+package torrent
+
+import (
+	"testing"
+
+	"github.com/go-quicktest/qt"
+)
+
+func TestUniqueDeferOnce(t *testing.T) {
+	var p1, p2 Piece
+	var mu lockWithDeferreds
+	mu.DeferUniqueUnaryFunc(&p1, p1.publishStateChange)
+	mu.DeferUniqueUnaryFunc(&p1, p1.publishStateChange)
+	qt.Assert(t, qt.HasLen(mu.unlockActions, 1))
+	mu.DeferUniqueUnaryFunc(&p2, p2.publishStateChange)
+	qt.Assert(t, qt.HasLen(mu.unlockActions, 2))
+}
diff --git a/peer.go b/peer.go
index a7e94ae7..fb66d631 100644
--- a/peer.go
+++ b/peer.go
@@ -512,7 +512,7 @@ func (cn *Peer) onNeedUpdateRequests(reason updateRequestReason) {
 	}
 	cn.needRequestUpdate = reason
 	// Run this before the Client lock is released.
-	cn.locker().Defer(cn.handleOnNeedUpdateRequests)
+	cn.locker().DeferUniqueUnaryFunc(cn, cn.handleOnNeedUpdateRequests)
 }
 
 // Emits the indices in the Bitmaps bms in order, never repeating any index.
@@ -754,7 +754,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
 
 	cl.event.Broadcast()
 	// We do this because we've written a chunk, and may change PieceState.Partial.
-	t.publishPieceStateChange(pieceIndex(ppReq.Index))
+	t.deferPublishPieceStateChange(pieceIndex(ppReq.Index))
 	return nil
 }
diff --git a/piece.go b/piece.go
index b7c78be1..37bb588b 100644
--- a/piece.go
+++ b/piece.go
@@ -410,3 +410,16 @@ func (p *Piece) nextNovelHashCount() (ret pieceVerifyCount) {
 	}
 	return
 }
+
+// Here so it's zero-arity and we can use it in DeferOnce.
+func (p *Piece) publishStateChange() {
+	t := p.t
+	cur := t.pieceState(p.index)
+	if cur != p.publicPieceState {
+		p.publicPieceState = cur
+		t.pieceStateChanges.Publish(PieceStateChange{
+			p.index,
+			cur,
+		})
+	}
+}
diff --git a/torrent.go b/torrent.go
index 09a9b81b..dc9571ba 100644
--- a/torrent.go
+++ b/torrent.go
@@ -1406,18 +1406,9 @@ type PieceStateChange struct {
 	PieceState
 }
 
-func (t *Torrent) publishPieceStateChange(piece pieceIndex) {
-	t.cl._mu.Defer(func() {
-		cur := t.pieceState(piece)
-		p := &t.pieces[piece]
-		if cur != p.publicPieceState {
-			p.publicPieceState = cur
-			t.pieceStateChanges.Publish(PieceStateChange{
-				piece,
-				cur,
-			})
-		}
-	})
+func (t *Torrent) deferPublishPieceStateChange(piece pieceIndex) {
+	p := t.piece(piece)
+	t.cl.locker().DeferUniqueUnaryFunc(p, p.publishStateChange)
 }
 
 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
@@ -1491,7 +1482,7 @@ func (t *Torrent) updatePeerRequestsForPiece(piece pieceIndex, reason updateRequ
 // Stuff we don't want to run when the pending pieces change while benchmarking.
 func (t *Torrent) onPiecePendingTriggers(piece pieceIndex) {
 	t.maybeNewConns()
-	t.publishPieceStateChange(piece)
+	t.deferPublishPieceStateChange(piece)
 }
 
 // Pending pieces is an old bitmap of stuff we want. I think it's more nuanced than that now with
@@ -2488,10 +2479,10 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
 	}
 
 	p.marking = true
-	t.publishPieceStateChange(piece)
+	t.deferPublishPieceStateChange(piece)
 	defer func() {
 		p.marking = false
-		t.publishPieceStateChange(piece)
+		t.deferPublishPieceStateChange(piece)
 	}()
 
 	if passed {
@@ -2678,7 +2669,7 @@ func (t *Torrent) startHash(pi pieceIndex) {
 	t.piecesQueuedForHash.Remove(pi)
 	t.deferUpdateComplete()
 	p.hashing = true
-	t.publishPieceStateChange(pi)
+	t.deferPublishPieceStateChange(pi)
 	t.updatePiecePriority(pi, "Torrent.startPieceHasher")
 	t.storageLock.RLock()
 	t.activePieceHashes++
@@ -2792,7 +2783,7 @@ func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) (targetVerifies pieceVe
 	}
 	t.piecesQueuedForHash.Add(pieceIndex)
 	t.deferUpdateComplete()
-	t.publishPieceStateChange(pieceIndex)
+	t.deferPublishPieceStateChange(pieceIndex)
 	t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
 	err = t.startPieceHashers()
 	return
@@ -3114,7 +3105,7 @@ func (t *Torrent) pieceRequestIndexBegin(piece pieceIndex) RequestIndex {
 
 // Run complete validation when lock is released.
 func (t *Torrent) deferUpdateComplete() {
-	t.cl._mu.DeferOnce(t.updateComplete)
+	t.cl._mu.DeferUniqueUnaryFunc(t, t.updateComplete)
 }
 
 func (t *Torrent) updateComplete() {