]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add IgnoreUnverifiedPieceCompletion and move fields to AddTorrentOpts
authorMatt Joiner <anacrolix@gmail.com>
Wed, 21 May 2025 05:22:33 +0000 (15:22 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 21 May 2025 05:22:33 +0000 (15:22 +1000)
17 files changed:
client.go
client_test.go
cmd/torrent/serve.go
deferrwl.go
file.go
issue211_test.go
peer.go
piece.go
requesting.go
spec.go
storage/interface.go
test/issue377_test.go
test/sqlite_test.go
torrent.go
torrent_test.go
webseed-peer.go
webseed/client.go

index ef8306276b39b070d6e1fa3ad141ef985fc60659..1405b389059782997b469d6d2c75e8fd956d618e 100644 (file)
--- a/client.go
+++ b/client.go
@@ -23,20 +23,18 @@ import (
        "github.com/anacrolix/chansync/events"
        "github.com/anacrolix/dht/v2"
        "github.com/anacrolix/dht/v2/krpc"
-       "github.com/cespare/xxhash"
-       "github.com/davecgh/go-spew/spew"
-       "github.com/dustin/go-humanize"
-       gbtree "github.com/google/btree"
-       "github.com/pion/webrtc/v4"
-
-       "github.com/anacrolix/log"
-
        . "github.com/anacrolix/generics"
        g "github.com/anacrolix/generics"
+       "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/v2"
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/missinggo/v2/pproffd"
        "github.com/anacrolix/sync"
+       "github.com/cespare/xxhash"
+       "github.com/davecgh/go-spew/spew"
+       "github.com/dustin/go-humanize"
+       gbtree "github.com/google/btree"
+       "github.com/pion/webrtc/v4"
 
        "github.com/anacrolix/torrent/bencode"
        "github.com/anacrolix/torrent/internal/check"
@@ -1386,6 +1384,9 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
                        L: cl.locker(),
                },
                gotMetainfoC: make(chan struct{}),
+
+               ignoreUnverifiedPieceCompletion: opts.IgnoreUnverifiedPieceCompletion,
+               initialPieceCheckDisabled:       opts.DisableInitialPieceCheck,
        }
        g.MakeMap(&t.webSeeds)
        t.closedCtx, t.closedCtxCancel = context.WithCancel(context.Background())
@@ -1493,17 +1494,18 @@ type AddTorrentOpts struct {
        Storage    storage.ClientImpl
        ChunkSize  pp.Integer
        InfoBytes  []byte
+       // Don't hash data if piece completion is missing. This is useful for very large torrents that
+       // are dropped in place from an external source and trigger a lot of initial piece checks.
+       DisableInitialPieceCheck bool
+       // Require pieces to be checked as soon as info is available. This is because we have no way to
+       // schedule an initial check only, and don't want to race against use of Torrent.Complete.
+       IgnoreUnverifiedPieceCompletion bool
 }
 
 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
 // Torrent.MergeSpec.
 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
