]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Use HTTP proxy config for torrent sources
authorMatt Joiner <anacrolix@gmail.com>
Fri, 25 Feb 2022 07:35:21 +0000 (18:35 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 25 Feb 2022 08:59:16 +0000 (19:59 +1100)
client.go
torrent.go

index 4815d53a8d40917057742d6c1bc3243cc45f8022..07be8f3cd228dafd02be7646b2b8d00cacb933d9 100644 (file)
--- a/client.go
+++ b/client.go
@@ -84,7 +84,7 @@ type Client struct {
        websocketTrackers websocketTrackers
 
        activeAnnounceLimiter limiter.Instance
-       webseedHttpClient     *http.Client
+       httpClient            *http.Client
 }
 
 type ipStr string
@@ -198,9 +198,11 @@ func (cl *Client) init(cfg *ClientConfig) {
        cl.activeAnnounceLimiter.SlotsPerKey = 2
        cl.event.L = cl.locker()
        cl.ipBlockList = cfg.IPBlocklist
-       cl.webseedHttpClient = &http.Client{
+       cl.httpClient = &http.Client{
                Transport: &http.Transport{
-                       Proxy:           cfg.HTTPProxy,
+                       Proxy: cfg.HTTPProxy,
+                       // I think this value was observed from some webseeds. It seems reasonable to extend it
+                       // to other uses of HTTP from the client.
                        MaxConnsPerHost: 10,
                },
        }
@@ -1188,6 +1190,7 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
        }
        t.networkingEnabled.Set()
        t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString())
+       t.sourcesLogger = t.logger.WithNames("sources")
        if opts.ChunkSize == 0 {
                opts.ChunkSize = defaultChunkSize
        }
@@ -1312,7 +1315,7 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
        cl.lock()
        defer cl.unlock()
        t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
-       useTorrentSources(spec.Sources, t)
+       t.useSources(spec.Sources)
        for _, url := range spec.Webseeds {
                t.addWebSeed(url)
        }
@@ -1333,23 +1336,37 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
        return nil
 }
 
-func useTorrentSources(sources []string, t *Torrent) {
-       // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
-       ctx := context.Background()
-       for i := 0; i < len(sources); i += 1 {
-               s := sources[i]
+func (t *Torrent) useSources(sources []string) {
+       select {
+       case <-t.Closed():
+               return
+       case <-t.GotInfo():
+               return
+       default:
+       }
+       for _, s := range sources {
+               _, loaded := t.activeSources.LoadOrStore(s, struct{}{})
+               if loaded {
+                       continue
+               }
+               s := s
                go func() {
-                       if err := useTorrentSource(ctx, s, t); err != nil {
-                               t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
-                       } else {
-                               t.logger.Printf("successfully used source %q", s)
+                       err := t.useActiveTorrentSource(s)
+                       _, loaded := t.activeSources.LoadAndDelete(s)
+                       if !loaded {
+                               panic(s)
                        }
+                       level := log.Debug
+                       if err != nil {
+                               level = log.Warning
+                       }
+                       t.logger.Levelf(level, "used torrent source %q [err=%q]", s, err)
                }()
        }
 }
 
-func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
-       ctx, cancel := context.WithCancel(ctx)
+func (t *Torrent) useActiveTorrentSource(source string) error {
+       ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
        go func() {
                select {
@@ -1359,24 +1376,25 @@ func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error
                }
                cancel()
        }()
+       mi, err := getTorrentSource(ctx, source, t.cl.httpClient)
+       if err != nil {
+               return err
+       }
+       return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
+}
+
+func getTorrentSource(ctx context.Context, source string, hc *http.Client) (mi metainfo.MetaInfo, err error) {
        var req *http.Request
        if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
-               panic(err)
+               return
        }
        var resp *http.Response
-       if resp, err = http.DefaultClient.Do(req); err != nil {
+       if resp, err = hc.Do(req); err != nil {
                return
        }
-       var mi metainfo.MetaInfo
+       defer resp.Body.Close()
        err = bencode.NewDecoder(resp.Body).Decode(&mi)
-       resp.Body.Close()
-       if err != nil {
-               if ctx.Err() != nil {
-                       return nil
-               }
-               return
-       }
-       return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
+       return
 }
 
 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
index e148d38bd98de87ccdccf2ad53e57b79bf780c63..f9635e31d3b1c40086655e5658617d33d8261f34 100644 (file)
@@ -147,6 +147,10 @@ type Torrent struct {
 
        // Is On when all pieces are complete.
        Complete chansync.Flag
+
+       // Torrent sources in use keyed by the source string.
+       activeSources sync.Map
+       sourcesLogger log.Logger
 }
 
 func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
@@ -2308,7 +2312,7 @@ func (t *Torrent) addWebSeed(url string) {
                        callbacks:       t.callbacks(),
                },
                client: webseed.Client{
-                       HttpClient: t.cl.webseedHttpClient,
+                       HttpClient: t.cl.httpClient,
                        Url:        url,
                        ResponseBodyWrapper: func(r io.Reader) io.Reader {
                                return &rateLimitedReader{