import (
"bytes"
+ "context"
"errors"
"fmt"
"net"
"net/url"
"time"
- "github.com/anacrolix/dht/krpc"
- "github.com/anacrolix/missinggo"
+ "github.com/anacrolix/dht/v2/krpc"
+ "github.com/anacrolix/log"
"github.com/anacrolix/torrent/tracker"
)
// Announces a torrent to a tracker at regular intervals, when peers are
// required.
type trackerScraper struct {
- u url.URL
- // Causes the trackerScraper to stop running.
- stop missinggo.Event
- t *Torrent
- lastAnnounce trackerAnnounceResult
+ u url.URL
+ t *Torrent
+ lastAnnounce trackerAnnounceResult
+ lookupTrackerIp func(*url.URL) ([]net.IP, error)
+}
+
+type torrentTrackerAnnouncer interface {
+ statusLine() string
+ URL() *url.URL
+}
+
+func (me trackerScraper) URL() *url.URL {
+ return &me.u
}
func (ts *trackerScraper) statusLine() string {
var w bytes.Buffer
- fmt.Fprintf(&w, "%q\t%s\t%s",
- ts.u.String(),
+ fmt.Fprintf(&w, "next ann: %v, last ann: %v",
func() string {
na := time.Until(ts.lastAnnounce.Completed.Add(ts.lastAnnounce.Interval))
if na > 0 {
}
func (me *trackerScraper) getIp() (ip net.IP, err error) {
- ips, err := net.LookupIP(me.u.Hostname())
+ var ips []net.IP
+ if me.lookupTrackerIp != nil {
+ ips, err = me.lookupTrackerIp(&me.u)
+ } else {
+ // Do a regular dns lookup
+ ips, err = net.LookupIP(me.u.Hostname())
+ }
if err != nil {
return
}
err = errors.New("no ips")
return
}
+ me.t.cl.rLock()
+ defer me.t.cl.rUnlock()
+ if me.t.cl.closed.IsSet() {
+ err = errors.New("client is closed")
+ return
+ }
for _, ip = range ips {
if me.t.cl.ipIsBlocked(ip) {
continue
// Return how long to wait before trying again. For most errors, we return 5
// minutes, a relatively quick turn around for DNS changes.
-func (me *trackerScraper) announce() (ret trackerAnnounceResult) {
+func (me *trackerScraper) announce(ctx context.Context, event tracker.AnnounceEvent) (ret trackerAnnounceResult) {
defer func() {
ret.Completed = time.Now()
}()
- ret.Interval = 5 * time.Minute
+ ret.Interval = time.Minute
+
+ // Limit concurrent use of the same tracker URL by the Client.
+ ref := me.t.cl.activeAnnounceLimiter.GetRef(me.u.String())
+ defer ref.Drop()
+ select {
+ case <-ctx.Done():
+ ret.Err = ctx.Err()
+ return
+ case ref.C() <- struct{}{}:
+ }
+ defer func() {
+ select {
+ case <-ref.C():
+ default:
+ panic("should return immediately")
+ }
+ }()
+
ip, err := me.getIp()
if err != nil {
ret.Err = fmt.Errorf("error getting ip: %s", err)
return
}
- me.t.cl.lock()
- req := me.t.announceRequest()
- me.t.cl.unlock()
+ me.t.cl.rLock()
+ req := me.t.announceRequest(event)
+ me.t.cl.rUnlock()
+ // The default timeout works well as backpressure on concurrent access to the tracker. Since
+ // we're passing our own Context now, we will include that timeout ourselves to maintain similar
+ // behavior to previously, albeit with this context now being cancelled when the Torrent is
+ // closed.
+ ctx, cancel := context.WithTimeout(ctx, tracker.DefaultTrackerAnnounceTimeout)
+ defer cancel()
+ me.t.logger.WithDefaultLevel(log.Debug).Printf("announcing to %q: %#v", me.u.String(), req)
res, err := tracker.Announce{
- UserAgent: me.t.cl.config.HTTPUserAgent,
- TrackerUrl: me.trackerUrl(ip),
- Request: req,
- HostHeader: me.u.Host,
- ServerName: me.u.Hostname(),
- UdpNetwork: me.u.Scheme,
- ClientIp4: krpc.NodeAddr{IP: me.t.cl.config.PublicIp4},
- ClientIp6: krpc.NodeAddr{IP: me.t.cl.config.PublicIp6},
+ Context: ctx,
+ HttpProxy: me.t.cl.config.HTTPProxy,
+ HttpRequestDirector: me.t.cl.config.HttpRequestDirector,
+ DialContext: me.t.cl.config.TrackerDialContext,
+ ListenPacket: me.t.cl.config.TrackerListenPacket,
+ UserAgent: me.t.cl.config.HTTPUserAgent,
+ TrackerUrl: me.trackerUrl(ip),
+ Request: req,
+ HostHeader: me.u.Host,
+ ServerName: me.u.Hostname(),
+ UdpNetwork: me.u.Scheme,
+ ClientIp4: krpc.NodeAddr{IP: me.t.cl.config.PublicIp4},
+ ClientIp6: krpc.NodeAddr{IP: me.t.cl.config.PublicIp6},
+ Logger: me.t.logger,
}.Do()
+ me.t.logger.WithDefaultLevel(log.Debug).Printf("announce to %q returned %#v: %v", me.u.String(), res, err)
if err != nil {
- ret.Err = fmt.Errorf("error announcing: %s", err)
+ ret.Err = fmt.Errorf("announcing: %w", err)
return
}
- me.t.AddPeers(Peers(nil).AppendFromTracker(res.Peers))
+ me.t.AddPeers(peerInfos(nil).AppendFromTracker(res.Peers))
ret.NumPeers = len(res.Peers)
ret.Interval = time.Duration(res.Interval) * time.Second
return
}
+// Returns whether we can shorten the interval, and sets notify to a channel that receives when we
+// might change our mind, or leaves it if we won't.
+func (me *trackerScraper) canIgnoreInterval(notify *<-chan struct{}) bool {
+ gotInfo := me.t.GotInfo()
+ select {
+ case <-gotInfo:
+ // Private trackers really don't like us announcing more than they specify. They're also
+ // tracking us very carefully, so it's best to comply.
+ private := me.t.info.Private
+ return private == nil || !*private
+ default:
+ *notify = gotInfo
+ return false
+ }
+}
+
func (me *trackerScraper) Run() {
+ defer me.announceStopped()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ go func() {
+ defer cancel()
+ select {
+ case <-ctx.Done():
+ case <-me.t.Closed():
+ }
+ }()
+
+ // make sure first announce is a "started"
+ e := tracker.Started
+
for {
- ar := me.announce()
+ ar := me.announce(ctx, e)
+ // after first announce, get back to regular "none"
+ e = tracker.None
me.t.cl.lock()
me.lastAnnounce = ar
me.t.cl.unlock()
- wait:
+ recalculate:
+ // Make sure we don't announce for at least a minute since the last one.
interval := ar.Interval
if interval < time.Minute {
interval = time.Minute
}
- wantPeers := me.t.wantPeersEvent.LockedChan(me.t.cl.locker())
+
+ me.t.cl.lock()
+ wantPeers := me.t.wantPeersEvent.C()
+ me.t.cl.unlock()
+
+ // If we want peers, reduce the interval to the minimum if it's appropriate.
+
+ // A channel that receives when we should reconsider our interval. Starts as nil since that
+ // never receives.
+ var reconsider <-chan struct{}
select {
case <-wantPeers:
- if interval > time.Minute {
+ if interval > time.Minute && me.canIgnoreInterval(&reconsider) {
interval = time.Minute
}
- wantPeers = nil
default:
+ reconsider = wantPeers
}
select {
- case <-me.t.closed.LockedChan(me.t.cl.locker()):
+ case <-me.t.closed.Done():
return
- case <-me.stop.LockedChan(me.t.cl.locker()):
- return
- case <-wantPeers:
- goto wait
+ case <-reconsider:
+ // Recalculate the interval.
+ goto recalculate
case <-time.After(time.Until(ar.Completed.Add(interval))):
}
}
}
+
+func (me *trackerScraper) announceStopped() {
+ ctx, cancel := context.WithTimeout(context.Background(), tracker.DefaultTrackerAnnounceTimeout)
+ defer cancel()
+ me.announce(ctx, tracker.Stopped)
+}