]> Sergey Matveev's repositories - btrtrc.git/blobdiff - tracker_scraper.go
Drop support for go 1.20
[btrtrc.git] / tracker_scraper.go
index 5c3956b0c8fca8acdea30dbcab403500dcf5e66a..863838ace478b622721a89364380d92957e497db 100644 (file)
@@ -1,10 +1,16 @@
 package torrent
 
 import (
-       "log"
+       "bytes"
+       "context"
+       "errors"
+       "fmt"
+       "net"
+       "net/url"
        "time"
 
-       "github.com/anacrolix/missinggo"
+       "github.com/anacrolix/dht/v2/krpc"
+       "github.com/anacrolix/log"
 
        "github.com/anacrolix/torrent/tracker"
 )
@@ -12,66 +18,248 @@ import (
 // Announces a torrent to a tracker at regular intervals, when peers are
 // required.
 type trackerScraper struct {
-       url string
-       // Causes the trackerScraper to stop running.
-       stop missinggo.Event
-       t    *Torrent
+       u               url.URL
+       t               *Torrent
+       lastAnnounce    trackerAnnounceResult
+       lookupTrackerIp func(*url.URL) ([]net.IP, error)
 }
 
-func trackerToTorrentPeers(ps []tracker.Peer) (ret []Peer) {
-       ret = make([]Peer, 0, len(ps))
-       for _, p := range ps {
-               ret = append(ret, Peer{
-                       IP:     p.IP,
-                       Port:   p.Port,
-                       Source: peerSourceTracker,
-               })
+type torrentTrackerAnnouncer interface {
+       statusLine() string
+       URL() *url.URL
+}
+
+func (me trackerScraper) URL() *url.URL {
+       return &me.u
+}
+
+func (ts *trackerScraper) statusLine() string {
+       var w bytes.Buffer
+       fmt.Fprintf(&w, "next ann: %v, last ann: %v",
+               func() string {
+                       na := time.Until(ts.lastAnnounce.Completed.Add(ts.lastAnnounce.Interval))
+                       if na > 0 {
+                               na /= time.Second
+                               na *= time.Second
+                               return na.String()
+                       } else {
+                               return "anytime"
+                       }
+               }(),
+               func() string {
+                       if ts.lastAnnounce.Err != nil {
+                               return ts.lastAnnounce.Err.Error()
+                       }
+                       if ts.lastAnnounce.Completed.IsZero() {
+                               return "never"
+                       }
+                       return fmt.Sprintf("%d peers", ts.lastAnnounce.NumPeers)
+               }(),
+       )
+       return w.String()
+}
+
+type trackerAnnounceResult struct {
+       Err       error
+       NumPeers  int
+       Interval  time.Duration
+       Completed time.Time
+}
+
+func (me *trackerScraper) getIp() (ip net.IP, err error) {
+       var ips []net.IP
+       if me.lookupTrackerIp != nil {
+               ips, err = me.lookupTrackerIp(&me.u)
+       } else {
+               // Do a regular dns lookup
+               ips, err = net.LookupIP(me.u.Hostname())
+       }
+       if err != nil {
+               return
+       }
+       if len(ips) == 0 {
+               err = errors.New("no ips")
+               return
        }
+       me.t.cl.rLock()
+       defer me.t.cl.rUnlock()
+       if me.t.cl.closed.IsSet() {
+               err = errors.New("client is closed")
+               return
+       }
+       for _, ip = range ips {
+               if me.t.cl.ipIsBlocked(ip) {
+                       continue
+               }
+               switch me.u.Scheme {
+               case "udp4":
+                       if ip.To4() == nil {
+                               continue
+                       }
+               case "udp6":
+                       if ip.To4() != nil {
+                               continue
+                       }
+               }
+               return
+       }
+       err = errors.New("no acceptable ips")
        return
 }
 
+func (me *trackerScraper) trackerUrl(ip net.IP) string {
+       u := me.u
+       if u.Port() != "" {
+               u.Host = net.JoinHostPort(ip.String(), u.Port())
+       }
+       return u.String()
+}
+
 // Return how long to wait before trying again. For most errors, we return 5
 // minutes, a relatively quick turn around for DNS changes.
-func (me *trackerScraper) announce() time.Duration {
-       blocked, urlToUse, host, err := me.t.cl.prepareTrackerAnnounceUnlocked(me.url)
-       if err != nil {
-               log.Printf("error preparing announce to %q: %s", me.url, err)
-               return 5 * time.Minute
+func (me *trackerScraper) announce(ctx context.Context, event tracker.AnnounceEvent) (ret trackerAnnounceResult) {
+       defer func() {
+               ret.Completed = time.Now()
+       }()
+       ret.Interval = time.Minute
+
+       // Limit concurrent use of the same tracker URL by the Client.
+       ref := me.t.cl.activeAnnounceLimiter.GetRef(me.u.String())
+       defer ref.Drop()
+       select {
+       case <-ctx.Done():
+               ret.Err = ctx.Err()
+               return
+       case ref.C() <- struct{}{}:
        }
-       if blocked {
-               log.Printf("announce to tracker %q blocked by IP", me.url)
-               return 5 * time.Minute
+       defer func() {
+               select {
+               case <-ref.C():
+               default:
+                       panic("should return immediately")
+               }
+       }()
+
+       ip, err := me.getIp()
+       if err != nil {
+               ret.Err = fmt.Errorf("error getting ip: %s", err)
+               return
        }
-       me.t.cl.mu.Lock()
-       req := me.t.announceRequest()
-       me.t.cl.mu.Unlock()
-       res, err := tracker.AnnounceHost(urlToUse, &req, host)
+       me.t.cl.rLock()
+       req := me.t.announceRequest(event)
+       me.t.cl.rUnlock()
+       // The default timeout works well as backpressure on concurrent access to the tracker. Since
+       // we're passing our own Context now, we will include that timeout ourselves to maintain similar
+       // behavior to previously, albeit with this context now being cancelled when the Torrent is
+       // closed.
+       ctx, cancel := context.WithTimeout(ctx, tracker.DefaultTrackerAnnounceTimeout)
+       defer cancel()
+       me.t.logger.WithDefaultLevel(log.Debug).Printf("announcing to %q: %#v", me.u.String(), req)
+       res, err := tracker.Announce{
+               Context:             ctx,
+               HttpProxy:           me.t.cl.config.HTTPProxy,
+               HttpRequestDirector: me.t.cl.config.HttpRequestDirector,
+               DialContext:         me.t.cl.config.TrackerDialContext,
+               ListenPacket:        me.t.cl.config.TrackerListenPacket,
+               UserAgent:           me.t.cl.config.HTTPUserAgent,
+               TrackerUrl:          me.trackerUrl(ip),
+               Request:             req,
+               HostHeader:          me.u.Host,
+               ServerName:          me.u.Hostname(),
+               UdpNetwork:          me.u.Scheme,
+               ClientIp4:           krpc.NodeAddr{IP: me.t.cl.config.PublicIp4},
+               ClientIp6:           krpc.NodeAddr{IP: me.t.cl.config.PublicIp6},
+               Logger:              me.t.logger,
+       }.Do()
+       me.t.logger.WithDefaultLevel(log.Debug).Printf("announce to %q returned %#v: %v", me.u.String(), res, err)
        if err != nil {
-               // log.Printf("error announcing %s %q to %q: %s", me.t.InfoHash().HexString(), me.t.Name(), me.url, err)
-               return 5 * time.Minute
+               ret.Err = fmt.Errorf("announcing: %w", err)
+               return
+       }
+       me.t.AddPeers(peerInfos(nil).AppendFromTracker(res.Peers))
+       ret.NumPeers = len(res.Peers)
+       ret.Interval = time.Duration(res.Interval) * time.Second
+       return
+}
+
+// Returns whether we can shorten the interval, and sets notify to a channel that receives when we
+// might change our mind, or leaves it if we won't.
+func (me *trackerScraper) canIgnoreInterval(notify *<-chan struct{}) bool {
+       gotInfo := me.t.GotInfo()
+       select {
+       case <-gotInfo:
+               // Private trackers really don't like us announcing more than they specify. They're also
+               // tracking us very carefully, so it's best to comply.
+               private := me.t.info.Private
+               return private == nil || !*private
+       default:
+               *notify = gotInfo
+               return false
        }
-       me.t.AddPeers(trackerToTorrentPeers(res.Peers))
-       return time.Duration(res.Interval) * time.Second
 }
 
 func (me *trackerScraper) Run() {
-       for {
+       defer me.announceStopped()
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       go func() {
+               defer cancel()
                select {
-               case <-me.t.closed.LockedChan(&me.t.cl.mu):
-                       return
-               case <-me.stop.LockedChan(&me.t.cl.mu):
-                       return
-               case <-me.t.wantPeersEvent.LockedChan(&me.t.cl.mu):
+               case <-ctx.Done():
+               case <-me.t.Closed():
+               }
+       }()
+
+       // make sure first announce is a "started"
+       e := tracker.Started
+
+       for {
+               ar := me.announce(ctx, e)
+               // after first announce, get back to regular "none"
+               e = tracker.None
+               me.t.cl.lock()
+               me.lastAnnounce = ar
+               me.t.cl.unlock()
+
+       recalculate:
+               // Make sure we don't announce for at least a minute since the last one.
+               interval := ar.Interval
+               if interval < time.Minute {
+                       interval = time.Minute
                }
 
-               intervalChan := time.After(me.announce())
+               me.t.cl.lock()
+               wantPeers := me.t.wantPeersEvent.C()
+               me.t.cl.unlock()
+
+               // If we want peers, reduce the interval to the minimum if it's appropriate.
 
+               // A channel that receives when we should reconsider our interval. Starts as nil since that
+               // never receives.
+               var reconsider <-chan struct{}
                select {
-               case <-me.t.closed.LockedChan(&me.t.cl.mu):
-                       return
-               case <-me.stop.LockedChan(&me.t.cl.mu):
+               case <-wantPeers:
+                       if interval > time.Minute && me.canIgnoreInterval(&reconsider) {
+                               interval = time.Minute
+                       }
+               default:
+                       reconsider = wantPeers
+               }
+
+               select {
+               case <-me.t.closed.Done():
                        return
-               case <-intervalChan:
+               case <-reconsider:
+                       // Recalculate the interval.
+                       goto recalculate
+               case <-time.After(time.Until(ar.Completed.Add(interval))):
                }
        }
 }
+
+func (me *trackerScraper) announceStopped() {
+       ctx, cancel := context.WithTimeout(context.Background(), tracker.DefaultTrackerAnnounceTimeout)
+       defer cancel()
+       me.announce(ctx, tracker.Stopped)
+}