From: Matt Joiner Date: Fri, 25 Feb 2022 07:35:21 +0000 (+1100) Subject: Use HTTP proxy config for torrent sources X-Git-Tag: v1.42.0~14^2~3 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=d06f990b81c4b49ec3b47942499a1ff5b2cd5f9d;p=btrtrc.git Use HTTP proxy config for torrent sources --- diff --git a/client.go b/client.go index 4815d53a..07be8f3c 100644 --- a/client.go +++ b/client.go @@ -84,7 +84,7 @@ type Client struct { websocketTrackers websocketTrackers activeAnnounceLimiter limiter.Instance - webseedHttpClient *http.Client + httpClient *http.Client } type ipStr string @@ -198,9 +198,11 @@ func (cl *Client) init(cfg *ClientConfig) { cl.activeAnnounceLimiter.SlotsPerKey = 2 cl.event.L = cl.locker() cl.ipBlockList = cfg.IPBlocklist - cl.webseedHttpClient = &http.Client{ + cl.httpClient = &http.Client{ Transport: &http.Transport{ - Proxy: cfg.HTTPProxy, + Proxy: cfg.HTTPProxy, + // I think this value was observed from some webseeds. It seems reasonable to extend it + // to other uses of HTTP from the client. MaxConnsPerHost: 10, }, } @@ -1188,6 +1190,7 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) { } t.networkingEnabled.Set() t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString()) + t.sourcesLogger = t.logger.WithNames("sources") if opts.ChunkSize == 0 { opts.ChunkSize = defaultChunkSize } @@ -1312,7 +1315,7 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error { cl.lock() defer cl.unlock() t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck - useTorrentSources(spec.Sources, t) + t.useSources(spec.Sources) for _, url := range spec.Webseeds { t.addWebSeed(url) } @@ -1333,23 +1336,37 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error { return nil } -func useTorrentSources(sources []string, t *Torrent) { - // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes - ctx := context.Background() - for i := 0; i < len(sources); i += 1 { - s := sources[i] +func (t *Torrent) useSources(sources []string) { + select { + case <-t.Closed(): + return + case <-t.GotInfo(): + return + default: + } + for _, s := range sources { + _, loaded := t.activeSources.LoadOrStore(s, struct{}{}) + if loaded { + continue + } + s := s go func() { - if err := useTorrentSource(ctx, s, t); err != nil { - t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err) - } else { - t.logger.Printf("successfully used source %q", s) + err := t.useActiveTorrentSource(s) + _, loaded := t.activeSources.LoadAndDelete(s) + if !loaded { + panic(s) } + level := log.Debug + if err != nil { + level = log.Warning + } + t.logger.Levelf(level, "used torrent source %q [err=%q]", s, err) }() } } -func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) { - ctx, cancel := context.WithCancel(ctx) +func (t *Torrent) useActiveTorrentSource(source string) error { + ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { select { @@ -1359,24 +1376,25 @@ func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error } cancel() }() + mi, err := getTorrentSource(ctx, source, t.cl.httpClient) + if err != nil { + return err + } + return t.MergeSpec(TorrentSpecFromMetaInfo(&mi)) +} + +func getTorrentSource(ctx context.Context, source string, hc *http.Client) (mi metainfo.MetaInfo, err error) { var req *http.Request if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil { - panic(err) + return } var resp *http.Response - if resp, err = http.DefaultClient.Do(req); err != nil { + if resp, err = hc.Do(req); err != nil { return } - var mi metainfo.MetaInfo + defer resp.Body.Close() err = bencode.NewDecoder(resp.Body).Decode(&mi) - resp.Body.Close() - if err != nil { - if ctx.Err() != nil { - return nil - } - return - } - return t.MergeSpec(TorrentSpecFromMetaInfo(&mi)) + return } func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) { diff --git a/torrent.go b/torrent.go index e148d38b..f9635e31 100644 --- a/torrent.go +++ b/torrent.go @@ -147,6 +147,10 @@ type Torrent struct { // Is On when all pieces are complete. Complete chansync.Flag + + // Torrent sources in use keyed by the source string. + activeSources sync.Map + sourcesLogger log.Logger } func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) { @@ -2308,7 +2312,7 @@ func (t *Torrent) addWebSeed(url string) { callbacks: t.callbacks(), }, client: webseed.Client{ - HttpClient: t.cl.webseedHttpClient, + HttpClient: t.cl.httpClient, Url: url, ResponseBodyWrapper: func(r io.Reader) io.Reader { return &rateLimitedReader{