]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Fix "sources" concurrency
authorMatt Joiner <anacrolix@gmail.com>
Tue, 27 May 2025 14:27:44 +0000 (00:27 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 27 May 2025 14:27:44 +0000 (00:27 +1000)
client.go
deprecated.go [new file with mode: 0644]
sources.go
test/sqlite_test.go
torrent.go

index 574a1401ab62c65fd4835cf7033c0740121c043f..3bd5fa489a34bd7f641c073e9e20e3c4161102a7 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1410,7 +1410,8 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
                initialPieceCheckDisabled:       opts.DisableInitialPieceCheck,
        }
        g.MakeMap(&t.webSeeds)
-       t.closedCtx, t.closedCtxCancel = context.WithCancel(context.Background())
+       t.closedCtx, t.closedCtxCancel = context.WithCancelCause(context.Background())
+       t.getInfoCtx, t.getInfoCtxCancel = context.WithCancelCause(t.closedCtx)
        var salt [8]byte
        rand.Read(salt[:])
        t.smartBanCache.Hash = func(b []byte) uint64 {
@@ -1554,7 +1555,7 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
        }
        cl := t.cl
        cl.AddDhtNodes(spec.DhtNodes)
-       t.UseSources(spec.Sources)
+       t.AddSources(spec.Sources)
        // TODO: The lock should be moved earlier.
        cl.lock()
        defer cl.unlock()
