"github.com/anacrolix/chansync/events"
"github.com/anacrolix/dht/v2"
"github.com/anacrolix/dht/v2/krpc"
- "github.com/cespare/xxhash"
- "github.com/davecgh/go-spew/spew"
- "github.com/dustin/go-humanize"
- gbtree "github.com/google/btree"
- "github.com/pion/webrtc/v4"
-
- "github.com/anacrolix/log"
-
. "github.com/anacrolix/generics"
g "github.com/anacrolix/generics"
+ "github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/pproffd"
"github.com/anacrolix/sync"
+ "github.com/cespare/xxhash"
+ "github.com/davecgh/go-spew/spew"
+ "github.com/dustin/go-humanize"
+ gbtree "github.com/google/btree"
+ "github.com/pion/webrtc/v4"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/internal/check"
L: cl.locker(),
},
gotMetainfoC: make(chan struct{}),
+
+ ignoreUnverifiedPieceCompletion: opts.IgnoreUnverifiedPieceCompletion,
+ initialPieceCheckDisabled: opts.DisableInitialPieceCheck,
}
g.MakeMap(&t.webSeeds)
t.closedCtx, t.closedCtxCancel = context.WithCancel(context.Background())
Storage storage.ClientImpl
ChunkSize pp.Integer
InfoBytes []byte
+ // Don't hash data if piece completion is missing. This is useful for very large torrents that
+ // are dropped in place from an external source and trigger a lot of initial piece checks.
+ DisableInitialPieceCheck bool
+ // Require pieces to be checked as soon as info is available. This is because we have no way to
+ // schedule an initial check only, and don't want to race against use of Torrent.Complete.
+ IgnoreUnverifiedPieceCompletion bool
}
// Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
// Torrent.MergeSpec.
func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
- t, new = cl.AddTorrentOpt(AddTorrentOpts{
- InfoHash: spec.InfoHash,
- InfoHashV2: spec.InfoHashV2,
- Storage: spec.Storage,
- ChunkSize: spec.ChunkSize,
- })
+ t, new = cl.AddTorrentOpt(spec.AddTorrentOpts)
modSpec := *spec
if new {
// ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
cl := t.cl
cl.AddDhtNodes(spec.DhtNodes)
t.UseSources(spec.Sources)
+ // TODO: The lock should be moved earlier.
cl.lock()
defer cl.unlock()
- t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
for _, url := range spec.Webseeds {
t.addWebSeed(url)
}
require.NoError(t, err)
defer cl.Close()
for i := range 1000 {
- var spec TorrentSpec
- binary.PutVarint(spec.InfoHash[:], int64(i+1))
- tt, new, err := cl.AddTorrentSpec(&spec)
- assert.NoError(t, err)
+ var opts AddTorrentOpts
+ binary.PutVarint(opts.InfoHash[:], int64(i+1))
+ tt, new := cl.AddTorrentOpt(opts)
assert.True(t, new)
defer tt.Drop()
}
}
b, err := bencode.Marshal(info)
require.NoError(t, err)
- tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
+ tt, new := cl.AddTorrentOpt(AddTorrentOpts{
InfoBytes: b,
InfoHash: metainfo.HashBytes(b),
})
- require.NoError(t, err)
defer tt.Drop()
assert.True(t, new)
r := tt.NewReader()
defer cl.Close()
dir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(dir)
- tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
+ tt, new := cl.AddTorrentOpt(AddTorrentOpts{
InfoHash: mi.HashInfoBytes(),
})
- require.NoError(t, err)
require.True(t, new)
require.Nil(t, tt.Info())
_, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
os.RemoveAll(dir)
cl, _ := NewClient(TestingConfig(t))
defer cl.Close()
- tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{
+ tt, _ := cl.AddTorrentOpt(AddTorrentOpts{
InfoHash: mi.HashInfoBytes(),
})
tt.Drop()
}
infoBytes, err := bencode.Marshal(info)
require.NoError(t, err)
- tt, _new, err := cl.AddTorrentSpec(&TorrentSpec{
+ tt, _new := cl.AddTorrentOpt(AddTorrentOpts{
InfoBytes: infoBytes,
InfoHash: metainfo.HashBytes(infoBytes),
Storage: badStorage{},
})
- require.NoError(t, err)
assert.True(t, _new)
defer tt.Drop()
cn := &PeerConn{Peer: Peer{
seeder, err := NewClient(TestingConfig(t))
require.NoError(t, err)
defer seeder.Close()
- _, new, err := seeder.AddTorrentSpec(&TorrentSpec{
+ _, new := seeder.AddTorrentOpt(AddTorrentOpts{
InfoBytes: greetingMetainfo.InfoBytes,
InfoHash: greetingMetainfo.HashInfoBytes(),
})
TorrentDirMaker: nil,
PieceCompletion: pc,
}),
- })
- defer to.Drop()
- err = to.MergeSpec(&torrent.TorrentSpec{
InfoBytes: mi.InfoBytes,
- Trackers: [][]string{{
- `wss://tracker.btorrent.xyz`,
- `wss://tracker.openwebtorrent.com`,
- "http://p4p.arenabg.com:1337/announce",
- "udp://tracker.opentrackr.org:1337/announce",
- "udp://tracker.openbittorrent.com:6969/announce",
- }},
})
- if err != nil {
- return fmt.Errorf("setting trackers: %w", err)
- }
+ defer to.Drop()
+ // TODO: Builtin trackers?
+ to.AddTrackers([][]string{{
+ `wss://tracker.btorrent.xyz`,
+ `wss://tracker.openwebtorrent.com`,
+ "http://p4p.arenabg.com:1337/announce",
+ "udp://tracker.opentrackr.org:1337/announce",
+ "udp://tracker.openbittorrent.com:6969/announce",
+ }})
fmt.Printf("%v: %v\n", to, to.Metainfo().Magnet(&ih, &info))
}
select {}
import (
"fmt"
+ "reflect"
+ g "github.com/anacrolix/generics"
"github.com/anacrolix/sync"
)
type lockWithDeferreds struct {
internal sync.RWMutex
unlockActions []func()
+ m map[uintptr]struct{}
}
func (me *lockWithDeferreds) Lock() {
panic(fmt.Sprintf("num deferred changed while running: %v -> %v", startLen, len(me.unlockActions)))
}
me.unlockActions = me.unlockActions[:0]
+ clear(me.unlockActions)
}
func (me *lockWithDeferreds) RLock() {
func (me *lockWithDeferreds) Defer(action func()) {
me.unlockActions = append(me.unlockActions, action)
}
+
+func (me *lockWithDeferreds) DeferOnce(action func()) {
+ g.MakeMapIfNil(&me.m)
+ key := reflect.ValueOf(action).Pointer()
+ if g.MapContains(me.m, key) {
+ return
+ }
+ me.m[key] = struct{}{}
+ me.Defer(action)
+}
package torrent
import (
- "github.com/RoaringBitmap/roaring"
+ "iter"
+ "github.com/RoaringBitmap/roaring"
g "github.com/anacrolix/generics"
"github.com/anacrolix/missinggo/v2/bitmap"
func (f *File) numPieces() int {
return f.EndPieceIndex() - f.BeginPieceIndex()
}
+
+func (f *File) PieceIndices() iter.Seq[int] {
+ return func(yield func(int) bool) {
+ for i := f.BeginPieceIndex(); i < f.EndPieceIndex(); i++ {
+ if !yield(i) {
+ break
+ }
+ }
+ }
+}
+
+func (f *File) Pieces() iter.Seq[*Piece] {
+ return func(yield func(*Piece) bool) {
+ for i := range f.PieceIndices() {
+ p := f.t.piece(i)
+ if !yield(p) {
+ break
+ }
+ }
+ }
+}
td, mi := testutil.GreetingTestTorrent()
mms := storage.NewMMap(td)
defer mms.Close()
- tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
+ tt, new := cl.AddTorrentOpt(AddTorrentOpts{
Storage: mms,
InfoHash: mi.HashInfoBytes(),
InfoBytes: mi.InfoBytes,
})
- require.NoError(t, err)
assert.True(t, new)
r := tt.NewReader()
"github.com/RoaringBitmap/roaring"
"github.com/anacrolix/chansync"
+ . "github.com/anacrolix/generics"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/iter"
- "github.com/anacrolix/multiless"
-
- . "github.com/anacrolix/generics"
"github.com/anacrolix/missinggo/v2/bitmap"
+ "github.com/anacrolix/multiless"
"github.com/anacrolix/torrent/internal/alloclim"
"github.com/anacrolix/torrent/mse"
numVerifies pieceVerifyCount
numVerifiesCond chansync.BroadcastCond
- hashing bool
- // The piece state may have changed, and is being synchronized with storage.
- marking bool
- storageCompletionOk bool
publicPieceState PieceState
priority PiecePriority
// Connections that have written data to this piece since its last check.
// This can include connections that have closed.
dirtiers map[*Peer]struct{}
+
+ // Currently being hashed.
+ hashing bool
+ // The piece state may have changed, and is being synchronized with storage.
+ marking bool
+ // The Completion.Ok field from the storage layer.
+ storageCompletionOk bool
}
func (p *Piece) String() string {
}
func (p *Piece) completion() (ret storage.Completion) {
- ret.Complete = p.t.pieceComplete(p.index)
ret.Ok = p.storageCompletionOk
+ if ret.Ok {
+ ret.Complete = p.t.pieceComplete(p.index)
+ }
return
}
return p.numFiles() == 1 && p.mustGetOnlyFile().length > p.t.info.PieceLength
}
+// TODO: This looks inefficient. It will rehash everytime it is called. The hashes should be
+// generated once.
func (p *Piece) obtainHashV2() (hash [32]byte, err error) {
if p.hashV2.Ok {
hash = p.hashV2.Value
return
}
storage := p.Storage()
- if !storage.Completion().Complete {
+ if c := storage.Completion(); c.Ok && !c.Complete {
err = errors.New("piece incomplete")
return
}
}
return
}
+
+// The value of numVerifies after the next hashing operation that hasn't yet begun.
+func (p *Piece) nextNovelHashCount() (ret pieceVerifyCount) {
+ ret = p.numVerifies + 1
+ if p.hashing {
+ // The next novel hash will be the one after the current one.
+ ret++
+ }
+ return
+}
"unsafe"
"github.com/RoaringBitmap/roaring"
- "github.com/anacrolix/log"
- "github.com/anacrolix/multiless"
-
g "github.com/anacrolix/generics"
"github.com/anacrolix/generics/heap"
+ "github.com/anacrolix/log"
+ "github.com/anacrolix/multiless"
"github.com/anacrolix/torrent/metainfo"
requestStrategy "github.com/anacrolix/torrent/request-strategy"
g "github.com/anacrolix/generics"
"github.com/anacrolix/torrent/metainfo"
- pp "github.com/anacrolix/torrent/peer_protocol"
- "github.com/anacrolix/torrent/storage"
infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
)
// constructor functions for magnet URIs and torrent metainfo files. TODO: This type should be
// dismantled into a new Torrent option type, and separate Torrent mutate method(s).
type TorrentSpec struct {
+ AddTorrentOpts
// The tiered tracker URIs.
Trackers [][]string
- // TODO: Move into a "new" Torrent opt type.
- InfoHash metainfo.Hash
- InfoHashV2 g.Option[infohash_v2.T]
- InfoBytes []byte
// The name to use if the Name field from the Info isn't available.
DisplayName string
// WebSeed URLs. For additional options add the URLs separately with Torrent.AddWebSeeds
// BEP 52 "piece layers" from metainfo
PieceLayers map[string]string
- // The chunk size to use for outbound requests. Defaults to 16KiB if not set. Can only be set
- // for new Torrents. TODO: Move into a "new" Torrent opt type.
- ChunkSize pp.Integer
- // TODO: Move into a "new" Torrent opt type.
- Storage storage.ClientImpl
-
- // TODO: This should be on AddTorrentOpt too?
- DisableInitialPieceCheck bool
-
// Whether to allow data download or upload
DisallowDataUpload bool
DisallowDataDownload bool
spec = &TorrentSpec{
Trackers: [][]string{m.Trackers},
DisplayName: m.DisplayName,
- InfoHash: m.InfoHash.UnwrapOrZeroValue(),
- InfoHashV2: m.V2InfoHash,
Webseeds: m.Params["ws"],
Sources: append(m.Params["xs"], m.Params["as"]...),
PeerAddrs: m.Params["x.pe"], // BEP 9
// TODO: What's the parameter for DHT nodes?
}
+ spec.InfoHash = m.InfoHash.UnwrapOrZeroValue()
+ spec.InfoHashV2 = m.V2InfoHash
return
}
return &TorrentSpec{
Trackers: mi.UpvertedAnnounceList(),
- InfoHash: v1Ih,
- InfoHashV2: v2Infohash,
PieceLayers: mi.PieceLayers,
- InfoBytes: mi.InfoBytes,
DisplayName: info.BestName(),
Webseeds: mi.UrlList,
DhtNodes: func() (ret []string) {
}
return
}(),
+ AddTorrentOpts: AddTorrentOpts{
+ InfoHash: v1Ih,
+ InfoHashV2: v2Infohash,
+ InfoBytes: mi.InfoBytes,
+ },
}, err
}
Err error
// The state is known or cached.
Ok bool
- // If Ok, whether the data is correct.
+ // If Ok, whether the data is correct. TODO: Check all callsites test Ok first.
Complete bool
}
data: make([]byte, info.TotalLength()),
}
defer leecherStorage.Close()
- leecherTorrent, new, err := leecherClient.AddTorrentSpec(&torrent.TorrentSpec{
+ leecherTorrent, new := leecherClient.AddTorrentOpt(torrent.AddTorrentOpts{
InfoHash: metainfo.HashInfoBytes(),
Storage: &leecherStorage,
})
leecherStorage.t = leecherTorrent
- require.NoError(t, err)
assert.True(t, new)
seederTorrent, err := seederClient.AddTorrent(metainfo)
require.NoError(t, err)
}
func (me pieceImpl) MarkNotComplete() error {
- panic("implement me")
+ me.state().complete = false
+ return nil
}
func (me pieceImpl) Completion() storage.Completion {
package test
import (
+ "errors"
"net"
"net/http"
"testing"
defer s.Close()
go func() {
err := s.Serve(l)
- if err != http.ErrServerClosed {
+ if !errors.Is(err, http.ErrServerClosed) {
panic(err)
}
}()
// Close storage prematurely.
storage.Close()
- tor, _, err := cl.AddTorrentSpec(&torrent.TorrentSpec{
+ tor, _ := cl.AddTorrentOpt(torrent.AddTorrentOpts{
InfoHash: mi.HashInfoBytes(),
- Sources: []string{"http://" + l.Addr().String()},
})
+ tor.UseSources([]string{"http://" + l.Addr().String()})
c.Assert(err, qt.IsNil)
<-tor.GotInfo()
}
// A cache of completed piece indices.
_completedPieces roaring.Bitmap
// Pieces that need to be hashed.
- piecesQueuedForHash bitmap.Bitmap
- activePieceHashes int
- initialPieceCheckDisabled bool
+ piecesQueuedForHash bitmap.Bitmap
+ activePieceHashes int
connsWithAllPieces map[*Peer]struct{}
// Disable actions after updating piece priorities, for benchmarking.
disableTriggers bool
+ // See AddTorrentOpts.DisableInitialPieceCheck
+ initialPieceCheckDisabled bool
+ // See AddTorrentOpts.IgnoreUnverifiedPieceCompletion
+ ignoreUnverifiedPieceCompletion bool
}
type torrentTrackerAnnouncerKey struct {
return t._completedPieces.Contains(bitmap.BitIndex(piece))
}
-func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
+func (t *Torrent) pieceCompleteUncached(piece pieceIndex) (ret storage.Completion) {
+ p := t.piece(piece)
+ if t.ignoreUnverifiedPieceCompletion && p.numVerifies == 0 {
+ return
+ }
if t.storage == nil {
return storage.Completion{Complete: false, Ok: true}
}
- return t.pieces[piece].Storage().Completion()
+ return p.Storage().Completion()
}
func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn {
t.nameMu.Unlock()
t._chunksPerRegularPiece = chunkIndexType(
(pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
- t.updateComplete()
+ t.deferUpdateComplete()
t.displayName = "" // Save a few bytes lol.
t.initFiles()
t.cacheLength()
MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
for i := range t.pieces {
p := &t.pieces[i]
- // Need to add relativeAvailability before updating piece completion, as that may result in conns
- // being dropped.
+ // Need to add relativeAvailability before updating piece completion, as that may result in
+ // conns being dropped.
if p.relativeAvailability != 0 {
panic(p.relativeAvailability)
}
ret.Hashing = p.hashing
ret.Checking = ret.QueuedForHash || ret.Hashing
ret.Marking = p.marking
- if !ret.Complete && t.piecePartiallyDownloaded(index) {
+ if ret.Ok && !ret.Complete && t.piecePartiallyDownloaded(index) {
ret.Partial = true
}
if t.info.HasV2() && !p.hashV2.Ok && p.hasPieceLayer() {
if psr.Partial {
ret += "P"
}
- if psr.Complete {
+ if psr.Ok && psr.Complete {
ret += "C"
}
if !psr.Ok {
uncached := t.pieceCompleteUncached(piece)
cached := p.completion()
changed := cached != uncached
- complete := uncached.Complete
+ complete := uncached.Ok && uncached.Complete
p.storageCompletionOk = uncached.Ok
x := uint32(piece)
if complete {
}
// Don't score the first time a piece is hashed, it could be an initial check.
- if p.storageCompletionOk {
+ if p.numVerifies == 1 {
if passed {
pieceHashedCorrect.Add(1)
} else {
}
p := t.piece(pi)
t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
+ t.deferUpdateComplete()
p.hashing = true
t.publishPieceStateChange(pi)
t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher")
}
}
+// Queue a check if one hasn't occurred before for the piece, and the completion state is unknown.
func (t *Torrent) queueInitialPieceCheck(i pieceIndex) {
- if !t.initialPieceCheckDisabled && !t.piece(i).storageCompletionOk {
- t.queuePieceCheck(i)
+ if t.initialPieceCheckDisabled {
+ return
+ }
+ p := t.piece(i)
+ if p.numVerifies != 0 {
+ return
+ }
+ // If a hash is already occurring we've satisfied the initial piece check condition.
+ if p.hashing {
+ return
+ }
+ if p.storageCompletionOk {
+ return
}
+ // Should only get closed or missing hash errors here which are ok.
+ _, _ = t.queuePieceCheck(i)
}
func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) (targetVerifies pieceVerifyCount, err error) {
piece := t.piece(pieceIndex)
if !piece.haveHash() {
+ // Should we just queue the hash anyway?
err = errors.New("piece hash unknown")
+ return
}
- targetVerifies = piece.numVerifies + 1
- if piece.hashing {
- // The result of this queued piece check will be the one after the current one.
- targetVerifies++
- }
+ targetVerifies = piece.nextNovelHashCount()
if piece.queuedForHash() {
return
}
t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
+ t.deferUpdateComplete()
t.publishPieceStateChange(pieceIndex)
t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
err = t.tryCreateMorePieceHashers()
return RequestIndex(piece) * t.chunksPerRegularPiece()
}
+// Run complete validation when lock is released.
+func (t *Torrent) deferUpdateComplete() {
+ t.cl._mu.DeferOnce(t.updateComplete)
+}
+
func (t *Torrent) updateComplete() {
// TODO: Announce complete to trackers?
t.complete.SetBool(t.haveAllPieces())
cl := newTestingClient(t)
tt := cl.newTorrent(mi.HashInfoBytes(), badStorage{})
tt.setChunkSize(2)
+ tt.cl.lock()
require.NoError(t, tt.setInfoBytesLocked(mi.InfoBytes))
+ tt.cl.unlock()
tt.cl.lock()
tt.dirtyChunks.AddRange(
uint64(tt.pieceRequestIndexBegin(1)),
"time"
"github.com/RoaringBitmap/roaring"
-
g "github.com/anacrolix/generics"
+
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/webseed"
"strings"
"github.com/RoaringBitmap/roaring"
-
"github.com/anacrolix/missinggo/v2/panicif"
"github.com/anacrolix/torrent/metainfo"