X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=tracker_scraper.go;h=863838ace478b622721a89364380d92957e497db;hb=HEAD;hp=a7d3041cfe7c37b9fafaa4537afc07cf1926ae63;hpb=15fff58d7bc69635c4eec0b1494ae6ea14f68a53;p=btrtrc.git diff --git a/tracker_scraper.go b/tracker_scraper.go index a7d3041c..863838ac 100644 --- a/tracker_scraper.go +++ b/tracker_scraper.go @@ -2,6 +2,7 @@ package torrent import ( "bytes" + "context" "errors" "fmt" "net" @@ -17,9 +18,10 @@ import ( // Announces a torrent to a tracker at regular intervals, when peers are // required. type trackerScraper struct { - u url.URL - t *Torrent - lastAnnounce trackerAnnounceResult + u url.URL + t *Torrent + lastAnnounce trackerAnnounceResult + lookupTrackerIp func(*url.URL) ([]net.IP, error) } type torrentTrackerAnnouncer interface { @@ -65,7 +67,13 @@ type trackerAnnounceResult struct { } func (me *trackerScraper) getIp() (ip net.IP, err error) { - ips, err := net.LookupIP(me.u.Hostname()) + var ips []net.IP + if me.lookupTrackerIp != nil { + ips, err = me.lookupTrackerIp(&me.u) + } else { + // Do a regular dns lookup + ips, err = net.LookupIP(me.u.Hostname()) + } if err != nil { return } @@ -73,6 +81,12 @@ func (me *trackerScraper) getIp() (ip net.IP, err error) { err = errors.New("no ips") return } + me.t.cl.rLock() + defer me.t.cl.rUnlock() + if me.t.cl.closed.IsSet() { + err = errors.New("client is closed") + return + } for _, ip = range ips { if me.t.cl.ipIsBlocked(ip) { continue @@ -103,11 +117,29 @@ func (me *trackerScraper) trackerUrl(ip net.IP) string { // Return how long to wait before trying again. For most errors, we return 5 // minutes, a relatively quick turn around for DNS changes. -func (me *trackerScraper) announce(event tracker.AnnounceEvent) (ret trackerAnnounceResult) { +func (me *trackerScraper) announce(ctx context.Context, event tracker.AnnounceEvent) (ret trackerAnnounceResult) { defer func() { ret.Completed = time.Now() }() ret.Interval = time.Minute + + // Limit concurrent use of the same tracker URL by the Client. + ref := me.t.cl.activeAnnounceLimiter.GetRef(me.u.String()) + defer ref.Drop() + select { + case <-ctx.Done(): + ret.Err = ctx.Err() + return + case ref.C() <- struct{}{}: + } + defer func() { + select { + case <-ref.C(): + default: + panic("should return immediately") + } + }() + ip, err := me.getIp() if err != nil { ret.Err = fmt.Errorf("error getting ip: %s", err) @@ -116,41 +148,81 @@ func (me *trackerScraper) announce(event tracker.AnnounceEvent) (ret trackerAnno me.t.cl.rLock() req := me.t.announceRequest(event) me.t.cl.rUnlock() + // The default timeout works well as backpressure on concurrent access to the tracker. Since + // we're passing our own Context now, we will include that timeout ourselves to maintain similar + // behavior to previously, albeit with this context now being cancelled when the Torrent is + // closed. + ctx, cancel := context.WithTimeout(ctx, tracker.DefaultTrackerAnnounceTimeout) + defer cancel() me.t.logger.WithDefaultLevel(log.Debug).Printf("announcing to %q: %#v", me.u.String(), req) res, err := tracker.Announce{ - HTTPProxy: me.t.cl.config.HTTPProxy, - UserAgent: me.t.cl.config.HTTPUserAgent, - TrackerUrl: me.trackerUrl(ip), - Request: req, - HostHeader: me.u.Host, - ServerName: me.u.Hostname(), - UdpNetwork: me.u.Scheme, - ClientIp4: krpc.NodeAddr{IP: me.t.cl.config.PublicIp4}, - ClientIp6: krpc.NodeAddr{IP: me.t.cl.config.PublicIp6}, + Context: ctx, + HttpProxy: me.t.cl.config.HTTPProxy, + HttpRequestDirector: me.t.cl.config.HttpRequestDirector, + DialContext: me.t.cl.config.TrackerDialContext, + ListenPacket: me.t.cl.config.TrackerListenPacket, + UserAgent: me.t.cl.config.HTTPUserAgent, + TrackerUrl: me.trackerUrl(ip), + Request: req, + HostHeader: me.u.Host, + ServerName: me.u.Hostname(), + UdpNetwork: me.u.Scheme, + ClientIp4: krpc.NodeAddr{IP: me.t.cl.config.PublicIp4}, + ClientIp6: krpc.NodeAddr{IP: me.t.cl.config.PublicIp6}, + Logger: me.t.logger, }.Do() + me.t.logger.WithDefaultLevel(log.Debug).Printf("announce to %q returned %#v: %v", me.u.String(), res, err) if err != nil { - ret.Err = fmt.Errorf("error announcing: %s", err) + ret.Err = fmt.Errorf("announcing: %w", err) return } - me.t.AddPeers(Peers(nil).AppendFromTracker(res.Peers)) + me.t.AddPeers(peerInfos(nil).AppendFromTracker(res.Peers)) ret.NumPeers = len(res.Peers) ret.Interval = time.Duration(res.Interval) * time.Second return } +// Returns whether we can shorten the interval, and sets notify to a channel that receives when we +// might change our mind, or leaves it if we won't. +func (me *trackerScraper) canIgnoreInterval(notify *<-chan struct{}) bool { + gotInfo := me.t.GotInfo() + select { + case <-gotInfo: + // Private trackers really don't like us announcing more than they specify. They're also + // tracking us very carefully, so it's best to comply. + private := me.t.info.Private + return private == nil || !*private + default: + *notify = gotInfo + return false + } +} + func (me *trackerScraper) Run() { defer me.announceStopped() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer cancel() + select { + case <-ctx.Done(): + case <-me.t.Closed(): + } + }() + // make sure first announce is a "started" e := tracker.Started + for { - ar := me.announce(e) + ar := me.announce(ctx, e) // after first announce, get back to regular "none" e = tracker.None me.t.cl.lock() me.lastAnnounce = ar me.t.cl.unlock() - wait: + recalculate: // Make sure we don't announce for at least a minute since the last one. interval := ar.Interval if interval < time.Minute { @@ -159,31 +231,35 @@ func (me *trackerScraper) Run() { me.t.cl.lock() wantPeers := me.t.wantPeersEvent.C() - closed := me.t.closed.C() me.t.cl.unlock() - // If we want peers, reduce the interval to the minimum. + // If we want peers, reduce the interval to the minimum if it's appropriate. + + // A channel that receives when we should reconsider our interval. Starts as nil since that + // never receives. + var reconsider <-chan struct{} select { case <-wantPeers: - if interval > time.Minute { + if interval > time.Minute && me.canIgnoreInterval(&reconsider) { interval = time.Minute } - // Now we're at the minimum, don't trigger on it anymore. - wantPeers = nil default: + reconsider = wantPeers } select { - case <-closed: + case <-me.t.closed.Done(): return - case <-wantPeers: + case <-reconsider: // Recalculate the interval. - goto wait + goto recalculate case <-time.After(time.Until(ar.Completed.Add(interval))): } } } func (me *trackerScraper) announceStopped() { - me.announce(tracker.Stopped) + ctx, cancel := context.WithTimeout(context.Background(), tracker.DefaultTrackerAnnounceTimeout) + defer cancel() + me.announce(ctx, tracker.Stopped) }