Reported by Craig Campbell <iamcraigcampbell@gmail.com>.
// 64-bit alignment of fields. See #262.
stats ConnStats
- _mu sync.RWMutex
+ _mu lockWithDeferreds
event sync.Cond
closed missinggo.Event
"fmt"
"io"
"io/ioutil"
+ "log"
"os"
"path/filepath"
"reflect"
LeecherDownloadRateLimiter *rate.Limiter
}
+// logPieceStateChanges logs every value received on the Torrent's
+// piece-state-change subscription until the subscription closes. Debug
+// helper; see the commented-out call site in testClientTransfer.
+func logPieceStateChanges(t *Torrent) {
+	sub := t.SubscribePieceStateChanges()
+	go func() {
+		// Close the subscription when the publisher closes Values and the
+		// loop exits.
+		defer sub.Close()
+		for e := range sub.Values {
+			log.Printf("%p %#v", t, e)
+		}
+	}()
+}
+
// Creates a seeder and a leecher, and ensures the data transfers when a read
// is attempted on the leecher.
func testClientTransfer(t *testing.T, ps testClientTransferParams) {
}())
require.NoError(t, err)
assert.True(t, new)
+
+ //// This was used when observing coalescing of piece state changes.
+ //logPieceStateChanges(leecherTorrent)
+
// Now do some things with leecher and seeder.
leecherTorrent.AddClientPeer(seeder)
// The Torrent should not be interested in obtaining peers, so the one we
--- /dev/null
+package torrent
+
+import "sync"
+
+// Runs deferred actions on Unlock. Note that actions are assumed to be the results of changes that
+// would only occur with a write lock at present. The race detector should catch instances of defers
+// without the write lock being held.
+type lockWithDeferreds struct {
+	// The underlying mutex; the wrapper adds behavior only to Unlock.
+	internal sync.RWMutex
+	// Actions queued by Defer, run (then truncated) on the next Unlock.
+	unlockActions []func()
+}
+
+// Lock acquires the write lock. Actions registered via Defer while it is
+// held run when the matching Unlock is called.
+func (me *lockWithDeferreds) Lock() {
+	me.internal.Lock()
+}
+
+// Unlock runs all queued deferred actions in FIFO order, then releases the
+// write lock.
+func (me *lockWithDeferreds) Unlock() {
+	// Use an index loop with len re-evaluated each iteration (not range,
+	// which snapshots the slice once): an action may itself call Defer, and
+	// a range loop would skip such newly appended actions, after which the
+	// truncation below would silently discard them.
+	for i := 0; i < len(me.unlockActions); i++ {
+		me.unlockActions[i]()
+	}
+	me.unlockActions = me.unlockActions[:0]
+	me.internal.Unlock()
+}
+
+// RLock acquires the read lock. No deferred-action handling applies to the
+// read side.
+func (me *lockWithDeferreds) RLock() {
+	me.internal.RLock()
+}
+
+// RUnlock releases the read lock without running deferred actions.
+func (me *lockWithDeferreds) RUnlock() {
+	me.internal.RUnlock()
+}
+
+// Defer queues an action to run when the write lock is next released.
+// Per the type comment, it must be called with the write lock held.
+func (me *lockWithDeferreds) Defer(action func()) {
+	me.unlockActions = append(me.unlockActions, action)
+}
}
func (t *Torrent) publishPieceChange(piece pieceIndex) {
-	cur := t.pieceState(piece)
-	p := &t.pieces[piece]
-	if cur != p.publicPieceState {
-		p.publicPieceState = cur
-		t.pieceStateChanges.Publish(PieceStateChange{
-			int(piece),
-			cur,
-		})
-	}
+	// Defer the recompute-and-publish until the client lock is released, so
+	// repeated changes to a piece under one lock hold coalesce into a single
+	// published state (see the "coalescing" note in the test helper).
+	t.cl._mu.Defer(func() {
+		cur := t.pieceState(piece)
+		p := &t.pieces[piece]
+		// Only publish when the public view of the piece actually changed.
+		if cur != p.publicPieceState {
+			p.publicPieceState = cur
+			t.pieceStateChanges.Publish(PieceStateChange{
+				int(piece),
+				cur,
+			})
+		}
+	})
}
func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
return
}
-// Currently doesn't really queue, but should in the future.
func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
piece := t.piece(pieceIndex)
if piece.queuedForHash() {