X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=tracker_scraper.go;h=863838ace478b622721a89364380d92957e497db;hb=HEAD;hp=cf8f99fe86999fbfc71028023444b2e5b6c8b102;hpb=1e8c6808b2fd49dce985e733d79c9be6f8e081cf;p=btrtrc.git diff --git a/tracker_scraper.go b/tracker_scraper.go index cf8f99fe..863838ac 100644 --- a/tracker_scraper.go +++ b/tracker_scraper.go @@ -2,11 +2,15 @@ package torrent import ( "bytes" + "context" "errors" "fmt" + "net" + "net/url" "time" - "github.com/anacrolix/missinggo" + "github.com/anacrolix/dht/v2/krpc" + "github.com/anacrolix/log" "github.com/anacrolix/torrent/tracker" ) @@ -14,23 +18,29 @@ import ( // Announces a torrent to a tracker at regular intervals, when peers are // required. type trackerScraper struct { - url string - // Causes the trackerScraper to stop running. - stop missinggo.Event - t *Torrent - lastAnnounce trackerAnnounceResult + u url.URL + t *Torrent + lastAnnounce trackerAnnounceResult + lookupTrackerIp func(*url.URL) ([]net.IP, error) +} + +type torrentTrackerAnnouncer interface { + statusLine() string + URL() *url.URL +} + +func (me trackerScraper) URL() *url.URL { + return &me.u } func (ts *trackerScraper) statusLine() string { var w bytes.Buffer - fmt.Fprintf(&w, "%q\t%s\t%s", - ts.url, + fmt.Fprintf(&w, "next ann: %v, last ann: %v", func() string { - // return ts.lastAnnounce.Completed.Add(ts.lastAnnounce.Interval).Format("2006-01-02 15:04:05 -0700 MST") - na := ts.lastAnnounce.Completed.Add(ts.lastAnnounce.Interval).Sub(time.Now()) - na /= time.Second - na *= time.Second + na := time.Until(ts.lastAnnounce.Completed.Add(ts.lastAnnounce.Interval)) if na > 0 { + na /= time.Second + na *= time.Second return na.String() } else { return "anytime" @@ -44,7 +54,8 @@ func (ts *trackerScraper) statusLine() string { return "never" } return fmt.Sprintf("%d peers", ts.lastAnnounce.NumPeers) - }()) + }(), + ) return w.String() } @@ -55,71 +66,200 @@ type trackerAnnounceResult struct { Completed time.Time } -func trackerToTorrentPeers(ps []tracker.Peer) (ret []Peer) { - ret = make([]Peer, 0, len(ps)) - for _, p := range ps { - ret = append(ret, Peer{ - IP: p.IP, - Port: p.Port, - Source: peerSourceTracker, - }) +func (me *trackerScraper) getIp() (ip net.IP, err error) { + var ips []net.IP + if me.lookupTrackerIp != nil { + ips, err = me.lookupTrackerIp(&me.u) + } else { + // Do a regular dns lookup + ips, err = net.LookupIP(me.u.Hostname()) } + if err != nil { + return + } + if len(ips) == 0 { + err = errors.New("no ips") + return + } + me.t.cl.rLock() + defer me.t.cl.rUnlock() + if me.t.cl.closed.IsSet() { + err = errors.New("client is closed") + return + } + for _, ip = range ips { + if me.t.cl.ipIsBlocked(ip) { + continue + } + switch me.u.Scheme { + case "udp4": + if ip.To4() == nil { + continue + } + case "udp6": + if ip.To4() != nil { + continue + } + } + return + } + err = errors.New("no acceptable ips") return } +func (me *trackerScraper) trackerUrl(ip net.IP) string { + u := me.u + if u.Port() != "" { + u.Host = net.JoinHostPort(ip.String(), u.Port()) + } + return u.String() +} + // Return how long to wait before trying again. For most errors, we return 5 // minutes, a relatively quick turn around for DNS changes. -func (me *trackerScraper) announce() (ret trackerAnnounceResult) { +func (me *trackerScraper) announce(ctx context.Context, event tracker.AnnounceEvent) (ret trackerAnnounceResult) { defer func() { ret.Completed = time.Now() }() - ret.Interval = 5 * time.Minute - blocked, urlToUse, host, err := me.t.cl.prepareTrackerAnnounceUnlocked(me.url) - if err != nil { - ret.Err = err + ret.Interval = time.Minute + + // Limit concurrent use of the same tracker URL by the Client. + ref := me.t.cl.activeAnnounceLimiter.GetRef(me.u.String()) + defer ref.Drop() + select { + case <-ctx.Done(): + ret.Err = ctx.Err() return + case ref.C() <- struct{}{}: } - if blocked { - ret.Err = errors.New("blocked by IP") + defer func() { + select { + case <-ref.C(): + default: + panic("should return immediately") + } + }() + + ip, err := me.getIp() + if err != nil { + ret.Err = fmt.Errorf("error getting ip: %s", err) return } - me.t.cl.mu.Lock() - req := me.t.announceRequest() - me.t.cl.mu.Unlock() - res, err := tracker.AnnounceHost(urlToUse, &req, host) + me.t.cl.rLock() + req := me.t.announceRequest(event) + me.t.cl.rUnlock() + // The default timeout works well as backpressure on concurrent access to the tracker. Since + // we're passing our own Context now, we will include that timeout ourselves to maintain similar + // behavior to previously, albeit with this context now being cancelled when the Torrent is + // closed. + ctx, cancel := context.WithTimeout(ctx, tracker.DefaultTrackerAnnounceTimeout) + defer cancel() + me.t.logger.WithDefaultLevel(log.Debug).Printf("announcing to %q: %#v", me.u.String(), req) + res, err := tracker.Announce{ + Context: ctx, + HttpProxy: me.t.cl.config.HTTPProxy, + HttpRequestDirector: me.t.cl.config.HttpRequestDirector, + DialContext: me.t.cl.config.TrackerDialContext, + ListenPacket: me.t.cl.config.TrackerListenPacket, + UserAgent: me.t.cl.config.HTTPUserAgent, + TrackerUrl: me.trackerUrl(ip), + Request: req, + HostHeader: me.u.Host, + ServerName: me.u.Hostname(), + UdpNetwork: me.u.Scheme, + ClientIp4: krpc.NodeAddr{IP: me.t.cl.config.PublicIp4}, + ClientIp6: krpc.NodeAddr{IP: me.t.cl.config.PublicIp6}, + Logger: me.t.logger, + }.Do() + me.t.logger.WithDefaultLevel(log.Debug).Printf("announce to %q returned %#v: %v", me.u.String(), res, err) if err != nil { - ret.Err = err + ret.Err = fmt.Errorf("announcing: %w", err) return } - me.t.AddPeers(trackerToTorrentPeers(res.Peers)) + me.t.AddPeers(peerInfos(nil).AppendFromTracker(res.Peers)) ret.NumPeers = len(res.Peers) ret.Interval = time.Duration(res.Interval) * time.Second return } +// Returns whether we can shorten the interval, and sets notify to a channel that receives when we +// might change our mind, or leaves it if we won't. +func (me *trackerScraper) canIgnoreInterval(notify *<-chan struct{}) bool { + gotInfo := me.t.GotInfo() + select { + case <-gotInfo: + // Private trackers really don't like us announcing more than they specify. They're also + // tracking us very carefully, so it's best to comply. + private := me.t.info.Private + return private == nil || !*private + default: + *notify = gotInfo + return false + } +} + func (me *trackerScraper) Run() { - for { + defer me.announceStopped() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer cancel() select { - case <-me.t.closed.LockedChan(&me.t.cl.mu): - return - case <-me.stop.LockedChan(&me.t.cl.mu): - return - case <-me.t.wantPeersEvent.LockedChan(&me.t.cl.mu): + case <-ctx.Done(): + case <-me.t.Closed(): } + }() - ar := me.announce() - me.t.cl.mu.Lock() + // make sure first announce is a "started" + e := tracker.Started + + for { + ar := me.announce(ctx, e) + // after first announce, get back to regular "none" + e = tracker.None + me.t.cl.lock() me.lastAnnounce = ar - me.t.cl.mu.Unlock() + me.t.cl.unlock() + + recalculate: + // Make sure we don't announce for at least a minute since the last one. + interval := ar.Interval + if interval < time.Minute { + interval = time.Minute + } + + me.t.cl.lock() + wantPeers := me.t.wantPeersEvent.C() + me.t.cl.unlock() - intervalChan := time.After(ar.Completed.Add(ar.Interval).Sub(time.Now())) + // If we want peers, reduce the interval to the minimum if it's appropriate. + // A channel that receives when we should reconsider our interval. Starts as nil since that + // never receives. + var reconsider <-chan struct{} select { - case <-me.t.closed.LockedChan(&me.t.cl.mu): - return - case <-me.stop.LockedChan(&me.t.cl.mu): + case <-wantPeers: + if interval > time.Minute && me.canIgnoreInterval(&reconsider) { + interval = time.Minute + } + default: + reconsider = wantPeers + } + + select { + case <-me.t.closed.Done(): return - case <-intervalChan: + case <-reconsider: + // Recalculate the interval. + goto recalculate + case <-time.After(time.Until(ar.Completed.Add(interval))): } } } + +func (me *trackerScraper) announceStopped() { + ctx, cancel := context.WithTimeout(context.Background(), tracker.DefaultTrackerAnnounceTimeout) + defer cancel() + me.announce(ctx, tracker.Stopped) +}