From 5aa05565d88a34c8b489e346ab559a90a55a7fe5 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 21 May 2025 15:22:33 +1000 Subject: [PATCH] Add IgnoreUnverifiedPieceCompletion and move fields to AddTorrentOpts --- client.go | 32 ++++++++++++---------- client_test.go | 20 ++++++-------- cmd/torrent/serve.go | 22 ++++++--------- deferrwl.go | 14 ++++++++++ file.go | 24 +++++++++++++++- issue211_test.go | 3 +- peer.go | 5 ++-- piece.go | 29 ++++++++++++++++---- requesting.go | 5 ++-- spec.go | 28 ++++++------------- storage/interface.go | 2 +- test/issue377_test.go | 6 ++-- test/sqlite_test.go | 7 +++-- torrent.go | 64 ++++++++++++++++++++++++++++++------------- torrent_test.go | 2 ++ webseed-peer.go | 2 +- webseed/client.go | 1 - 17 files changed, 163 insertions(+), 103 deletions(-) diff --git a/client.go b/client.go index ef830627..1405b389 100644 --- a/client.go +++ b/client.go @@ -23,20 +23,18 @@ import ( "github.com/anacrolix/chansync/events" "github.com/anacrolix/dht/v2" "github.com/anacrolix/dht/v2/krpc" - "github.com/cespare/xxhash" - "github.com/davecgh/go-spew/spew" - "github.com/dustin/go-humanize" - gbtree "github.com/google/btree" - "github.com/pion/webrtc/v4" - - "github.com/anacrolix/log" - . "github.com/anacrolix/generics" g "github.com/anacrolix/generics" + "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/missinggo/v2/pproffd" "github.com/anacrolix/sync" + "github.com/cespare/xxhash" + "github.com/davecgh/go-spew/spew" + "github.com/dustin/go-humanize" + gbtree "github.com/google/btree" + "github.com/pion/webrtc/v4" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/internal/check" @@ -1386,6 +1384,9 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) { L: cl.locker(), }, gotMetainfoC: make(chan struct{}), + + ignoreUnverifiedPieceCompletion: opts.IgnoreUnverifiedPieceCompletion, + initialPieceCheckDisabled: opts.DisableInitialPieceCheck, } g.MakeMap(&t.webSeeds) t.closedCtx, t.closedCtxCancel = context.WithCancel(context.Background()) @@ -1493,17 +1494,18 @@ type AddTorrentOpts struct { Storage storage.ClientImpl ChunkSize pp.Integer InfoBytes []byte + // Don't hash data if piece completion is missing. This is useful for very large torrents that + // are dropped in place from an external source and trigger a lot of initial piece checks. + DisableInitialPieceCheck bool + // Require pieces to be checked as soon as info is available. This is because we have no way to + // schedule an initial check only, and don't want to race against use of Torrent.Complete. + IgnoreUnverifiedPieceCompletion bool } // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also // Torrent.MergeSpec. func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) { - t, new = cl.AddTorrentOpt(AddTorrentOpts{ - InfoHash: spec.InfoHash, - InfoHashV2: spec.InfoHashV2, - Storage: spec.Storage, - ChunkSize: spec.ChunkSize, - }) + t, new = cl.AddTorrentOpt(spec.AddTorrentOpts) modSpec := *spec if new { // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing @@ -1533,9 +1535,9 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error { cl := t.cl cl.AddDhtNodes(spec.DhtNodes) t.UseSources(spec.Sources) + // TODO: The lock should be moved earlier. cl.lock() defer cl.unlock() - t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck for _, url := range spec.Webseeds { t.addWebSeed(url) } diff --git a/client_test.go b/client_test.go index e1725f05..fa522dc8 100644 --- a/client_test.go +++ b/client_test.go @@ -133,10 +133,9 @@ func TestAddDropManyTorrents(t *testing.T) { require.NoError(t, err) defer cl.Close() for i := range 1000 { - var spec TorrentSpec - binary.PutVarint(spec.InfoHash[:], int64(i+1)) - tt, new, err := cl.AddTorrentSpec(&spec) - assert.NoError(t, err) + var opts AddTorrentOpts + binary.PutVarint(opts.InfoHash[:], int64(i+1)) + tt, new := cl.AddTorrentOpt(opts) assert.True(t, new) defer tt.Drop() } @@ -185,11 +184,10 @@ func TestCompletedPieceWrongSize(t *testing.T) { } b, err := bencode.Marshal(info) require.NoError(t, err) - tt, new, err := cl.AddTorrentSpec(&TorrentSpec{ + tt, new := cl.AddTorrentOpt(AddTorrentOpts{ InfoBytes: b, InfoHash: metainfo.HashBytes(b), }) - require.NoError(t, err) defer tt.Drop() assert.True(t, new) r := tt.NewReader() @@ -367,10 +365,9 @@ func TestAddTorrentSpecMerging(t *testing.T) { defer cl.Close() dir, mi := testutil.GreetingTestTorrent() defer os.RemoveAll(dir) - tt, new, err := cl.AddTorrentSpec(&TorrentSpec{ + tt, new := cl.AddTorrentOpt(AddTorrentOpts{ InfoHash: mi.HashInfoBytes(), }) - require.NoError(t, err) require.True(t, new) require.Nil(t, tt.Info()) _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) @@ -384,7 +381,7 @@ func TestTorrentDroppedBeforeGotInfo(t *testing.T) { os.RemoveAll(dir) cl, _ := NewClient(TestingConfig(t)) defer cl.Close() - tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{ + tt, _ := cl.AddTorrentOpt(AddTorrentOpts{ InfoHash: mi.HashInfoBytes(), }) tt.Drop() @@ -568,12 +565,11 @@ func TestPeerInvalidHave(t *testing.T) { } infoBytes, err := bencode.Marshal(info) require.NoError(t, err) - tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{ + tt, _new := cl.AddTorrentOpt(AddTorrentOpts{ InfoBytes: infoBytes, InfoHash: metainfo.HashBytes(infoBytes), Storage: badStorage{}, }) - require.NoError(t, err) assert.True(t, _new) defer tt.Drop() cn := &PeerConn{Peer: Peer{ @@ -596,7 +592,7 @@ func TestPieceCompletedInStorageButNotClient(t *testing.T) { seeder, err := NewClient(TestingConfig(t)) require.NoError(t, err) defer seeder.Close() - _, new, err := seeder.AddTorrentSpec(&TorrentSpec{ + _, new := seeder.AddTorrentOpt(AddTorrentOpts{ InfoBytes: greetingMetainfo.InfoBytes, InfoHash: greetingMetainfo.HashInfoBytes(), }) diff --git a/cmd/torrent/serve.go b/cmd/torrent/serve.go index b257f73a..7e58de1b 100644 --- a/cmd/torrent/serve.go +++ b/cmd/torrent/serve.go @@ -66,21 +66,17 @@ func serve() (cmd bargle.Command) { TorrentDirMaker: nil, PieceCompletion: pc, }), - }) - defer to.Drop() - err = to.MergeSpec(&torrent.TorrentSpec{ InfoBytes: mi.InfoBytes, - Trackers: [][]string{{ - `wss://tracker.btorrent.xyz`, - `wss://tracker.openwebtorrent.com`, - "http://p4p.arenabg.com:1337/announce", - "udp://tracker.opentrackr.org:1337/announce", - "udp://tracker.openbittorrent.com:6969/announce", - }}, }) - if err != nil { - return fmt.Errorf("setting trackers: %w", err) - } + defer to.Drop() + // TODO: Builtin trackers? + to.AddTrackers([][]string{{ + `wss://tracker.btorrent.xyz`, + `wss://tracker.openwebtorrent.com`, + "http://p4p.arenabg.com:1337/announce", + "udp://tracker.opentrackr.org:1337/announce", + "udp://tracker.openbittorrent.com:6969/announce", + }}) fmt.Printf("%v: %v\n", to, to.Metainfo().Magnet(&ih, &info)) } select {} diff --git a/deferrwl.go b/deferrwl.go index 0f1b3a54..df0c1f51 100644 --- a/deferrwl.go +++ b/deferrwl.go @@ -2,7 +2,9 @@ package torrent import ( "fmt" + "reflect" + g "github.com/anacrolix/generics" "github.com/anacrolix/sync" ) @@ -12,6 +14,7 @@ import ( type lockWithDeferreds struct { internal sync.RWMutex unlockActions []func() + m map[uintptr]struct{} } func (me *lockWithDeferreds) Lock() { @@ -28,6 +31,7 @@ func (me *lockWithDeferreds) Unlock() { panic(fmt.Sprintf("num deferred changed while running: %v -> %v", startLen, len(me.unlockActions))) } me.unlockActions = me.unlockActions[:0] + clear(me.unlockActions) } func (me *lockWithDeferreds) RLock() { @@ -41,3 +45,13 @@ func (me *lockWithDeferreds) RUnlock() { func (me *lockWithDeferreds) Defer(action func()) { me.unlockActions = append(me.unlockActions, action) } + +func (me *lockWithDeferreds) DeferOnce(action func()) { + g.MakeMapIfNil(&me.m) + key := reflect.ValueOf(action).Pointer() + if g.MapContains(me.m, key) { + return + } + me.m[key] = struct{}{} + me.Defer(action) +} diff --git a/file.go b/file.go index 6a168aa3..4da42dc8 100644 --- a/file.go +++ b/file.go @@ -1,8 +1,9 @@ package torrent import ( - "github.com/RoaringBitmap/roaring" + "iter" + "github.com/RoaringBitmap/roaring" g "github.com/anacrolix/generics" "github.com/anacrolix/missinggo/v2/bitmap" @@ -218,3 +219,24 @@ func (f *File) EndPieceIndex() int { func (f *File) numPieces() int { return f.EndPieceIndex() - f.BeginPieceIndex() } + +func (f *File) PieceIndices() iter.Seq[int] { + return func(yield func(int) bool) { + for i := f.BeginPieceIndex(); i < f.EndPieceIndex(); i++ { + if !yield(i) { + break + } + } + } +} + +func (f *File) Pieces() iter.Seq[*Piece] { + return func(yield func(*Piece) bool) { + for i := range f.PieceIndices() { + p := f.t.piece(i) + if !yield(p) { + break + } + } + } +} diff --git a/issue211_test.go b/issue211_test.go index a76be07a..914db687 100644 --- a/issue211_test.go +++ b/issue211_test.go @@ -27,12 +27,11 @@ func TestDropTorrentWithMmapStorageWhileHashing(t *testing.T) { td, mi := testutil.GreetingTestTorrent() mms := storage.NewMMap(td) defer mms.Close() - tt, new, err := cl.AddTorrentSpec(&TorrentSpec{ + tt, new := cl.AddTorrentOpt(AddTorrentOpts{ Storage: mms, InfoHash: mi.HashInfoBytes(), InfoBytes: mi.InfoBytes, }) - require.NoError(t, err) assert.True(t, new) r := tt.NewReader() diff --git a/peer.go b/peer.go index 6cf7785f..530631ec 100644 --- a/peer.go +++ b/peer.go @@ -11,12 +11,11 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/anacrolix/chansync" + . "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/iter" - "github.com/anacrolix/multiless" - - . "github.com/anacrolix/generics" "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/multiless" "github.com/anacrolix/torrent/internal/alloclim" "github.com/anacrolix/torrent/mse" diff --git a/piece.go b/piece.go index 2df0dd36..0b9e51cb 100644 --- a/piece.go +++ b/piece.go @@ -36,10 +36,6 @@ type Piece struct { numVerifies pieceVerifyCount numVerifiesCond chansync.BroadcastCond - hashing bool - // The piece state may have changed, and is being synchronized with storage. - marking bool - storageCompletionOk bool publicPieceState PieceState priority PiecePriority @@ -56,6 +52,13 @@ type Piece struct { // Connections that have written data to this piece since its last check. // This can include connections that have closed. dirtiers map[*Peer]struct{} + + // Currently being hashed. + hashing bool + // The piece state may have changed, and is being synchronized with storage. + marking bool + // The Completion.Ok field from the storage layer. + storageCompletionOk bool } func (p *Piece) String() string { @@ -283,8 +286,10 @@ func (p *Piece) UpdateCompletion() { } func (p *Piece) completion() (ret storage.Completion) { - ret.Complete = p.t.pieceComplete(p.index) ret.Ok = p.storageCompletionOk + if ret.Ok { + ret.Complete = p.t.pieceComplete(p.index) + } return } @@ -347,6 +352,8 @@ func (p *Piece) hasPieceLayer() bool { return p.numFiles() == 1 && p.mustGetOnlyFile().length > p.t.info.PieceLength } +// TODO: This looks inefficient. It will rehash everytime it is called. The hashes should be +// generated once. func (p *Piece) obtainHashV2() (hash [32]byte, err error) { if p.hashV2.Ok { hash = p.hashV2.Value @@ -357,7 +364,7 @@ func (p *Piece) obtainHashV2() (hash [32]byte, err error) { return } storage := p.Storage() - if !storage.Completion().Complete { + if c := storage.Completion(); c.Ok && !c.Complete { err = errors.New("piece incomplete") return } @@ -391,3 +398,13 @@ func (p *Piece) hasActivePeerConnRequests() (ret bool) { } return } + +// The value of numVerifies after the next hashing operation that hasn't yet begun. +func (p *Piece) nextNovelHashCount() (ret pieceVerifyCount) { + ret = p.numVerifies + 1 + if p.hashing { + // The next novel hash will be the one after the current one. + ret++ + } + return +} diff --git a/requesting.go b/requesting.go index 27872059..3eeaeb79 100644 --- a/requesting.go +++ b/requesting.go @@ -10,11 +10,10 @@ import ( "unsafe" "github.com/RoaringBitmap/roaring" - "github.com/anacrolix/log" - "github.com/anacrolix/multiless" - g "github.com/anacrolix/generics" "github.com/anacrolix/generics/heap" + "github.com/anacrolix/log" + "github.com/anacrolix/multiless" "github.com/anacrolix/torrent/metainfo" requestStrategy "github.com/anacrolix/torrent/request-strategy" diff --git a/spec.go b/spec.go index 8323cdd0..e8381943 100644 --- a/spec.go +++ b/spec.go @@ -6,8 +6,6 @@ import ( g "github.com/anacrolix/generics" "github.com/anacrolix/torrent/metainfo" - pp "github.com/anacrolix/torrent/peer_protocol" - "github.com/anacrolix/torrent/storage" infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2" ) @@ -15,12 +13,9 @@ import ( // constructor functions for magnet URIs and torrent metainfo files. TODO: This type should be // dismantled into a new Torrent option type, and separate Torrent mutate method(s). type TorrentSpec struct { + AddTorrentOpts // The tiered tracker URIs. Trackers [][]string - // TODO: Move into a "new" Torrent opt type. - InfoHash metainfo.Hash - InfoHashV2 g.Option[infohash_v2.T] - InfoBytes []byte // The name to use if the Name field from the Info isn't available. DisplayName string // WebSeed URLs. For additional options add the URLs separately with Torrent.AddWebSeeds @@ -33,15 +28,6 @@ type TorrentSpec struct { // BEP 52 "piece layers" from metainfo PieceLayers map[string]string - // The chunk size to use for outbound requests. Defaults to 16KiB if not set. Can only be set - // for new Torrents. TODO: Move into a "new" Torrent opt type. - ChunkSize pp.Integer - // TODO: Move into a "new" Torrent opt type. - Storage storage.ClientImpl - - // TODO: This should be on AddTorrentOpt too? - DisableInitialPieceCheck bool - // Whether to allow data download or upload DisallowDataUpload bool DisallowDataDownload bool @@ -55,13 +41,13 @@ func TorrentSpecFromMagnetUri(uri string) (spec *TorrentSpec, err error) { spec = &TorrentSpec{ Trackers: [][]string{m.Trackers}, DisplayName: m.DisplayName, - InfoHash: m.InfoHash.UnwrapOrZeroValue(), - InfoHashV2: m.V2InfoHash, Webseeds: m.Params["ws"], Sources: append(m.Params["xs"], m.Params["as"]...), PeerAddrs: m.Params["x.pe"], // BEP 9 // TODO: What's the parameter for DHT nodes? } + spec.InfoHash = m.InfoHash.UnwrapOrZeroValue() + spec.InfoHashV2 = m.V2InfoHash return } @@ -83,10 +69,7 @@ func TorrentSpecFromMetaInfoErr(mi *metainfo.MetaInfo) (*TorrentSpec, error) { return &TorrentSpec{ Trackers: mi.UpvertedAnnounceList(), - InfoHash: v1Ih, - InfoHashV2: v2Infohash, PieceLayers: mi.PieceLayers, - InfoBytes: mi.InfoBytes, DisplayName: info.BestName(), Webseeds: mi.UrlList, DhtNodes: func() (ret []string) { @@ -96,6 +79,11 @@ func TorrentSpecFromMetaInfoErr(mi *metainfo.MetaInfo) (*TorrentSpec, error) { } return }(), + AddTorrentOpts: AddTorrentOpts{ + InfoHash: v1Ih, + InfoHashV2: v2Infohash, + InfoBytes: mi.InfoBytes, + }, }, err } diff --git a/storage/interface.go b/storage/interface.go index b25e360d..9aadfc8a 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -69,7 +69,7 @@ type Completion struct { Err error // The state is known or cached. Ok bool - // If Ok, whether the data is correct. + // If Ok, whether the data is correct. TODO: Check all callsites test Ok first. Complete bool } diff --git a/test/issue377_test.go b/test/issue377_test.go index 3d1025e3..04bb7d28 100644 --- a/test/issue377_test.go +++ b/test/issue377_test.go @@ -66,12 +66,11 @@ func testReceiveChunkStorageFailure(t *testing.T, seederFast bool) { data: make([]byte, info.TotalLength()), } defer leecherStorage.Close() - leecherTorrent, new, err := leecherClient.AddTorrentSpec(&torrent.TorrentSpec{ + leecherTorrent, new := leecherClient.AddTorrentOpt(torrent.AddTorrentOpts{ InfoHash: metainfo.HashInfoBytes(), Storage: &leecherStorage, }) leecherStorage.t = leecherTorrent - require.NoError(t, err) assert.True(t, new) seederTorrent, err := seederClient.AddTorrent(metainfo) require.NoError(t, err) @@ -177,7 +176,8 @@ func (me pieceImpl) MarkComplete() error { } func (me pieceImpl) MarkNotComplete() error { - panic("implement me") + me.state().complete = false + return nil } func (me pieceImpl) Completion() storage.Completion { diff --git a/test/sqlite_test.go b/test/sqlite_test.go index 437499d4..fc3bd538 100644 --- a/test/sqlite_test.go +++ b/test/sqlite_test.go @@ -7,6 +7,7 @@ package test import ( + "errors" "net" "net/http" "testing" @@ -53,16 +54,16 @@ func TestSqliteStorageClosed(t *testing.T) { defer s.Close() go func() { err := s.Serve(l) - if err != http.ErrServerClosed { + if !errors.Is(err, http.ErrServerClosed) { panic(err) } }() // Close storage prematurely. storage.Close() - tor, _, err := cl.AddTorrentSpec(&torrent.TorrentSpec{ + tor, _ := cl.AddTorrentOpt(torrent.AddTorrentOpts{ InfoHash: mi.HashInfoBytes(), - Sources: []string{"http://" + l.Addr().String()}, }) + tor.UseSources([]string{"http://" + l.Addr().String()}) c.Assert(err, qt.IsNil) <-tor.GotInfo() } diff --git a/torrent.go b/torrent.go index e470fd88..5793aa46 100644 --- a/torrent.go +++ b/torrent.go @@ -163,9 +163,8 @@ type Torrent struct { // A cache of completed piece indices. _completedPieces roaring.Bitmap // Pieces that need to be hashed. - piecesQueuedForHash bitmap.Bitmap - activePieceHashes int - initialPieceCheckDisabled bool + piecesQueuedForHash bitmap.Bitmap + activePieceHashes int connsWithAllPieces map[*Peer]struct{} @@ -191,6 +190,10 @@ type Torrent struct { // Disable actions after updating piece priorities, for benchmarking. disableTriggers bool + // See AddTorrentOpts.DisableInitialPieceCheck + initialPieceCheckDisabled bool + // See AddTorrentOpts.IgnoreUnverifiedPieceCompletion + ignoreUnverifiedPieceCompletion bool } type torrentTrackerAnnouncerKey struct { @@ -305,11 +308,15 @@ func (t *Torrent) pieceComplete(piece pieceIndex) bool { return t._completedPieces.Contains(bitmap.BitIndex(piece)) } -func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion { +func (t *Torrent) pieceCompleteUncached(piece pieceIndex) (ret storage.Completion) { + p := t.piece(piece) + if t.ignoreUnverifiedPieceCompletion && p.numVerifies == 0 { + return + } if t.storage == nil { return storage.Completion{Complete: false, Ok: true} } - return t.pieces[piece].Storage().Completion() + return p.Storage().Completion() } func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn { @@ -525,7 +532,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error { t.nameMu.Unlock() t._chunksPerRegularPiece = chunkIndexType( (pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize) - t.updateComplete() + t.deferUpdateComplete() t.displayName = "" // Save a few bytes lol. t.initFiles() t.cacheLength() @@ -547,8 +554,8 @@ func (t *Torrent) onSetInfo() { MakeSliceWithLength(&t.requestPieceStates, t.numPieces()) for i := range t.pieces { p := &t.pieces[i] - // Need to add relativeAvailability before updating piece completion, as that may result in conns - // being dropped. + // Need to add relativeAvailability before updating piece completion, as that may result in + // conns being dropped. if p.relativeAvailability != 0 { panic(p.relativeAvailability) } @@ -700,7 +707,7 @@ func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) { ret.Hashing = p.hashing ret.Checking = ret.QueuedForHash || ret.Hashing ret.Marking = p.marking - if !ret.Complete && t.piecePartiallyDownloaded(index) { + if ret.Ok && !ret.Complete && t.piecePartiallyDownloaded(index) { ret.Partial = true } if t.info.HasV2() && !p.hashV2.Ok && p.hasPieceLayer() { @@ -798,7 +805,7 @@ func (psr PieceStateRun) String() (ret string) { if psr.Partial { ret += "P" } - if psr.Complete { + if psr.Ok && psr.Complete { ret += "C" } if !psr.Ok { @@ -1671,7 +1678,7 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool { uncached := t.pieceCompleteUncached(piece) cached := p.completion() changed := cached != uncached - complete := uncached.Complete + complete := uncached.Ok && uncached.Complete p.storageCompletionOk = uncached.Ok x := uint32(piece) if complete { @@ -2461,7 +2468,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { } // Don't score the first time a piece is hashed, it could be an initial check. - if p.storageCompletionOk { + if p.numVerifies == 1 { if passed { pieceHashedCorrect.Add(1) } else { @@ -2624,6 +2631,7 @@ func (t *Torrent) tryCreatePieceHasher() bool { } p := t.piece(pi) t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi)) + t.deferUpdateComplete() p.hashing = true t.publishPieceStateChange(pi) t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher") @@ -2709,26 +2717,39 @@ func (t *Torrent) clearPieceTouchers(pi pieceIndex) { } } +// Queue a check if one hasn't occurred before for the piece, and the completion state is unknown. func (t *Torrent) queueInitialPieceCheck(i pieceIndex) { - if !t.initialPieceCheckDisabled && !t.piece(i).storageCompletionOk { - t.queuePieceCheck(i) + if t.initialPieceCheckDisabled { + return + } + p := t.piece(i) + if p.numVerifies != 0 { + return + } + // If a hash is already occurring we've satisfied the initial piece check condition. + if p.hashing { + return + } + if p.storageCompletionOk { + return } + // Should only get closed or missing hash errors here which are ok. + _, _ = t.queuePieceCheck(i) } func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) (targetVerifies pieceVerifyCount, err error) { piece := t.piece(pieceIndex) if !piece.haveHash() { + // Should we just queue the hash anyway? err = errors.New("piece hash unknown") + return } - targetVerifies = piece.numVerifies + 1 - if piece.hashing { - // The result of this queued piece check will be the one after the current one. - targetVerifies++ - } + targetVerifies = piece.nextNovelHashCount() if piece.queuedForHash() { return } t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex)) + t.deferUpdateComplete() t.publishPieceStateChange(pieceIndex) t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck") err = t.tryCreateMorePieceHashers() @@ -3048,6 +3069,11 @@ func (t *Torrent) pieceRequestIndexBegin(piece pieceIndex) RequestIndex { return RequestIndex(piece) * t.chunksPerRegularPiece() } +// Run complete validation when lock is released. +func (t *Torrent) deferUpdateComplete() { + t.cl._mu.DeferOnce(t.updateComplete) +} + func (t *Torrent) updateComplete() { // TODO: Announce complete to trackers? t.complete.SetBool(t.haveAllPieces()) diff --git a/torrent_test.go b/torrent_test.go index 638428c5..6a3d7ca1 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -152,7 +152,9 @@ func TestPieceHashFailed(t *testing.T) { cl := newTestingClient(t) tt := cl.newTorrent(mi.HashInfoBytes(), badStorage{}) tt.setChunkSize(2) + tt.cl.lock() require.NoError(t, tt.setInfoBytesLocked(mi.InfoBytes)) + tt.cl.unlock() tt.cl.lock() tt.dirtyChunks.AddRange( uint64(tt.pieceRequestIndexBegin(1)), diff --git a/webseed-peer.go b/webseed-peer.go index d20b1945..ccb2fee3 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -13,8 +13,8 @@ import ( "time" "github.com/RoaringBitmap/roaring" - g "github.com/anacrolix/generics" + "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/webseed" diff --git a/webseed/client.go b/webseed/client.go index ca59b611..fb5c824e 100644 --- a/webseed/client.go +++ b/webseed/client.go @@ -11,7 +11,6 @@ import ( "strings" "github.com/RoaringBitmap/roaring" - "github.com/anacrolix/missinggo/v2/panicif" "github.com/anacrolix/torrent/metainfo" -- 2.51.0