X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=tracker_scraper.go;h=863838ace478b622721a89364380d92957e497db;hb=HEAD;hp=2c64390a60bbc190f726c2f901c879883a085d60;hpb=9224303c1e869770565ceb7c2ffd9a9376744097;p=btrtrc.git diff --git a/tracker_scraper.go b/tracker_scraper.go index 2c64390a..863838ac 100644 --- a/tracker_scraper.go +++ b/tracker_scraper.go @@ -2,6 +2,7 @@ package torrent import ( "bytes" + "context" "errors" "fmt" "net" @@ -9,21 +10,32 @@ import ( "time" "github.com/anacrolix/dht/v2/krpc" + "github.com/anacrolix/log" + "github.com/anacrolix/torrent/tracker" ) // Announces a torrent to a tracker at regular intervals, when peers are // required. type trackerScraper struct { - u url.URL - t *Torrent - lastAnnounce trackerAnnounceResult + u url.URL + t *Torrent + lastAnnounce trackerAnnounceResult + lookupTrackerIp func(*url.URL) ([]net.IP, error) +} + +type torrentTrackerAnnouncer interface { + statusLine() string + URL() *url.URL +} + +func (me trackerScraper) URL() *url.URL { + return &me.u } func (ts *trackerScraper) statusLine() string { var w bytes.Buffer - fmt.Fprintf(&w, "%q\t%s\t%s", - ts.u.String(), + fmt.Fprintf(&w, "next ann: %v, last ann: %v", func() string { na := time.Until(ts.lastAnnounce.Completed.Add(ts.lastAnnounce.Interval)) if na > 0 { @@ -55,7 +67,13 @@ type trackerAnnounceResult struct { } func (me *trackerScraper) getIp() (ip net.IP, err error) { - ips, err := net.LookupIP(me.u.Hostname()) + var ips []net.IP + if me.lookupTrackerIp != nil { + ips, err = me.lookupTrackerIp(&me.u) + } else { + // Do a regular dns lookup + ips, err = net.LookupIP(me.u.Hostname()) + } if err != nil { return } @@ -63,6 +81,12 @@ func (me *trackerScraper) getIp() (ip net.IP, err error) { err = errors.New("no ips") return } + me.t.cl.rLock() + defer me.t.cl.rUnlock() + if me.t.cl.closed.IsSet() { + err = errors.New("client is closed") + return + } for _, ip = range ips { if me.t.cl.ipIsBlocked(ip) { continue @@ -93,78 +117,149 @@ func (me *trackerScraper) trackerUrl(ip net.IP) string { // Return how long to wait before trying again. For most errors, we return 5 // minutes, a relatively quick turn around for DNS changes. -func (me *trackerScraper) announce(event tracker.AnnounceEvent) (ret trackerAnnounceResult) { +func (me *trackerScraper) announce(ctx context.Context, event tracker.AnnounceEvent) (ret trackerAnnounceResult) { defer func() { ret.Completed = time.Now() }() - ret.Interval = 5 * time.Minute + ret.Interval = time.Minute + + // Limit concurrent use of the same tracker URL by the Client. + ref := me.t.cl.activeAnnounceLimiter.GetRef(me.u.String()) + defer ref.Drop() + select { + case <-ctx.Done(): + ret.Err = ctx.Err() + return + case ref.C() <- struct{}{}: + } + defer func() { + select { + case <-ref.C(): + default: + panic("should return immediately") + } + }() + ip, err := me.getIp() if err != nil { ret.Err = fmt.Errorf("error getting ip: %s", err) return } - me.t.cl.lock() + me.t.cl.rLock() req := me.t.announceRequest(event) - me.t.cl.unlock() - //log.Printf("announcing %s %s to %q", me.t, req.Event, me.u.String()) + me.t.cl.rUnlock() + // The default timeout works well as backpressure on concurrent access to the tracker. Since + // we're passing our own Context now, we will include that timeout ourselves to maintain similar + // behavior to previously, albeit with this context now being cancelled when the Torrent is + // closed. + ctx, cancel := context.WithTimeout(ctx, tracker.DefaultTrackerAnnounceTimeout) + defer cancel() + me.t.logger.WithDefaultLevel(log.Debug).Printf("announcing to %q: %#v", me.u.String(), req) res, err := tracker.Announce{ - HTTPProxy: me.t.cl.config.HTTPProxy, - UserAgent: me.t.cl.config.HTTPUserAgent, - TrackerUrl: me.trackerUrl(ip), - Request: req, - HostHeader: me.u.Host, - ServerName: me.u.Hostname(), - UdpNetwork: me.u.Scheme, - ClientIp4: krpc.NodeAddr{IP: me.t.cl.config.PublicIp4}, - ClientIp6: krpc.NodeAddr{IP: me.t.cl.config.PublicIp6}, + Context: ctx, + HttpProxy: me.t.cl.config.HTTPProxy, + HttpRequestDirector: me.t.cl.config.HttpRequestDirector, + DialContext: me.t.cl.config.TrackerDialContext, + ListenPacket: me.t.cl.config.TrackerListenPacket, + UserAgent: me.t.cl.config.HTTPUserAgent, + TrackerUrl: me.trackerUrl(ip), + Request: req, + HostHeader: me.u.Host, + ServerName: me.u.Hostname(), + UdpNetwork: me.u.Scheme, + ClientIp4: krpc.NodeAddr{IP: me.t.cl.config.PublicIp4}, + ClientIp6: krpc.NodeAddr{IP: me.t.cl.config.PublicIp6}, + Logger: me.t.logger, }.Do() + me.t.logger.WithDefaultLevel(log.Debug).Printf("announce to %q returned %#v: %v", me.u.String(), res, err) if err != nil { - ret.Err = fmt.Errorf("error announcing: %s", err) + ret.Err = fmt.Errorf("announcing: %w", err) return } - me.t.AddPeers(Peers(nil).AppendFromTracker(res.Peers)) + me.t.AddPeers(peerInfos(nil).AppendFromTracker(res.Peers)) ret.NumPeers = len(res.Peers) ret.Interval = time.Duration(res.Interval) * time.Second return } +// Returns whether we can shorten the interval, and sets notify to a channel that receives when we +// might change our mind, or leaves it if we won't. +func (me *trackerScraper) canIgnoreInterval(notify *<-chan struct{}) bool { + gotInfo := me.t.GotInfo() + select { + case <-gotInfo: + // Private trackers really don't like us announcing more than they specify. They're also + // tracking us very carefully, so it's best to comply. + private := me.t.info.Private + return private == nil || !*private + default: + *notify = gotInfo + return false + } +} + func (me *trackerScraper) Run() { defer me.announceStopped() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer cancel() + select { + case <-ctx.Done(): + case <-me.t.Closed(): + } + }() + // make sure first announce is a "started" e := tracker.Started + for { - ar := me.announce(e) + ar := me.announce(ctx, e) // after first announce, get back to regular "none" e = tracker.None me.t.cl.lock() me.lastAnnounce = ar me.t.cl.unlock() - wait: + recalculate: + // Make sure we don't announce for at least a minute since the last one. interval := ar.Interval if interval < time.Minute { interval = time.Minute } - wantPeers := me.t.wantPeersEvent.LockedChan(me.t.cl.locker()) + + me.t.cl.lock() + wantPeers := me.t.wantPeersEvent.C() + me.t.cl.unlock() + + // If we want peers, reduce the interval to the minimum if it's appropriate. + + // A channel that receives when we should reconsider our interval. Starts as nil since that + // never receives. + var reconsider <-chan struct{} select { case <-wantPeers: - if interval > time.Minute { + if interval > time.Minute && me.canIgnoreInterval(&reconsider) { interval = time.Minute } - wantPeers = nil default: + reconsider = wantPeers } select { - case <-me.t.closed.LockedChan(me.t.cl.locker()): + case <-me.t.closed.Done(): return - case <-wantPeers: - goto wait + case <-reconsider: + // Recalculate the interval. + goto recalculate case <-time.After(time.Until(ar.Completed.Add(interval))): } } } func (me *trackerScraper) announceStopped() { - me.announce(tracker.Stopped) + ctx, cancel := context.WithTimeout(context.Background(), tracker.DefaultTrackerAnnounceTimeout) + defer cancel() + me.announce(ctx, tracker.Stopped) }