-       t, new = cl.AddTorrentOpt(AddTorrentOpts{
-               InfoHash:   spec.InfoHash,
-               InfoHashV2: spec.InfoHashV2,
-               Storage:    spec.Storage,
-               ChunkSize:  spec.ChunkSize,
-       })
+       t, new = cl.AddTorrentOpt(spec.AddTorrentOpts)
        modSpec := *spec
        if new {
                // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
@@ -1533,9 +1535,9 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
        cl := t.cl
        cl.AddDhtNodes(spec.DhtNodes)
        t.UseSources(spec.Sources)
+       // TODO: The lock should be moved earlier.
        cl.lock()
        defer cl.unlock()
-       t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
        for _, url := range spec.Webseeds {
                t.addWebSeed(url)
        }
index e1725f0515cacc85803717f86501e2396d04c449..fa522dc8d6b8afb87d8ca1fd43bc166ab76a20e5 100644 (file)
@@ -133,10 +133,9 @@ func TestAddDropManyTorrents(t *testing.T) {
        require.NoError(t, err)
        defer cl.Close()
        for i := range 1000 {
-               var spec TorrentSpec
-               binary.PutVarint(spec.InfoHash[:], int64(i+1))
-               tt, new, err := cl.AddTorrentSpec(&spec)
-               assert.NoError(t, err)
+               var opts AddTorrentOpts
+               binary.PutVarint(opts.InfoHash[:], int64(i+1))
+               tt, new := cl.AddTorrentOpt(opts)
                assert.True(t, new)
                defer tt.Drop()
        }
@@ -185,11 +184,10 @@ func TestCompletedPieceWrongSize(t *testing.T) {
        }
        b, err := bencode.Marshal(info)
        require.NoError(t, err)
-       tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
+       tt, new := cl.AddTorrentOpt(AddTorrentOpts{
                InfoBytes: b,
                InfoHash:  metainfo.HashBytes(b),
        })
-       require.NoError(t, err)
        defer tt.Drop()
        assert.True(t, new)
        r := tt.NewReader()
@@ -367,10 +365,9 @@ func TestAddTorrentSpecMerging(t *testing.T) {
        defer cl.Close()
        dir, mi := testutil.GreetingTestTorrent()
        defer os.RemoveAll(dir)
-       tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
+       tt, new := cl.AddTorrentOpt(AddTorrentOpts{
                InfoHash: mi.HashInfoBytes(),
        })
-       require.NoError(t, err)
        require.True(t, new)
        require.Nil(t, tt.Info())
        _, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
@@ -384,7 +381,7 @@ func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
        os.RemoveAll(dir)
        cl, _ := NewClient(TestingConfig(t))
        defer cl.Close()
-       tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
+       tt, _ := cl.AddTorrentOpt(AddTorrentOpts{
                InfoHash: mi.HashInfoBytes(),
        })
        tt.Drop()
@@ -568,12 +565,11 @@ func TestPeerInvalidHave(t *testing.T) {
        }
        infoBytes, err := bencode.Marshal(info)
        require.NoError(t, err)
-       tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
+       tt, _new := cl.AddTorrentOpt(AddTorrentOpts{
                InfoBytes: infoBytes,
                InfoHash:  metainfo.HashBytes(infoBytes),
                Storage:   badStorage{},
        })
-       require.NoError(t, err)
        assert.True(t, _new)
        defer tt.Drop()
        cn := &PeerConn{Peer: Peer{
@@ -596,7 +592,7 @@ func TestPieceCompletedInStorageButNotClient(t *testing.T) {
        seeder, err := NewClient(TestingConfig(t))
        require.NoError(t, err)
        defer seeder.Close()
-       _, new, err := seeder.AddTorrentSpec(&TorrentSpec{
+       _, new := seeder.AddTorrentOpt(AddTorrentOpts{
                InfoBytes: greetingMetainfo.InfoBytes,
                InfoHash:  greetingMetainfo.HashInfoBytes(),
        })
index b257f73af7b20d66e677661b39aa362ba8705f51..7e58de1b8289dda0a2de206aa1d130928fc0f145 100644 (file)
@@ -66,21 +66,17 @@ func serve() (cmd bargle.Command) {
                                        TorrentDirMaker: nil,
                                        PieceCompletion: pc,
                                }),
-                       })
-                       defer to.Drop()
-                       err = to.MergeSpec(&torrent.TorrentSpec{
                                InfoBytes: mi.InfoBytes,
-                               Trackers: [][]string{{
-                                       `wss://tracker.btorrent.xyz`,
-                                       `wss://tracker.openwebtorrent.com`,
-                                       "http://p4p.arenabg.com:1337/announce",
-                                       "udp://tracker.opentrackr.org:1337/announce",
-                                       "udp://tracker.openbittorrent.com:6969/announce",
-                               }},
                        })
-                       if err != nil {
-                               return fmt.Errorf("setting trackers: %w", err)
-                       }
+                       defer to.Drop()
+                       // TODO: Builtin trackers?
+                       to.AddTrackers([][]string{{
+                               `wss://tracker.btorrent.xyz`,
+                               `wss://tracker.openwebtorrent.com`,
+                               "http://p4p.arenabg.com:1337/announce",
+                               "udp://tracker.opentrackr.org:1337/announce",
+                               "udp://tracker.openbittorrent.com:6969/announce",
+                       }})
                        fmt.Printf("%v: %v\n", to, to.Metainfo().Magnet(&ih, &info))
                }
                select {}
index 0f1b3a5443717f3d08327066709005ac83c880a2..df0c1f51a2c91714f2c8c9f69fbfc5ad3331e165 100644 (file)
@@ -2,7 +2,9 @@ package torrent
 
 import (
        "fmt"
+       "reflect"
 
+       g "github.com/anacrolix/generics"
        "github.com/anacrolix/sync"
 )
 
@@ -12,6 +14,7 @@ import (
 type lockWithDeferreds struct {
        internal      sync.RWMutex
        unlockActions []func()
+       m             map[uintptr]struct{}
 }
 
 func (me *lockWithDeferreds) Lock() {
@@ -28,6 +31,7 @@ func (me *lockWithDeferreds) Unlock() {
                panic(fmt.Sprintf("num deferred changed while running: %v -> %v", startLen, len(me.unlockActions)))
        }
        me.unlockActions = me.unlockActions[:0]
+       clear(me.unlockActions)
 }
 
 func (me *lockWithDeferreds) RLock() {
@@ -41,3 +45,13 @@ func (me *lockWithDeferreds) RUnlock() {
 func (me *lockWithDeferreds) Defer(action func()) {
        me.unlockActions = append(me.unlockActions, action)
 }
+
+func (me *lockWithDeferreds) DeferOnce(action func()) {
+       g.MakeMapIfNil(&me.m)
+       key := reflect.ValueOf(action).Pointer()
+       if g.MapContains(me.m, key) {
+               return
+       }
+       me.m[key] = struct{}{}
+       me.Defer(action)
+}
diff --git a/file.go b/file.go
index 6a168aa37d261cb3081f03f420f6624be20be6f6..4da42dc80dd83abf2f973c161ba595b11327e12e 100644 (file)
--- a/file.go
+++ b/file.go
@@ -1,8 +1,9 @@
 package torrent
 
 import (
-       "github.com/RoaringBitmap/roaring"
+       "iter"
 
+       "github.com/RoaringBitmap/roaring"
        g "github.com/anacrolix/generics"
        "github.com/anacrolix/missinggo/v2/bitmap"
 
@@ -218,3 +219,24 @@ func (f *File) EndPieceIndex() int {
 func (f *File) numPieces() int {
        return f.EndPieceIndex() - f.BeginPieceIndex()
 }
+
+func (f *File) PieceIndices() iter.Seq[int] {
+       return func(yield func(int) bool) {
+               for i := f.BeginPieceIndex(); i < f.EndPieceIndex(); i++ {
+                       if !yield(i) {
+                               break
+                       }
+               }
+       }
+}
+
+func (f *File) Pieces() iter.Seq[*Piece] {
+       return func(yield func(*Piece) bool) {
+               for i := range f.PieceIndices() {
+                       p := f.t.piece(i)
+                       if !yield(p) {
+                               break
+                       }
+               }
+       }
+}
index a76be07a8a2da0a6c6682e28038eb610a9d8f140..914db6871eeefb26c0125b981bf4836b6f1f87c6 100644 (file)
@@ -27,12 +27,11 @@ func TestDropTorrentWithMmapStorageWhileHashing(t *testing.T) {
        td, mi := testutil.GreetingTestTorrent()
        mms := storage.NewMMap(td)
        defer mms.Close()
-       tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
+       tt, new := cl.AddTorrentOpt(AddTorrentOpts{
                Storage:   mms,
                InfoHash:  mi.HashInfoBytes(),
                InfoBytes: mi.InfoBytes,
        })
-       require.NoError(t, err)
        assert.True(t, new)
 
        r := tt.NewReader()
diff --git a/peer.go b/peer.go
index 6cf7785f0ce7eb2f402debd47c51f8922463f389..530631eccbf38b69cf5d2e6e3e4149b09bb64a58 100644 (file)
--- a/peer.go
+++ b/peer.go
@@ -11,12 +11,11 @@ import (
 
        "github.com/RoaringBitmap/roaring"
        "github.com/anacrolix/chansync"
+       . "github.com/anacrolix/generics"
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/iter"
-       "github.com/anacrolix/multiless"
-
-       . "github.com/anacrolix/generics"
        "github.com/anacrolix/missinggo/v2/bitmap"
+       "github.com/anacrolix/multiless"
 
        "github.com/anacrolix/torrent/internal/alloclim"
        "github.com/anacrolix/torrent/mse"
index 2df0dd368ec485475d168ccf6764bb05e29cb321..0b9e51cb85a8ba43c1f631e0f8285e000587fd9e 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -36,10 +36,6 @@ type Piece struct {
 
        numVerifies     pieceVerifyCount
        numVerifiesCond chansync.BroadcastCond
-       hashing         bool
-       // The piece state may have changed, and is being synchronized with storage.
-       marking             bool
-       storageCompletionOk bool
 
        publicPieceState PieceState
        priority         PiecePriority
@@ -56,6 +52,13 @@ type Piece struct {
        // Connections that have written data to this piece since its last check.
        // This can include connections that have closed.
        dirtiers map[*Peer]struct{}
+
+       // Currently being hashed.
+       hashing bool
+       // The piece state may have changed, and is being synchronized with storage.
+       marking bool
+       // The Completion.Ok field from the storage layer.
+       storageCompletionOk bool
 }
 
 func (p *Piece) String() string {
@@ -283,8 +286,10 @@ func (p *Piece) UpdateCompletion() {
 }
 
 func (p *Piece) completion() (ret storage.Completion) {
-       ret.Complete = p.t.pieceComplete(p.index)
        ret.Ok = p.storageCompletionOk
+       if ret.Ok {
+               ret.Complete = p.t.pieceComplete(p.index)
+       }
        return
 }
 
@@ -347,6 +352,8 @@ func (p *Piece) hasPieceLayer() bool {
        return p.numFiles() == 1 && p.mustGetOnlyFile().length > p.t.info.PieceLength
 }
 
+// TODO: This looks inefficient. It will rehash everytime it is called. The hashes should be
+// generated once.
 func (p *Piece) obtainHashV2() (hash [32]byte, err error) {
        if p.hashV2.Ok {
                hash = p.hashV2.Value
@@ -357,7 +364,7 @@ func (p *Piece) obtainHashV2() (hash [32]byte, err error) {
                return
        }
        storage := p.Storage()
-       if !storage.Completion().Complete {
+       if c := storage.Completion(); c.Ok && !c.Complete {
                err = errors.New("piece incomplete")
                return
        }
@@ -391,3 +398,13 @@ func (p *Piece) hasActivePeerConnRequests() (ret bool) {
        }
        return
 }
+
+// The value of numVerifies after the next hashing operation that hasn't yet begun.
+func (p *Piece) nextNovelHashCount() (ret pieceVerifyCount) {
+       ret = p.numVerifies + 1
+       if p.hashing {
+               // The next novel hash will be the one after the current one.
+               ret++
+       }
+       return
+}
index 278720593eda861a420bde38573ca0052ef979b4..3eeaeb7946341a6a932ea82dba134dd3623f5ab6 100644 (file)
@@ -10,11 +10,10 @@ import (
        "unsafe"
 
        "github.com/RoaringBitmap/roaring"
-       "github.com/anacrolix/log"
-       "github.com/anacrolix/multiless"
-
        g "github.com/anacrolix/generics"
        "github.com/anacrolix/generics/heap"
+       "github.com/anacrolix/log"
+       "github.com/anacrolix/multiless"
 
        "github.com/anacrolix/torrent/metainfo"
        requestStrategy "github.com/anacrolix/torrent/request-strategy"
diff --git a/spec.go b/spec.go
index 8323cdd0b77fc2d867615506123c173902cf5423..e8381943179795c152f46a0b8594780205897fed 100644 (file)
--- a/spec.go
+++ b/spec.go
@@ -6,8 +6,6 @@ import (
        g "github.com/anacrolix/generics"
 
        "github.com/anacrolix/torrent/metainfo"
-       pp "github.com/anacrolix/torrent/peer_protocol"
-       "github.com/anacrolix/torrent/storage"
        infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
 )
 
@@ -15,12 +13,9 @@ import (
 // constructor functions for magnet URIs and torrent metainfo files. TODO: This type should be
 // dismantled into a new Torrent option type, and separate Torrent mutate method(s).
 type TorrentSpec struct {
+       AddTorrentOpts
        // The tiered tracker URIs.
        Trackers [][]string
-       // TODO: Move into a "new" Torrent opt type.
-       InfoHash   metainfo.Hash
-       InfoHashV2 g.Option[infohash_v2.T]
-       InfoBytes  []byte
        // The name to use if the Name field from the Info isn't available.
        DisplayName string
        // WebSeed URLs. For additional options add the URLs separately with Torrent.AddWebSeeds
@@ -33,15 +28,6 @@ type TorrentSpec struct {
        // BEP 52 "piece layers" from metainfo
        PieceLayers map[string]string
 
-       // The chunk size to use for outbound requests. Defaults to 16KiB if not set. Can only be set
-       // for new Torrents. TODO: Move into a "new" Torrent opt type.
-       ChunkSize pp.Integer
-       // TODO: Move into a "new" Torrent opt type.
-       Storage storage.ClientImpl
-
-       // TODO: This should be on AddTorrentOpt too?
-       DisableInitialPieceCheck bool
-
        // Whether to allow data download or upload
        DisallowDataUpload   bool
        DisallowDataDownload bool
@@ -55,13 +41,13 @@ func TorrentSpecFromMagnetUri(uri string) (spec *TorrentSpec, err error) {
        spec = &TorrentSpec{
                Trackers:    [][]string{m.Trackers},
                DisplayName: m.DisplayName,
-               InfoHash:    m.InfoHash.UnwrapOrZeroValue(),
-               InfoHashV2:  m.V2InfoHash,
                Webseeds:    m.Params["ws"],
                Sources:     append(m.Params["xs"], m.Params["as"]...),
                PeerAddrs:   m.Params["x.pe"], // BEP 9
                // TODO: What's the parameter for DHT nodes?
        }
+       spec.InfoHash = m.InfoHash.UnwrapOrZeroValue()
+       spec.InfoHashV2 = m.V2InfoHash
        return
 }
 
@@ -83,10 +69,7 @@ func TorrentSpecFromMetaInfoErr(mi *metainfo.MetaInfo) (*TorrentSpec, error) {
 
        return &TorrentSpec{
                Trackers:    mi.UpvertedAnnounceList(),
-               InfoHash:    v1Ih,
-               InfoHashV2:  v2Infohash,
                PieceLayers: mi.PieceLayers,
-               InfoBytes:   mi.InfoBytes,
                DisplayName: info.BestName(),
                Webseeds:    mi.UrlList,
                DhtNodes: func() (ret []string) {
@@ -96,6 +79,11 @@ func TorrentSpecFromMetaInfoErr(mi *metainfo.MetaInfo) (*TorrentSpec, error) {
                        }
                        return
                }(),
+               AddTorrentOpts: AddTorrentOpts{
+                       InfoHash:   v1Ih,
+                       InfoHashV2: v2Infohash,
+                       InfoBytes:  mi.InfoBytes,
+               },
        }, err
 }
 
index b25e360d3abbd2319369e5a06cac6b4be916bd39..9aadfc8a4988fc3e2c6d66111f71ca28a5207297 100644 (file)
@@ -69,7 +69,7 @@ type Completion struct {
        Err error
        // The state is known or cached.
        Ok bool
-       // If Ok, whether the data is correct.
+       // If Ok, whether the data is correct. TODO: Check all callsites test Ok first.
        Complete bool
 }
 
index 3d1025e3d480086e4fe8a142ccdd02b5decc43c6..04bb7d2862d120ee0e237de9b306f5599a2d5cda 100644 (file)
@@ -66,12 +66,11 @@ func testReceiveChunkStorageFailure(t *testing.T, seederFast bool) {
                data:   make([]byte, info.TotalLength()),
        }
        defer leecherStorage.Close()
-       leecherTorrent, new, err := leecherClient.AddTorrentSpec(&torrent.TorrentSpec{
+       leecherTorrent, new := leecherClient.AddTorrentOpt(torrent.AddTorrentOpts{
                InfoHash: metainfo.HashInfoBytes(),
                Storage:  &leecherStorage,
        })
        leecherStorage.t = leecherTorrent
-       require.NoError(t, err)
        assert.True(t, new)
        seederTorrent, err := seederClient.AddTorrent(metainfo)
        require.NoError(t, err)
@@ -177,7 +176,8 @@ func (me pieceImpl) MarkComplete() error {
 }
 
 func (me pieceImpl) MarkNotComplete() error {
-       panic("implement me")
+       me.state().complete = false
+       return nil
 }
 
 func (me pieceImpl) Completion() storage.Completion {
index 437499d44197818a30a3b3e934f7e7d7c0e20f5c..fc3bd538456b7fb0a31f701ab3be071469dda6f2 100644 (file)
@@ -7,6 +7,7 @@
 package test
 
 import (
+       "errors"
        "net"
        "net/http"
        "testing"
@@ -53,16 +54,16 @@ func TestSqliteStorageClosed(t *testing.T) {
        defer s.Close()
        go func() {
                err := s.Serve(l)
-               if err != http.ErrServerClosed {
+               if !errors.Is(err, http.ErrServerClosed) {
                        panic(err)
                }
        }()
        // Close storage prematurely.
        storage.Close()
-       tor, _, err := cl.AddTorrentSpec(&torrent.TorrentSpec{
+       tor, _ := cl.AddTorrentOpt(torrent.AddTorrentOpts{
                InfoHash: mi.HashInfoBytes(),
-               Sources:  []string{"http://" + l.Addr().String()},
        })
+       tor.UseSources([]string{"http://" + l.Addr().String()})
        c.Assert(err, qt.IsNil)
        <-tor.GotInfo()
 }
index e470fd881a8117003327360bf153ccd0fd54f73b..5793aa460bb7246b81060fc0324b176e54e0f353 100644 (file)
@@ -163,9 +163,8 @@ type Torrent struct {
        // A cache of completed piece indices.
        _completedPieces roaring.Bitmap
        // Pieces that need to be hashed.
-       piecesQueuedForHash       bitmap.Bitmap
-       activePieceHashes         int
-       initialPieceCheckDisabled bool
+       piecesQueuedForHash bitmap.Bitmap
+       activePieceHashes   int
 
        connsWithAllPieces map[*Peer]struct{}
 
@@ -191,6 +190,10 @@ type Torrent struct {
 
        // Disable actions after updating piece priorities, for benchmarking.
        disableTriggers bool
+       // See AddTorrentOpts.DisableInitialPieceCheck
+       initialPieceCheckDisabled bool
+       // See AddTorrentOpts.IgnoreUnverifiedPieceCompletion
+       ignoreUnverifiedPieceCompletion bool
 }
 
 type torrentTrackerAnnouncerKey struct {
@@ -305,11 +308,15 @@ func (t *Torrent) pieceComplete(piece pieceIndex) bool {
        return t._completedPieces.Contains(bitmap.BitIndex(piece))
 }
 
-func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
+func (t *Torrent) pieceCompleteUncached(piece pieceIndex) (ret storage.Completion) {
+       p := t.piece(piece)
+       if t.ignoreUnverifiedPieceCompletion && p.numVerifies == 0 {
+               return
+       }
        if t.storage == nil {
                return storage.Completion{Complete: false, Ok: true}
        }
-       return t.pieces[piece].Storage().Completion()
+       return p.Storage().Completion()
 }
 
 func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn {
@@ -525,7 +532,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
        t.nameMu.Unlock()
        t._chunksPerRegularPiece = chunkIndexType(
                (pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
-       t.updateComplete()
+       t.deferUpdateComplete()
        t.displayName = "" // Save a few bytes lol.
        t.initFiles()
        t.cacheLength()
@@ -547,8 +554,8 @@ func (t *Torrent) onSetInfo() {
        MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
        for i := range t.pieces {
                p := &t.pieces[i]
-               // Need to add relativeAvailability before updating piece completion, as that may result in conns
-               // being dropped.
+               // Need to add relativeAvailability before updating piece completion, as that may result in
+               // conns being dropped.
                if p.relativeAvailability != 0 {
                        panic(p.relativeAvailability)
                }
@@ -700,7 +707,7 @@ func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
        ret.Hashing = p.hashing
        ret.Checking = ret.QueuedForHash || ret.Hashing
        ret.Marking = p.marking
-       if !ret.Complete && t.piecePartiallyDownloaded(index) {
+       if ret.Ok && !ret.Complete && t.piecePartiallyDownloaded(index) {
                ret.Partial = true
        }
        if t.info.HasV2() && !p.hashV2.Ok && p.hasPieceLayer() {
@@ -798,7 +805,7 @@ func (psr PieceStateRun) String() (ret string) {
        if psr.Partial {
                ret += "P"
        }
-       if psr.Complete {
+       if psr.Ok && psr.Complete {
                ret += "C"
        }
        if !psr.Ok {
@@ -1671,7 +1678,7 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
        uncached := t.pieceCompleteUncached(piece)
        cached := p.completion()
        changed := cached != uncached
-       complete := uncached.Complete
+       complete := uncached.Ok && uncached.Complete
        p.storageCompletionOk = uncached.Ok
        x := uint32(piece)
        if complete {
@@ -2461,7 +2468,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
        }
 
        // Don't score the first time a piece is hashed, it could be an initial check.
-       if p.storageCompletionOk {
+       if p.numVerifies == 1 {
                if passed {
                        pieceHashedCorrect.Add(1)
                } else {
@@ -2624,6 +2631,7 @@ func (t *Torrent) tryCreatePieceHasher() bool {
        }
        p := t.piece(pi)
        t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
+       t.deferUpdateComplete()
        p.hashing = true
        t.publishPieceStateChange(pi)
        t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher")
@@ -2709,26 +2717,39 @@ func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
        }
 }
 
+// Queue a check if one hasn't occurred before for the piece, and the completion state is unknown.
 func (t *Torrent) queueInitialPieceCheck(i pieceIndex) {
-       if !t.initialPieceCheckDisabled && !t.piece(i).storageCompletionOk {
-               t.queuePieceCheck(i)
+       if t.initialPieceCheckDisabled {
+               return
+       }
+       p := t.piece(i)
+       if p.numVerifies != 0 {
+               return
+       }
+       // If a hash is already occurring we've satisfied the initial piece check condition.
+       if p.hashing {
+               return
+       }
+       if p.storageCompletionOk {
+               return
        }
+       // Should only get closed or missing hash errors here which are ok.
+       _, _ = t.queuePieceCheck(i)
 }
 
 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) (targetVerifies pieceVerifyCount, err error) {
        piece := t.piece(pieceIndex)
        if !piece.haveHash() {
+               // Should we just queue the hash anyway?
                err = errors.New("piece hash unknown")
+               return
        }
-       targetVerifies = piece.numVerifies + 1
-       if piece.hashing {
-               // The result of this queued piece check will be the one after the current one.
-               targetVerifies++
-       }
+       targetVerifies = piece.nextNovelHashCount()
        if piece.queuedForHash() {
                return
        }
        t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
+       t.deferUpdateComplete()
        t.publishPieceStateChange(pieceIndex)
        t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
        err = t.tryCreateMorePieceHashers()
@@ -3048,6 +3069,11 @@ func (t *Torrent) pieceRequestIndexBegin(piece pieceIndex) RequestIndex {
        return RequestIndex(piece) * t.chunksPerRegularPiece()
 }
 
+// Run complete validation when lock is released.
+func (t *Torrent) deferUpdateComplete() {
+       t.cl._mu.DeferOnce(t.updateComplete)
+}
+
 func (t *Torrent) updateComplete() {
        // TODO: Announce complete to trackers?
        t.complete.SetBool(t.haveAllPieces())
index 638428c5ee6f61fae9327174124ad88523f01ab8..6a3d7ca1f395971d096ea8c93c9fd5c253f47fcf 100644 (file)
@@ -152,7 +152,9 @@ func TestPieceHashFailed(t *testing.T) {
        cl := newTestingClient(t)
        tt := cl.newTorrent(mi.HashInfoBytes(), badStorage{})
        tt.setChunkSize(2)
+       tt.cl.lock()
        require.NoError(t, tt.setInfoBytesLocked(mi.InfoBytes))
+       tt.cl.unlock()
        tt.cl.lock()
        tt.dirtyChunks.AddRange(
                uint64(tt.pieceRequestIndexBegin(1)),
index d20b1945044cf734980e43cb117bd2a5621e3fa9..ccb2fee364b8eb3a0d5039663f8b3dcffef518f2 100644 (file)
@@ -13,8 +13,8 @@ import (
        "time"
 
        "github.com/RoaringBitmap/roaring"
-
        g "github.com/anacrolix/generics"
+
        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/webseed"
index ca59b6115358d7ed649418de4b955c08f66af76a..fb5c824e0d4b720f1fb183c504f9d48c678f1af5 100644 (file)
@@ -11,7 +11,6 @@ import (
        "strings"
 
        "github.com/RoaringBitmap/roaring"
-
        "github.com/anacrolix/missinggo/v2/panicif"
 
        "github.com/anacrolix/torrent/metainfo"