]> Sergey Matveev's repositories - btrtrc.git/blob - tracker_scraper.go
Drop support for go 1.20
[btrtrc.git] / tracker_scraper.go
1 package torrent
2
3 import (
4         "bytes"
5         "context"
6         "errors"
7         "fmt"
8         "net"
9         "net/url"
10         "time"
11
12         "github.com/anacrolix/dht/v2/krpc"
13         "github.com/anacrolix/log"
14
15         "github.com/anacrolix/torrent/tracker"
16 )
17
18 // Announces a torrent to a tracker at regular intervals, when peers are
19 // required.
20 type trackerScraper struct {
21         u               url.URL
22         t               *Torrent
23         lastAnnounce    trackerAnnounceResult
24         lookupTrackerIp func(*url.URL) ([]net.IP, error)
25 }
26
27 type torrentTrackerAnnouncer interface {
28         statusLine() string
29         URL() *url.URL
30 }
31
32 func (me trackerScraper) URL() *url.URL {
33         return &me.u
34 }
35
36 func (ts *trackerScraper) statusLine() string {
37         var w bytes.Buffer
38         fmt.Fprintf(&w, "next ann: %v, last ann: %v",
39                 func() string {
40                         na := time.Until(ts.lastAnnounce.Completed.Add(ts.lastAnnounce.Interval))
41                         if na > 0 {
42                                 na /= time.Second
43                                 na *= time.Second
44                                 return na.String()
45                         } else {
46                                 return "anytime"
47                         }
48                 }(),
49                 func() string {
50                         if ts.lastAnnounce.Err != nil {
51                                 return ts.lastAnnounce.Err.Error()
52                         }
53                         if ts.lastAnnounce.Completed.IsZero() {
54                                 return "never"
55                         }
56                         return fmt.Sprintf("%d peers", ts.lastAnnounce.NumPeers)
57                 }(),
58         )
59         return w.String()
60 }
61
62 type trackerAnnounceResult struct {
63         Err       error
64         NumPeers  int
65         Interval  time.Duration
66         Completed time.Time
67 }
68
69 func (me *trackerScraper) getIp() (ip net.IP, err error) {
70         var ips []net.IP
71         if me.lookupTrackerIp != nil {
72                 ips, err = me.lookupTrackerIp(&me.u)
73         } else {
74                 // Do a regular dns lookup
75                 ips, err = net.LookupIP(me.u.Hostname())
76         }
77         if err != nil {
78                 return
79         }
80         if len(ips) == 0 {
81                 err = errors.New("no ips")
82                 return
83         }
84         me.t.cl.rLock()
85         defer me.t.cl.rUnlock()
86         if me.t.cl.closed.IsSet() {
87                 err = errors.New("client is closed")
88                 return
89         }
90         for _, ip = range ips {
91                 if me.t.cl.ipIsBlocked(ip) {
92                         continue
93                 }
94                 switch me.u.Scheme {
95                 case "udp4":
96                         if ip.To4() == nil {
97                                 continue
98                         }
99                 case "udp6":
100                         if ip.To4() != nil {
101                                 continue
102                         }
103                 }
104                 return
105         }
106         err = errors.New("no acceptable ips")
107         return
108 }
109
110 func (me *trackerScraper) trackerUrl(ip net.IP) string {
111         u := me.u
112         if u.Port() != "" {
113                 u.Host = net.JoinHostPort(ip.String(), u.Port())
114         }
115         return u.String()
116 }
117
118 // Return how long to wait before trying again. For most errors, we return 5
119 // minutes, a relatively quick turn around for DNS changes.
120 func (me *trackerScraper) announce(ctx context.Context, event tracker.AnnounceEvent) (ret trackerAnnounceResult) {
121         defer func() {
122                 ret.Completed = time.Now()
123         }()
124         ret.Interval = time.Minute
125
126         // Limit concurrent use of the same tracker URL by the Client.
127         ref := me.t.cl.activeAnnounceLimiter.GetRef(me.u.String())
128         defer ref.Drop()
129         select {
130         case <-ctx.Done():
131                 ret.Err = ctx.Err()
132                 return
133         case ref.C() <- struct{}{}:
134         }
135         defer func() {
136                 select {
137                 case <-ref.C():
138                 default:
139                         panic("should return immediately")
140                 }
141         }()
142
143         ip, err := me.getIp()
144         if err != nil {
145                 ret.Err = fmt.Errorf("error getting ip: %s", err)
146                 return
147         }
148         me.t.cl.rLock()
149         req := me.t.announceRequest(event)
150         me.t.cl.rUnlock()
151         // The default timeout works well as backpressure on concurrent access to the tracker. Since
152         // we're passing our own Context now, we will include that timeout ourselves to maintain similar
153         // behavior to previously, albeit with this context now being cancelled when the Torrent is
154         // closed.
155         ctx, cancel := context.WithTimeout(ctx, tracker.DefaultTrackerAnnounceTimeout)
156         defer cancel()
157         me.t.logger.WithDefaultLevel(log.Debug).Printf("announcing to %q: %#v", me.u.String(), req)
158         res, err := tracker.Announce{
159                 Context:             ctx,
160                 HttpProxy:           me.t.cl.config.HTTPProxy,
161                 HttpRequestDirector: me.t.cl.config.HttpRequestDirector,
162                 DialContext:         me.t.cl.config.TrackerDialContext,
163                 ListenPacket:        me.t.cl.config.TrackerListenPacket,
164                 UserAgent:           me.t.cl.config.HTTPUserAgent,
165                 TrackerUrl:          me.trackerUrl(ip),
166                 Request:             req,
167                 HostHeader:          me.u.Host,
168                 ServerName:          me.u.Hostname(),
169                 UdpNetwork:          me.u.Scheme,
170                 ClientIp4:           krpc.NodeAddr{IP: me.t.cl.config.PublicIp4},
171                 ClientIp6:           krpc.NodeAddr{IP: me.t.cl.config.PublicIp6},
172                 Logger:              me.t.logger,
173         }.Do()
174         me.t.logger.WithDefaultLevel(log.Debug).Printf("announce to %q returned %#v: %v", me.u.String(), res, err)
175         if err != nil {
176                 ret.Err = fmt.Errorf("announcing: %w", err)
177                 return
178         }
179         me.t.AddPeers(peerInfos(nil).AppendFromTracker(res.Peers))
180         ret.NumPeers = len(res.Peers)
181         ret.Interval = time.Duration(res.Interval) * time.Second
182         return
183 }
184
185 // Returns whether we can shorten the interval, and sets notify to a channel that receives when we
186 // might change our mind, or leaves it if we won't.
187 func (me *trackerScraper) canIgnoreInterval(notify *<-chan struct{}) bool {
188         gotInfo := me.t.GotInfo()
189         select {
190         case <-gotInfo:
191                 // Private trackers really don't like us announcing more than they specify. They're also
192                 // tracking us very carefully, so it's best to comply.
193                 private := me.t.info.Private
194                 return private == nil || !*private
195         default:
196                 *notify = gotInfo
197                 return false
198         }
199 }
200
201 func (me *trackerScraper) Run() {
202         defer me.announceStopped()
203
204         ctx, cancel := context.WithCancel(context.Background())
205         defer cancel()
206         go func() {
207                 defer cancel()
208                 select {
209                 case <-ctx.Done():
210                 case <-me.t.Closed():
211                 }
212         }()
213
214         // make sure first announce is a "started"
215         e := tracker.Started
216
217         for {
218                 ar := me.announce(ctx, e)
219                 // after first announce, get back to regular "none"
220                 e = tracker.None
221                 me.t.cl.lock()
222                 me.lastAnnounce = ar
223                 me.t.cl.unlock()
224
225         recalculate:
226                 // Make sure we don't announce for at least a minute since the last one.
227                 interval := ar.Interval
228                 if interval < time.Minute {
229                         interval = time.Minute
230                 }
231
232                 me.t.cl.lock()
233                 wantPeers := me.t.wantPeersEvent.C()
234                 me.t.cl.unlock()
235
236                 // If we want peers, reduce the interval to the minimum if it's appropriate.
237
238                 // A channel that receives when we should reconsider our interval. Starts as nil since that
239                 // never receives.
240                 var reconsider <-chan struct{}
241                 select {
242                 case <-wantPeers:
243                         if interval > time.Minute && me.canIgnoreInterval(&reconsider) {
244                                 interval = time.Minute
245                         }
246                 default:
247                         reconsider = wantPeers
248                 }
249
250                 select {
251                 case <-me.t.closed.Done():
252                         return
253                 case <-reconsider:
254                         // Recalculate the interval.
255                         goto recalculate
256                 case <-time.After(time.Until(ar.Completed.Add(interval))):
257                 }
258         }
259 }
260
261 func (me *trackerScraper) announceStopped() {
262         ctx, cancel := context.WithTimeout(context.Background(), tracker.DefaultTrackerAnnounceTimeout)
263         defer cancel()
264         me.announce(ctx, tracker.Stopped)
265 }