type lockWithDeferreds struct {
internal sync.RWMutex
unlockActions []func()
- m map[uintptr]struct{}
+ uniqueActions map[any]struct{}
// Set while deferred actions run in Unlock; new plain Defers must not occur then.
unlocking bool
}
}
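+// Runs the queued deferred actions, then releases the write lock. Unique defers queued while
+// unlocking run in the same pass; plain Defers panic (see Defer below).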
func (me *lockWithDeferreds) Unlock() {
- defer me.internal.Unlock()
me.unlocking = true
startLen := len(me.unlockActions)
- for i := range startLen {
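+ // Re-evaluate the length every iteration so unique defers queued while unlocking run in
+ // this same pass.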
+ var i int
+ for i = 0; i < len(me.unlockActions); i++ {
me.unlockActions[i]()
}
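+ // The loop can only leave i != len(me.unlockActions) if an action shrank the queue.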
- if len(me.unlockActions) != startLen {
+ if i != len(me.unlockActions) {
panic(fmt.Sprintf("num deferred changed while running: %v -> %v", startLen, len(me.unlockActions)))
}
me.unlockActions = me.unlockActions[:0]
- clear(me.unlockActions)
+ clear(me.uniqueActions)
me.unlocking = false
+ me.internal.Unlock()
}
func (me *lockWithDeferreds) RLock() {
me.internal.RLock()
}
+// Not allowed after unlock has started.
func (me *lockWithDeferreds) Defer(action func()) {
if me.unlocking {
panic("defer called while unlocking")
}
+ me.deferInner(action)
+}
+
+// Already guarded.
+func (me *lockWithDeferreds) deferInner(action func()) {
me.unlockActions = append(me.unlockActions, action)
}
-func (me *lockWithDeferreds) DeferOnce(action func()) {
- g.MakeMapIfNil(&me.m)
- key := reflect.ValueOf(action).Pointer()
- if g.MapContains(me.m, key) {
+// No unlocking guard: the once filter prevents actions from re-queuing themselves in a loop.
+func (me *lockWithDeferreds) deferOnceInner(key any, action func()) {
+ g.MakeMapIfNil(&me.uniqueActions)
+ if g.MapContains(me.uniqueActions, key) {
return
}
- me.m[key] = struct{}{}
- me.Defer(action)
+ me.uniqueActions[key] = struct{}{}
+ me.deferInner(action)
+}
+
+// Defers action at most once per (action, arg) pair. The once filter also protects against
+// looping. Note that if arg is the receiver of action, it must match the receiver type exactly
+// (e.g. be a pointer if the method takes a pointer receiver), or keys will not compare equal.
+func (me *lockWithDeferreds) DeferUniqueUnaryFunc(arg any, action func()) {
+ me.deferOnceInner(unaryFuncKey(action, arg), action)
+}
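+
+// A usage sketch based on the call sites below (the client lock is assumed held):
+//
+//	t.cl._mu.DeferUniqueUnaryFunc(t, t.updateComplete)
+//	t.cl._mu.DeferUniqueUnaryFunc(t, t.updateComplete) // dropped: same (func, arg) key
+//	p := t.piece(0)
+//	t.cl._mu.DeferUniqueUnaryFunc(p, p.publishStateChange) // queued: different key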
+
+func unaryFuncKey(f func(), key any) funcAndArgKey {
+ return funcAndArgKey{
+ // Identify the func by its code pointer, as the removed DeferOnce did. Value.String
+ // renders only the type ("<func() Value>" for every zero-arity func), which would
+ // collapse different methods on the same receiver into one key.
+ funcPc: reflect.ValueOf(f).Pointer(),
+ key: key,
+ }
+}
+
+type funcAndArgKey struct {
+ funcPc uintptr
+ key any
+}
--- /dev/null
+package torrent
+
+import (
+ "testing"
+
+ "github.com/go-quicktest/qt"
+)
+
+func TestDeferUniqueUnaryFunc(t *testing.T) {
+ var p1, p2 Piece
+ var mu lockWithDeferreds
+ mu.DeferUniqueUnaryFunc(&p1, p1.publishStateChange)
+ mu.DeferUniqueUnaryFunc(&p1, p1.publishStateChange)
+ qt.Assert(t, qt.HasLen(mu.unlockActions, 1))
+ mu.DeferUniqueUnaryFunc(&p2, p2.publishStateChange)
+ qt.Assert(t, qt.HasLen(mu.unlockActions, 2))
+}
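+
+// A sketch of the unlock cycle (locking the internal mutex directly rather than assuming a
+// public Lock wrapper): Unlock runs queued actions once, then resets both the queue and the
+// unique filter, so the same (func, arg) pair can be deferred again in a later cycle.
+func TestUnlockRunsAndResetsDeferred(t *testing.T) {
+ var mu lockWithDeferreds
+ var ran int
+ count := func() { ran++ }
+ mu.internal.Lock()
+ mu.DeferUniqueUnaryFunc(&ran, count)
+ mu.DeferUniqueUnaryFunc(&ran, count) // dropped: same key
+ mu.Unlock()
+ qt.Assert(t, qt.Equals(ran, 1))
+ mu.internal.Lock()
+ mu.DeferUniqueUnaryFunc(&ran, count) // unique filter was cleared by Unlock
+ mu.Unlock()
+ qt.Assert(t, qt.Equals(ran, 2))
+}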
}
cn.needRequestUpdate = reason
// Run this before the Client lock is released.
- cn.locker().Defer(cn.handleOnNeedUpdateRequests)
+ cn.locker().DeferUniqueUnaryFunc(cn, cn.handleOnNeedUpdateRequests)
}
// Emits the indices in the Bitmaps bms in order, never repeating any index.
cl.event.Broadcast()
// We do this because we've written a chunk, and may change PieceState.Partial.
- t.publishPieceStateChange(pieceIndex(ppReq.Index))
+ t.deferPublishPieceStateChange(pieceIndex(ppReq.Index))
return nil
}
}
return
}
+
+// Zero-arity method value so it can be deferred with DeferUniqueUnaryFunc.
+func (p *Piece) publishStateChange() {
+ t := p.t
+ cur := t.pieceState(p.index)
+ if cur != p.publicPieceState {
+ p.publicPieceState = cur
+ t.pieceStateChanges.Publish(PieceStateChange{
+ p.index,
+ cur,
+ })
+ }
+}
PieceState
}
-func (t *Torrent) publishPieceStateChange(piece pieceIndex) {
- t.cl._mu.Defer(func() {
- cur := t.pieceState(piece)
- p := &t.pieces[piece]
- if cur != p.publicPieceState {
- p.publicPieceState = cur
- t.pieceStateChanges.Publish(PieceStateChange{
- piece,
- cur,
- })
- }
- })
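+// Keyed on the Piece pointer: repeated deferrals for the same piece within one lock window
+// collapse into a single state-change publish when the lock is released.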
+func (t *Torrent) deferPublishPieceStateChange(piece pieceIndex) {
+ p := t.piece(piece)
+ t.cl.locker().DeferUniqueUnaryFunc(p, p.publishStateChange)
}
func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
// Stuff we don't want to run when the pending pieces change while benchmarking.
func (t *Torrent) onPiecePendingTriggers(piece pieceIndex) {
t.maybeNewConns()
- t.publishPieceStateChange(piece)
+ t.deferPublishPieceStateChange(piece)
}
// Pending pieces is an old bitmap of stuff we want. I think it's more nuanced than that now with
}
p.marking = true
- t.publishPieceStateChange(piece)
+ t.deferPublishPieceStateChange(piece)
defer func() {
p.marking = false
- t.publishPieceStateChange(piece)
+ t.deferPublishPieceStateChange(piece)
}()
if passed {
t.piecesQueuedForHash.Remove(pi)
t.deferUpdateComplete()
p.hashing = true
- t.publishPieceStateChange(pi)
+ t.deferPublishPieceStateChange(pi)
t.updatePiecePriority(pi, "Torrent.startPieceHasher")
t.storageLock.RLock()
t.activePieceHashes++
}
t.piecesQueuedForHash.Add(pieceIndex)
t.deferUpdateComplete()
- t.publishPieceStateChange(pieceIndex)
+ t.deferPublishPieceStateChange(pieceIndex)
t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
err = t.startPieceHashers()
return
// Run complete validation when lock is released.
func (t *Torrent) deferUpdateComplete() {
- t.cl._mu.DeferOnce(t.updateComplete)
+ t.cl._mu.DeferUniqueUnaryFunc(t, t.updateComplete)
}
func (t *Torrent) updateComplete() {