]> Sergey Matveev's repositories - btrtrc.git/blob - tracker_scraper.go
Use Option with request piece states cache
[btrtrc.git] / tracker_scraper.go
1 package torrent
2
3 import (
4         "bytes"
5         "context"
6         "errors"
7         "fmt"
8         "net"
9         "net/url"
10         "time"
11
12         "github.com/anacrolix/dht/v2/krpc"
13         "github.com/anacrolix/log"
14
15         "github.com/anacrolix/torrent/tracker"
16 )
17
18 // Announces a torrent to a tracker at regular intervals, when peers are
19 // required.
20 type trackerScraper struct {
21         shortInfohash   [20]byte
22         u               url.URL
23         t               *Torrent
24         lastAnnounce    trackerAnnounceResult
25         lookupTrackerIp func(*url.URL) ([]net.IP, error)
26 }
27
28 type torrentTrackerAnnouncer interface {
29         statusLine() string
30         URL() *url.URL
31 }
32
33 func (me trackerScraper) URL() *url.URL {
34         return &me.u
35 }
36
37 func (ts *trackerScraper) statusLine() string {
38         var w bytes.Buffer
39         fmt.Fprintf(&w, "next ann: %v, last ann: %v",
40                 func() string {
41                         na := time.Until(ts.lastAnnounce.Completed.Add(ts.lastAnnounce.Interval))
42                         if na > 0 {
43                                 na /= time.Second
44                                 na *= time.Second
45                                 return na.String()
46                         } else {
47                                 return "anytime"
48                         }
49                 }(),
50                 func() string {
51                         if ts.lastAnnounce.Err != nil {
52                                 return ts.lastAnnounce.Err.Error()
53                         }
54                         if ts.lastAnnounce.Completed.IsZero() {
55                                 return "never"
56                         }
57                         return fmt.Sprintf("%d peers", ts.lastAnnounce.NumPeers)
58                 }(),
59         )
60         return w.String()
61 }
62
63 type trackerAnnounceResult struct {
64         Err       error
65         NumPeers  int
66         Interval  time.Duration
67         Completed time.Time
68 }
69
70 func (me *trackerScraper) getIp() (ip net.IP, err error) {
71         var ips []net.IP
72         if me.lookupTrackerIp != nil {
73                 ips, err = me.lookupTrackerIp(&me.u)
74         } else {
75                 // Do a regular dns lookup
76                 ips, err = net.LookupIP(me.u.Hostname())
77         }
78         if err != nil {
79                 return
80         }
81         if len(ips) == 0 {
82                 err = errors.New("no ips")
83                 return
84         }
85         me.t.cl.rLock()
86         defer me.t.cl.rUnlock()
87         if me.t.cl.closed.IsSet() {
88                 err = errors.New("client is closed")
89                 return
90         }
91         for _, ip = range ips {
92                 if me.t.cl.ipIsBlocked(ip) {
93                         continue
94                 }
95                 switch me.u.Scheme {
96                 case "udp4":
97                         if ip.To4() == nil {
98                                 continue
99                         }
100                 case "udp6":
101                         if ip.To4() != nil {
102                                 continue
103                         }
104                 }
105                 return
106         }
107         err = errors.New("no acceptable ips")
108         return
109 }
110
111 func (me *trackerScraper) trackerUrl(ip net.IP) string {
112         u := me.u
113         if u.Port() != "" {
114                 u.Host = net.JoinHostPort(ip.String(), u.Port())
115         }
116         return u.String()
117 }
118
119 // Return how long to wait before trying again. For most errors, we return 5
120 // minutes, a relatively quick turn around for DNS changes.
121 func (me *trackerScraper) announce(
122         ctx context.Context,
123         event tracker.AnnounceEvent,
124 ) (ret trackerAnnounceResult) {
125         defer func() {
126                 ret.Completed = time.Now()
127         }()
128         ret.Interval = time.Minute
129
130         // Limit concurrent use of the same tracker URL by the Client.
131         ref := me.t.cl.activeAnnounceLimiter.GetRef(me.u.String())
132         defer ref.Drop()
133         select {
134         case <-ctx.Done():
135                 ret.Err = ctx.Err()
136                 return
137         case ref.C() <- struct{}{}:
138         }
139         defer func() {
140                 select {
141                 case <-ref.C():
142                 default:
143                         panic("should return immediately")
144                 }
145         }()
146
147         ip, err := me.getIp()
148         if err != nil {
149                 ret.Err = fmt.Errorf("error getting ip: %s", err)
150                 return
151         }
152         me.t.cl.rLock()
153         req := me.t.announceRequest(event, me.shortInfohash)
154         me.t.cl.rUnlock()
155         // The default timeout works well as backpressure on concurrent access to the tracker. Since
156         // we're passing our own Context now, we will include that timeout ourselves to maintain similar
157         // behavior to previously, albeit with this context now being cancelled when the Torrent is
158         // closed.
159         ctx, cancel := context.WithTimeout(ctx, tracker.DefaultTrackerAnnounceTimeout)
160         defer cancel()
161         me.t.logger.WithDefaultLevel(log.Debug).Printf("announcing to %q: %#v", me.u.String(), req)
162         res, err := tracker.Announce{
163                 Context:             ctx,
164                 HttpProxy:           me.t.cl.config.HTTPProxy,
165                 HttpRequestDirector: me.t.cl.config.HttpRequestDirector,
166                 DialContext:         me.t.cl.config.TrackerDialContext,
167                 ListenPacket:        me.t.cl.config.TrackerListenPacket,
168                 UserAgent:           me.t.cl.config.HTTPUserAgent,
169                 TrackerUrl:          me.trackerUrl(ip),
170                 Request:             req,
171                 HostHeader:          me.u.Host,
172                 ServerName:          me.u.Hostname(),
173                 UdpNetwork:          me.u.Scheme,
174                 ClientIp4:           krpc.NodeAddr{IP: me.t.cl.config.PublicIp4},
175                 ClientIp6:           krpc.NodeAddr{IP: me.t.cl.config.PublicIp6},
176                 Logger:              me.t.logger,
177         }.Do()
178         me.t.logger.WithDefaultLevel(log.Debug).Printf("announce to %q returned %#v: %v", me.u.String(), res, err)
179         if err != nil {
180                 ret.Err = fmt.Errorf("announcing: %w", err)
181                 return
182         }
183         me.t.AddPeers(peerInfos(nil).AppendFromTracker(res.Peers))
184         ret.NumPeers = len(res.Peers)
185         ret.Interval = time.Duration(res.Interval) * time.Second
186         return
187 }
188
189 // Returns whether we can shorten the interval, and sets notify to a channel that receives when we
190 // might change our mind, or leaves it if we won't.
191 func (me *trackerScraper) canIgnoreInterval(notify *<-chan struct{}) bool {
192         gotInfo := me.t.GotInfo()
193         select {
194         case <-gotInfo:
195                 // Private trackers really don't like us announcing more than they specify. They're also
196                 // tracking us very carefully, so it's best to comply.
197                 private := me.t.info.Private
198                 return private == nil || !*private
199         default:
200                 *notify = gotInfo
201                 return false
202         }
203 }
204
205 func (me *trackerScraper) Run() {
206         defer me.announceStopped()
207
208         ctx, cancel := context.WithCancel(context.Background())
209         defer cancel()
210         go func() {
211                 defer cancel()
212                 select {
213                 case <-ctx.Done():
214                 case <-me.t.Closed():
215                 }
216         }()
217
218         // make sure first announce is a "started"
219         e := tracker.Started
220
221         for {
222                 ar := me.announce(ctx, e)
223                 // after first announce, get back to regular "none"
224                 e = tracker.None
225                 me.t.cl.lock()
226                 me.lastAnnounce = ar
227                 me.t.cl.unlock()
228
229         recalculate:
230                 // Make sure we don't announce for at least a minute since the last one.
231                 interval := ar.Interval
232                 if interval < time.Minute {
233                         interval = time.Minute
234                 }
235
236                 me.t.cl.lock()
237                 wantPeers := me.t.wantPeersEvent.C()
238                 me.t.cl.unlock()
239
240                 // If we want peers, reduce the interval to the minimum if it's appropriate.
241
242                 // A channel that receives when we should reconsider our interval. Starts as nil since that
243                 // never receives.
244                 var reconsider <-chan struct{}
245                 select {
246                 case <-wantPeers:
247                         if interval > time.Minute && me.canIgnoreInterval(&reconsider) {
248                                 interval = time.Minute
249                         }
250                 default:
251                         reconsider = wantPeers
252                 }
253
254                 select {
255                 case <-me.t.closed.Done():
256                         return
257                 case <-reconsider:
258                         // Recalculate the interval.
259                         goto recalculate
260                 case <-time.After(time.Until(ar.Completed.Add(interval))):
261                 }
262         }
263 }
264
265 func (me *trackerScraper) announceStopped() {
266         ctx, cancel := context.WithTimeout(context.Background(), tracker.DefaultTrackerAnnounceTimeout)
267         defer cancel()
268         me.announce(ctx, tracker.Stopped)
269 }