websocketTrackers websocketTrackers
activeAnnounceLimiter limiter.Instance
- webseedHttpClient *http.Client
+ httpClient *http.Client
}
type ipStr string
cl.activeAnnounceLimiter.SlotsPerKey = 2
cl.event.L = cl.locker()
cl.ipBlockList = cfg.IPBlocklist
- cl.webseedHttpClient = &http.Client{
+ cl.httpClient = &http.Client{
Transport: &http.Transport{
- Proxy: cfg.HTTPProxy,
+ Proxy: cfg.HTTPProxy,
+ // I think this value was observed from some webseeds. It seems reasonable to extend it
+ // to other uses of HTTP from the client.
MaxConnsPerHost: 10,
},
}
}
t.networkingEnabled.Set()
t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString())
+ t.sourcesLogger = t.logger.WithNames("sources")
if opts.ChunkSize == 0 {
opts.ChunkSize = defaultChunkSize
}
cl.lock()
defer cl.unlock()
t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
- useTorrentSources(spec.Sources, t)
+ t.useSources(spec.Sources)
for _, url := range spec.Webseeds {
t.addWebSeed(url)
}
return nil
}
-func useTorrentSources(sources []string, t *Torrent) {
- // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
- ctx := context.Background()
- for i := 0; i < len(sources); i += 1 {
- s := sources[i]
+func (t *Torrent) useSources(sources []string) {
+ select {
+ case <-t.Closed():
+ return
+ case <-t.GotInfo():
+ return
+ default:
+ }
+ for _, s := range sources {
+ _, loaded := t.activeSources.LoadOrStore(s, struct{}{})
+ if loaded {
+ continue
+ }
+ s := s
go func() {
- if err := useTorrentSource(ctx, s, t); err != nil {
- t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
- } else {
- t.logger.Printf("successfully used source %q", s)
+ err := t.useActiveTorrentSource(s)
+ _, loaded := t.activeSources.LoadAndDelete(s)
+ if !loaded {
+ panic(s)
}
+ level := log.Debug
+ if err != nil {
+ level = log.Warning
+ }
+ t.logger.Levelf(level, "used torrent source %q [err=%q]", s, err)
}()
}
}
-func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
- ctx, cancel := context.WithCancel(ctx)
+func (t *Torrent) useActiveTorrentSource(source string) error {
+ ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
}
cancel()
}()
+ mi, err := getTorrentSource(ctx, source, t.cl.httpClient)
+ if err != nil {
+ return err
+ }
+ return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
+}
+
+func getTorrentSource(ctx context.Context, source string, hc *http.Client) (mi metainfo.MetaInfo, err error) {
var req *http.Request
if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
- panic(err)
+ return
}
var resp *http.Response
- if resp, err = http.DefaultClient.Do(req); err != nil {
+ if resp, err = hc.Do(req); err != nil {
return
}
- var mi metainfo.MetaInfo
+ defer resp.Body.Close()
err = bencode.NewDecoder(resp.Body).Decode(&mi)
- resp.Body.Close()
- if err != nil {
- if ctx.Err() != nil {
- return nil
- }
- return
- }
- return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
+ return
}
func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {