From 64ad40ae781c41ce7be8b9d4f225ab590dd8b15a Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 28 May 2025 00:27:44 +1000 Subject: [PATCH] Fix "sources" concurrency --- client.go | 5 +-- deprecated.go | 7 +++++ sources.go | 76 +++++++++++++++++++++++++-------------------- test/sqlite_test.go | 2 +- torrent.go | 22 +++++++++---- 5 files changed, 69 insertions(+), 43 deletions(-) create mode 100644 deprecated.go diff --git a/client.go b/client.go index 574a1401..3bd5fa48 100644 --- a/client.go +++ b/client.go @@ -1410,7 +1410,8 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) { initialPieceCheckDisabled: opts.DisableInitialPieceCheck, } g.MakeMap(&t.webSeeds) - t.closedCtx, t.closedCtxCancel = context.WithCancel(context.Background()) + t.closedCtx, t.closedCtxCancel = context.WithCancelCause(context.Background()) + t.getInfoCtx, t.getInfoCtxCancel = context.WithCancelCause(t.closedCtx) var salt [8]byte rand.Read(salt[:]) t.smartBanCache.Hash = func(b []byte) uint64 { @@ -1554,7 +1555,7 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error { } cl := t.cl cl.AddDhtNodes(spec.DhtNodes) - t.UseSources(spec.Sources) + t.AddSources(spec.Sources) // TODO: The lock should be moved earlier. cl.lock() defer cl.unlock() diff --git a/deprecated.go b/deprecated.go new file mode 100644 index 00000000..6836c598 --- /dev/null +++ b/deprecated.go @@ -0,0 +1,7 @@ +package torrent + +// Deprecated: The names doesn't reflect the actual behaviour. You can only add sources. Use +// AddSources instead. +func (t *Torrent) UseSources(sources []string) { + t.AddSources(sources) +} diff --git a/sources.go b/sources.go index 9aeb8007..18aa6975 100644 --- a/sources.go +++ b/sources.go @@ -2,11 +2,13 @@ package torrent import ( "context" - "errors" "fmt" + "math/rand/v2" "net/http" + "time" - "github.com/anacrolix/log" + g "github.com/anacrolix/generics" + "github.com/anacrolix/missinggo/v2/panicif" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/metainfo" @@ -14,50 +16,56 @@ import ( // Add HTTP endpoints that serve the metainfo. They will be used if the torrent info isn't obtained // yet. The Client HTTP client is used. -func (t *Torrent) UseSources(sources []string) { - select { - case <-t.Closed(): - return - case <-t.GotInfo(): - return - default: - } +func (t *Torrent) AddSources(sources []string) { for _, s := range sources { - _, loaded := t.activeSources.LoadOrStore(s, struct{}{}) + _, loaded := t.activeSources.LoadOrStore(s, nil) if loaded { continue } - go func() { - err := t.useActiveTorrentSource(s) - _, loaded := t.activeSources.LoadAndDelete(s) - if !loaded { - panic(s) - } - level := log.Debug - if err != nil && !errors.Is(err, context.Canceled) { - level = log.Warning - } - t.logger.Levelf(level, "used torrent source %q [err=%v]", s, err) - }() + go t.sourcer(s) } } -func (t *Torrent) useActiveTorrentSource(source string) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go func() { +// Tries fetching metainfo from a (HTTP) source until no longer necessary. +func (t *Torrent) sourcer(source string) { + var err error + defer func() { + panicif.False(t.activeSources.CompareAndSwap(source, nil, err)) + }() + ctx := t.getInfoCtx + for { + var retry g.Option[time.Duration] + retry, err = t.trySource(source) + if err == nil || ctx.Err() != nil || !retry.Ok { + return + } + t.slogger().Warn("error using torrent source", "source", source, "err", err) select { - case <-t.GotInfo(): - case <-t.Closed(): + case <-time.After(retry.Value): case <-ctx.Done(): } - cancel() - }() - mi, err := getTorrentSource(ctx, source, t.cl.httpClient) + } +} + +// If retry is None, take the error you get as final. +func (t *Torrent) trySource(source string) (retry g.Option[time.Duration], err error) { + t.sourceMutex.Lock() + defer t.sourceMutex.Unlock() + ctx := t.getInfoCtx + if ctx.Err() != nil { + return + } + var mi metainfo.MetaInfo + mi, err = getTorrentSource(ctx, source, t.cl.httpClient) + if ctx.Err() != nil { + return + } if err != nil { - return err + retry.Set(time.Duration(rand.Int64N(int64(time.Minute)))) + return } - return t.MergeSpec(TorrentSpecFromMetaInfo(&mi)) + err = t.MergeSpec(TorrentSpecFromMetaInfo(&mi)) + return } func getTorrentSource(ctx context.Context, source string, hc *http.Client) (mi metainfo.MetaInfo, err error) { diff --git a/test/sqlite_test.go b/test/sqlite_test.go index fc3bd538..36da82f8 100644 --- a/test/sqlite_test.go +++ b/test/sqlite_test.go @@ -63,7 +63,7 @@ func TestSqliteStorageClosed(t *testing.T) { tor, _ := cl.AddTorrentOpt(torrent.AddTorrentOpts{ InfoHash: mi.HashInfoBytes(), }) - tor.UseSources([]string{"http://" + l.Addr().String()}) + tor.AddSources([]string{"http://" + l.Addr().String()}) c.Assert(err, qt.IsNil) <-tor.GotInfo() } diff --git a/torrent.go b/torrent.go index 365898f1..a3a484eb 100644 --- a/torrent.go +++ b/torrent.go @@ -77,7 +77,7 @@ type Torrent struct { // A background Context cancelled when the Torrent is closed. Added to minimize extra goroutines // in tracker handlers. closedCtx context.Context - closedCtxCancel func() + closedCtxCancel context.CancelCauseFunc onClose []func() infoHash g.Option[metainfo.Hash] @@ -108,8 +108,13 @@ type Torrent struct { metainfo metainfo.MetaInfo // The info dict. nil if we don't have it (yet). - info *metainfo.Info - files *[]*File + info *metainfo.Info + // For scoping routines that depend on needing the info. Saves spinning up lots of helper + // routines. Cancelled when the Torrent is Closed too. + getInfoCtx context.Context + // Put a nice reason in :) + getInfoCtxCancel context.CancelCauseFunc + files *[]*File _chunksPerRegularPiece chunkIndexType @@ -178,8 +183,11 @@ type Torrent struct { // Is On when all pieces are complete. complete chansync.Flag - // Torrent sources in use keyed by the source string. + // Torrent sources in use keyed by the source string. string -> error. If the slot is occupied + // there's a worker for it. activeSources sync.Map + // One source fetch at a time. We use mutex in the original definition. + sourceMutex sync.Mutex smartBanCache smartBanCache @@ -528,6 +536,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error { } t.nameMu.Lock() t.info = info + t.getInfoCtxCancel(errors.New("got info")) t.nameMu.Unlock() t._chunksPerRegularPiece = chunkIndexType( (pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize) @@ -1046,7 +1055,8 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) { err = errors.New("already closed") return } - t.closedCtxCancel() + t.closedCtxCancel(errTorrentClosed) + t.getInfoCtxCancel(errTorrentClosed) for _, f := range t.onClose { f() } @@ -1725,7 +1735,7 @@ func (t *Torrent) maybeCompleteMetadata() error { err := t.setInfoBytesLocked(t.metadataBytes) if err != nil { t.invalidateMetadata() - return fmt.Errorf("error setting info bytes: %s", err) + return fmt.Errorf("error setting info bytes: %w", err) } if t.cl.config.Debug { t.logger.Printf("%s: got metadata from peers", t) -- 2.51.0