]> Sergey Matveev's repositories - btrtrc.git/blobdiff - tracker_scraper.go
Drop support for go 1.20
[btrtrc.git] / tracker_scraper.go
index 65673539524f9d9bea67ae204a9f78f9ff2da853..863838ace478b622721a89364380d92957e497db 100644 (file)
@@ -2,6 +2,7 @@ package torrent
 
 import (
        "bytes"
+       "context"
        "errors"
        "fmt"
        "net"
@@ -17,13 +18,10 @@ import (
 // Announces a torrent to a tracker at regular intervals, when peers are
 // required.
 type trackerScraper struct {
-       u            url.URL
-       t            *Torrent
-       lastAnnounce trackerAnnounceResult
-       allow        func()
-       // The slowdown argument lets us indicate if we think there should be some backpressure on
-       // access to the tracker. It doesn't necessarily have to be used.
-       done func(slowdown bool)
+       u               url.URL
+       t               *Torrent
+       lastAnnounce    trackerAnnounceResult
+       lookupTrackerIp func(*url.URL) ([]net.IP, error)
 }
 
 type torrentTrackerAnnouncer interface {
@@ -69,7 +67,13 @@ type trackerAnnounceResult struct {
 }
 
 func (me *trackerScraper) getIp() (ip net.IP, err error) {
-       ips, err := net.LookupIP(me.u.Hostname())
+       var ips []net.IP
+       if me.lookupTrackerIp != nil {
+               ips, err = me.lookupTrackerIp(&me.u)
+       } else {
+               // Do a regular dns lookup
+               ips, err = net.LookupIP(me.u.Hostname())
+       }
        if err != nil {
                return
        }
@@ -77,6 +81,12 @@ func (me *trackerScraper) getIp() (ip net.IP, err error) {
                err = errors.New("no ips")
                return
        }
+       me.t.cl.rLock()
+       defer me.t.cl.rUnlock()
+       if me.t.cl.closed.IsSet() {
+               err = errors.New("client is closed")
+               return
+       }
        for _, ip = range ips {
                if me.t.cl.ipIsBlocked(ip) {
                        continue
@@ -107,16 +117,29 @@ func (me *trackerScraper) trackerUrl(ip net.IP) string {
 
 // Return how long to wait before trying again. For most errors, we return 5
 // minutes, a relatively quick turn around for DNS changes.
-func (me *trackerScraper) announce(event tracker.AnnounceEvent) (ret trackerAnnounceResult) {
+func (me *trackerScraper) announce(ctx context.Context, event tracker.AnnounceEvent) (ret trackerAnnounceResult) {
        defer func() {
                ret.Completed = time.Now()
        }()
        ret.Interval = time.Minute
-       me.allow()
-       // We might pass true if we got an error. Currently we don't because timing out with a
-       // reasonably long timeout is its own form of backpressure (it remains to be seen if it's
-       // enough).
-       defer me.done(false)
+
+       // Limit concurrent use of the same tracker URL by the Client.
+       ref := me.t.cl.activeAnnounceLimiter.GetRef(me.u.String())
+       defer ref.Drop()
+       select {
+       case <-ctx.Done():
+               ret.Err = ctx.Err()
+               return
+       case ref.C() <- struct{}{}:
+       }
+       defer func() {
+               select {
+               case <-ref.C():
+               default:
+                       panic("should return immediately")
+               }
+       }()
+
        ip, err := me.getIp()
        if err != nil {
                ret.Err = fmt.Errorf("error getting ip: %s", err)
@@ -125,22 +148,28 @@ func (me *trackerScraper) announce(event tracker.AnnounceEvent) (ret trackerAnno
        me.t.cl.rLock()
        req := me.t.announceRequest(event)
        me.t.cl.rUnlock()
-       // The default timeout is currently 15s, and that works well as backpressure on concurrent
-       // access to the tracker.
-       //ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
-       //defer cancel()
+       // The default timeout works well as backpressure on concurrent access to the tracker. Since
+       // we're passing our own Context now, we will include that timeout ourselves to maintain similar
+       // behavior to previously, albeit with this context now being cancelled when the Torrent is
+       // closed.
+       ctx, cancel := context.WithTimeout(ctx, tracker.DefaultTrackerAnnounceTimeout)
+       defer cancel()
        me.t.logger.WithDefaultLevel(log.Debug).Printf("announcing to %q: %#v", me.u.String(), req)
        res, err := tracker.Announce{
-               //Context:    ctx,
-               HTTPProxy:  me.t.cl.config.HTTPProxy,
-               UserAgent:  me.t.cl.config.HTTPUserAgent,
-               TrackerUrl: me.trackerUrl(ip),
-               Request:    req,
-               HostHeader: me.u.Host,
-               ServerName: me.u.Hostname(),
-               UdpNetwork: me.u.Scheme,
-               ClientIp4:  krpc.NodeAddr{IP: me.t.cl.config.PublicIp4},
-               ClientIp6:  krpc.NodeAddr{IP: me.t.cl.config.PublicIp6},
+               Context:             ctx,
+               HttpProxy:           me.t.cl.config.HTTPProxy,
+               HttpRequestDirector: me.t.cl.config.HttpRequestDirector,
+               DialContext:         me.t.cl.config.TrackerDialContext,
+               ListenPacket:        me.t.cl.config.TrackerListenPacket,
+               UserAgent:           me.t.cl.config.HTTPUserAgent,
+               TrackerUrl:          me.trackerUrl(ip),
+               Request:             req,
+               HostHeader:          me.u.Host,
+               ServerName:          me.u.Hostname(),
+               UdpNetwork:          me.u.Scheme,
+               ClientIp4:           krpc.NodeAddr{IP: me.t.cl.config.PublicIp4},
+               ClientIp6:           krpc.NodeAddr{IP: me.t.cl.config.PublicIp6},
+               Logger:              me.t.logger,
        }.Do()
        me.t.logger.WithDefaultLevel(log.Debug).Printf("announce to %q returned %#v: %v", me.u.String(), res, err)
        if err != nil {
@@ -171,10 +200,22 @@ func (me *trackerScraper) canIgnoreInterval(notify *<-chan struct{}) bool {
 
 func (me *trackerScraper) Run() {
        defer me.announceStopped()
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       go func() {
+               defer cancel()
+               select {
+               case <-ctx.Done():
+               case <-me.t.Closed():
+               }
+       }()
+
        // make sure first announce is a "started"
        e := tracker.Started
+
        for {
-               ar := me.announce(e)
+               ar := me.announce(ctx, e)
                // after first announce, get back to regular "none"
                e = tracker.None
                me.t.cl.lock()
@@ -190,7 +231,6 @@ func (me *trackerScraper) Run() {
 
                me.t.cl.lock()
                wantPeers := me.t.wantPeersEvent.C()
-               closed := me.t.closed.C()
                me.t.cl.unlock()
 
                // If we want peers, reduce the interval to the minimum if it's appropriate.
@@ -208,7 +248,7 @@ func (me *trackerScraper) Run() {
                }
 
                select {
-               case <-closed:
+               case <-me.t.closed.Done():
                        return
                case <-reconsider:
                        // Recalculate the interval.
@@ -219,5 +259,7 @@ func (me *trackerScraper) Run() {
 }
 
 func (me *trackerScraper) announceStopped() {
-       me.announce(tracker.Stopped)
+       ctx, cancel := context.WithTimeout(context.Background(), tracker.DefaultTrackerAnnounceTimeout)
+       defer cancel()
+       me.announce(ctx, tracker.Stopped)
 }