diff --git a/deprecated.go b/deprecated.go
new file mode 100644 (file)
index 0000000..6836c59
--- /dev/null
@@ -0,0 +1,7 @@
+package torrent
+
+// Deprecated: The names doesn't reflect the actual behaviour. You can only add sources. Use
+// AddSources instead.
+func (t *Torrent) UseSources(sources []string) {
+       t.AddSources(sources)
+}
index 9aeb80071675dff437cc84e9837d046b26dfbf0a..18aa6975478a0f97183f03ec1af14d3cfa7164fa 100644 (file)
@@ -2,11 +2,13 @@ package torrent
 
 import (
        "context"
-       "errors"
        "fmt"
+       "math/rand/v2"
        "net/http"
+       "time"
 
-       "github.com/anacrolix/log"
+       g "github.com/anacrolix/generics"
+       "github.com/anacrolix/missinggo/v2/panicif"
 
        "github.com/anacrolix/torrent/bencode"
        "github.com/anacrolix/torrent/metainfo"
@@ -14,50 +16,56 @@ import (
 
 // Add HTTP endpoints that serve the metainfo. They will be used if the torrent info isn't obtained
 // yet. The Client HTTP client is used.
-func (t *Torrent) UseSources(sources []string) {
-       select {
-       case <-t.Closed():
-               return
-       case <-t.GotInfo():
-               return
-       default:
-       }
+func (t *Torrent) AddSources(sources []string) {
        for _, s := range sources {
-               _, loaded := t.activeSources.LoadOrStore(s, struct{}{})
+               _, loaded := t.activeSources.LoadOrStore(s, nil)
                if loaded {
                        continue
                }
-               go func() {
-                       err := t.useActiveTorrentSource(s)
-                       _, loaded := t.activeSources.LoadAndDelete(s)
-                       if !loaded {
-                               panic(s)
-                       }
-                       level := log.Debug
-                       if err != nil && !errors.Is(err, context.Canceled) {
-                               level = log.Warning
-                       }
-                       t.logger.Levelf(level, "used torrent source %q [err=%v]", s, err)
-               }()
+               go t.sourcer(s)
        }
 }
 
-func (t *Torrent) useActiveTorrentSource(source string) error {
-       ctx, cancel := context.WithCancel(context.Background())
-       defer cancel()
-       go func() {
+// Tries fetching metainfo from a (HTTP) source until no longer necessary.
+func (t *Torrent) sourcer(source string) {
+       var err error
+       defer func() {
+               panicif.False(t.activeSources.CompareAndSwap(source, nil, err))
+       }()
+       ctx := t.getInfoCtx
+       for {
+               var retry g.Option[time.Duration]
+               retry, err = t.trySource(source)
+               if err == nil || ctx.Err() != nil || !retry.Ok {
+                       return
+               }
+               t.slogger().Warn("error using torrent source", "source", source, "err", err)
                select {
-               case <-t.GotInfo():
-               case <-t.Closed():
+               case <-time.After(retry.Value):
                case <-ctx.Done():
                }
-               cancel()
-       }()
-       mi, err := getTorrentSource(ctx, source, t.cl.httpClient)
+       }
+}
+
+// If retry is None, take the error you get as final.
+func (t *Torrent) trySource(source string) (retry g.Option[time.Duration], err error) {
+       t.sourceMutex.Lock()
+       defer t.sourceMutex.Unlock()
+       ctx := t.getInfoCtx
+       if ctx.Err() != nil {
+               return
+       }
+       var mi metainfo.MetaInfo
+       mi, err = getTorrentSource(ctx, source, t.cl.httpClient)
+       if ctx.Err() != nil {
+               return
+       }
        if err != nil {
-               return err
+               retry.Set(time.Duration(rand.Int64N(int64(time.Minute))))
+               return
        }
-       return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
+       err = t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
+       return
 }
 
 func getTorrentSource(ctx context.Context, source string, hc *http.Client) (mi metainfo.MetaInfo, err error) {
index fc3bd538456b7fb0a31f701ab3be071469dda6f2..36da82f8b8adf9b6782fac91376f4f70bf7f35f0 100644 (file)
@@ -63,7 +63,7 @@ func TestSqliteStorageClosed(t *testing.T) {
        tor, _ := cl.AddTorrentOpt(torrent.AddTorrentOpts{
                InfoHash: mi.HashInfoBytes(),
        })
-       tor.UseSources([]string{"http://" + l.Addr().String()})
+       tor.AddSources([]string{"http://" + l.Addr().String()})
        c.Assert(err, qt.IsNil)
        <-tor.GotInfo()
 }
index 365898f1c11a2cff95f7186c7ea553d1e5717b8c..a3a484eb394fbe4a220f94cf73284317e5c9d5a2 100644 (file)
@@ -77,7 +77,7 @@ type Torrent struct {
        // A background Context cancelled when the Torrent is closed. Added to minimize extra goroutines
        // in tracker handlers.
        closedCtx       context.Context
-       closedCtxCancel func()
+       closedCtxCancel context.CancelCauseFunc
        onClose         []func()
 
        infoHash   g.Option[metainfo.Hash]
@@ -108,8 +108,13 @@ type Torrent struct {
        metainfo metainfo.MetaInfo
 
        // The info dict. nil if we don't have it (yet).
-       info  *metainfo.Info
-       files *[]*File
+       info *metainfo.Info
+       // For scoping routines that depend on needing the info. Saves spinning up lots of helper
+       // routines. Cancelled when the Torrent is Closed too.
+       getInfoCtx context.Context
+       // Put a nice reason in :)
+       getInfoCtxCancel context.CancelCauseFunc
+       files            *[]*File
 
        _chunksPerRegularPiece chunkIndexType
 
@@ -178,8 +183,11 @@ type Torrent struct {
        // Is On when all pieces are complete.
        complete chansync.Flag
 
-       // Torrent sources in use keyed by the source string.
+       // Torrent sources in use keyed by the source string. string -> error. If the slot is occupied
+       // there's a worker for it.
        activeSources sync.Map
+       // One source fetch at a time. We use mutex in the original definition.
+       sourceMutex sync.Mutex
 
        smartBanCache smartBanCache
 
@@ -528,6 +536,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
        }
        t.nameMu.Lock()
        t.info = info
+       t.getInfoCtxCancel(errors.New("got info"))
        t.nameMu.Unlock()
        t._chunksPerRegularPiece = chunkIndexType(
                (pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
@@ -1046,7 +1055,8 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
                err = errors.New("already closed")
                return
        }
-       t.closedCtxCancel()
+       t.closedCtxCancel(errTorrentClosed)
+       t.getInfoCtxCancel(errTorrentClosed)
        for _, f := range t.onClose {
                f()
        }
@@ -1725,7 +1735,7 @@ func (t *Torrent) maybeCompleteMetadata() error {
        err := t.setInfoBytesLocked(t.metadataBytes)
        if err != nil {
                t.invalidateMetadata()
-               return fmt.Errorf("error setting info bytes: %s", err)
+               return fmt.Errorf("error setting info bytes: %w", err)
        }
        if t.cl.config.Debug {
                t.logger.Printf("%s: got metadata from peers", t)