initialPieceCheckDisabled: opts.DisableInitialPieceCheck,
}
g.MakeMap(&t.webSeeds)
- t.closedCtx, t.closedCtxCancel = context.WithCancel(context.Background())
+ t.closedCtx, t.closedCtxCancel = context.WithCancelCause(context.Background())
+ t.getInfoCtx, t.getInfoCtxCancel = context.WithCancelCause(t.closedCtx)
var salt [8]byte
rand.Read(salt[:])
t.smartBanCache.Hash = func(b []byte) uint64 {
}
cl := t.cl
cl.AddDhtNodes(spec.DhtNodes)
- t.UseSources(spec.Sources)
+ t.AddSources(spec.Sources)
// TODO: The lock should be moved earlier.
cl.lock()
defer cl.unlock()
import (
"context"
- "errors"
"fmt"
+ "math/rand/v2"
"net/http"
+ "time"
- "github.com/anacrolix/log"
+ g "github.com/anacrolix/generics"
+ "github.com/anacrolix/missinggo/v2/panicif"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/metainfo"
// Add HTTP endpoints that serve the metainfo. They will be used if the torrent info isn't obtained
// yet. The Client HTTP client is used.
-func (t *Torrent) UseSources(sources []string) {
- select {
- case <-t.Closed():
- return
- case <-t.GotInfo():
- return
- default:
- }
+func (t *Torrent) AddSources(sources []string) {
for _, s := range sources {
- _, loaded := t.activeSources.LoadOrStore(s, struct{}{})
+ _, loaded := t.activeSources.LoadOrStore(s, nil)
if loaded {
continue
}
- go func() {
- err := t.useActiveTorrentSource(s)
- _, loaded := t.activeSources.LoadAndDelete(s)
- if !loaded {
- panic(s)
- }
- level := log.Debug
- if err != nil && !errors.Is(err, context.Canceled) {
- level = log.Warning
- }
- t.logger.Levelf(level, "used torrent source %q [err=%v]", s, err)
- }()
+ go t.sourcer(s)
}
}
-func (t *Torrent) useActiveTorrentSource(source string) error {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- go func() {
+// Tries fetching metainfo from a (HTTP) source until no longer necessary.
+func (t *Torrent) sourcer(source string) {
+ var err error
+ defer func() {
+ panicif.False(t.activeSources.CompareAndSwap(source, nil, err))
+ }()
+ ctx := t.getInfoCtx
+ for {
+ var retry g.Option[time.Duration]
+ retry, err = t.trySource(source)
+ if err == nil || ctx.Err() != nil || !retry.Ok {
+ return
+ }
+ t.slogger().Warn("error using torrent source", "source", source, "err", err)
select {
- case <-t.GotInfo():
- case <-t.Closed():
+ case <-time.After(retry.Value):
case <-ctx.Done():
}
- cancel()
- }()
- mi, err := getTorrentSource(ctx, source, t.cl.httpClient)
+ }
+}
+
+// If retry is None, take the error you get as final.
+func (t *Torrent) trySource(source string) (retry g.Option[time.Duration], err error) {
+ t.sourceMutex.Lock()
+ defer t.sourceMutex.Unlock()
+ ctx := t.getInfoCtx
+ if ctx.Err() != nil {
+ return
+ }
+ var mi metainfo.MetaInfo
+ mi, err = getTorrentSource(ctx, source, t.cl.httpClient)
+ if ctx.Err() != nil {
+ return
+ }
if err != nil {
- return err
+ retry.Set(time.Duration(rand.Int64N(int64(time.Minute))))
+ return
}
- return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
+ err = t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
+ return
}
func getTorrentSource(ctx context.Context, source string, hc *http.Client) (mi metainfo.MetaInfo, err error) {
// A background Context cancelled when the Torrent is closed. Added to minimize extra goroutines
// in tracker handlers.
closedCtx context.Context
- closedCtxCancel func()
+ closedCtxCancel context.CancelCauseFunc
onClose []func()
infoHash g.Option[metainfo.Hash]
metainfo metainfo.MetaInfo
// The info dict. nil if we don't have it (yet).
- info *metainfo.Info
- files *[]*File
+ info *metainfo.Info
+ // For scoping routines that depend on needing the info. Saves spinning up lots of helper
+ // routines. Cancelled when the Torrent is Closed too.
+ getInfoCtx context.Context
+ // Put a nice reason in :)
+ getInfoCtxCancel context.CancelCauseFunc
+ files *[]*File
_chunksPerRegularPiece chunkIndexType
// Is On when all pieces are complete.
complete chansync.Flag
- // Torrent sources in use keyed by the source string.
+ // Torrent sources in use keyed by the source string. string -> error. If the slot is occupied
+ // there's a worker for it.
activeSources sync.Map
+ // One source fetch at a time. We use mutex in the original definition.
+ sourceMutex sync.Mutex
smartBanCache smartBanCache
}
t.nameMu.Lock()
t.info = info
+ t.getInfoCtxCancel(errors.New("got info"))
t.nameMu.Unlock()
t._chunksPerRegularPiece = chunkIndexType(
(pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
err = errors.New("already closed")
return
}
- t.closedCtxCancel()
+ t.closedCtxCancel(errTorrentClosed)
+ t.getInfoCtxCancel(errTorrentClosed)
for _, f := range t.onClose {
f()
}
err := t.setInfoBytesLocked(t.metadataBytes)
if err != nil {
t.invalidateMetadata()
- return fmt.Errorf("error setting info bytes: %s", err)
+ return fmt.Errorf("error setting info bytes: %w", err)
}
if t.cl.config.Debug {
t.logger.Printf("%s: got metadata from peers", t)