]> Sergey Matveev's repositories - btrtrc.git/blob - tracker_scraper.go
Rework Reader waiting
[btrtrc.git] / tracker_scraper.go
1 package torrent
2
3 import (
4         "bytes"
5         "context"
6         "errors"
7         "fmt"
8         "net"
9         "net/url"
10         "time"
11
12         "github.com/anacrolix/dht/v2/krpc"
13         "github.com/anacrolix/log"
14
15         "github.com/anacrolix/torrent/tracker"
16 )
17
18 // Announces a torrent to a tracker at regular intervals, when peers are
19 // required.
20 type trackerScraper struct {
21         u            url.URL
22         t            *Torrent
23         lastAnnounce trackerAnnounceResult
24 }
25
26 type torrentTrackerAnnouncer interface {
27         statusLine() string
28         URL() *url.URL
29 }
30
31 func (me trackerScraper) URL() *url.URL {
32         return &me.u
33 }
34
35 func (ts *trackerScraper) statusLine() string {
36         var w bytes.Buffer
37         fmt.Fprintf(&w, "next ann: %v, last ann: %v",
38                 func() string {
39                         na := time.Until(ts.lastAnnounce.Completed.Add(ts.lastAnnounce.Interval))
40                         if na > 0 {
41                                 na /= time.Second
42                                 na *= time.Second
43                                 return na.String()
44                         } else {
45                                 return "anytime"
46                         }
47                 }(),
48                 func() string {
49                         if ts.lastAnnounce.Err != nil {
50                                 return ts.lastAnnounce.Err.Error()
51                         }
52                         if ts.lastAnnounce.Completed.IsZero() {
53                                 return "never"
54                         }
55                         return fmt.Sprintf("%d peers", ts.lastAnnounce.NumPeers)
56                 }(),
57         )
58         return w.String()
59 }
60
61 type trackerAnnounceResult struct {
62         Err       error
63         NumPeers  int
64         Interval  time.Duration
65         Completed time.Time
66 }
67
68 func (me *trackerScraper) getIp() (ip net.IP, err error) {
69         ips, err := net.LookupIP(me.u.Hostname())
70         if err != nil {
71                 return
72         }
73         if len(ips) == 0 {
74                 err = errors.New("no ips")
75                 return
76         }
77         for _, ip = range ips {
78                 if me.t.cl.ipIsBlocked(ip) {
79                         continue
80                 }
81                 switch me.u.Scheme {
82                 case "udp4":
83                         if ip.To4() == nil {
84                                 continue
85                         }
86                 case "udp6":
87                         if ip.To4() != nil {
88                                 continue
89                         }
90                 }
91                 return
92         }
93         err = errors.New("no acceptable ips")
94         return
95 }
96
97 func (me *trackerScraper) trackerUrl(ip net.IP) string {
98         u := me.u
99         if u.Port() != "" {
100                 u.Host = net.JoinHostPort(ip.String(), u.Port())
101         }
102         return u.String()
103 }
104
105 // Return how long to wait before trying again. For most errors, we return 5
106 // minutes, a relatively quick turn around for DNS changes.
107 func (me *trackerScraper) announce(ctx context.Context, event tracker.AnnounceEvent) (ret trackerAnnounceResult) {
108
109         defer func() {
110                 ret.Completed = time.Now()
111         }()
112         ret.Interval = time.Minute
113
114         // Limit concurrent use of the same tracker URL by the Client.
115         ref := me.t.cl.activeAnnounceLimiter.GetRef(me.u.String())
116         defer ref.Drop()
117         select {
118         case <-ctx.Done():
119                 ret.Err = ctx.Err()
120                 return
121         case ref.C() <- struct{}{}:
122         }
123         defer func() {
124                 select {
125                 case <-ref.C():
126                 default:
127                         panic("should return immediately")
128                 }
129         }()
130
131         ip, err := me.getIp()
132         if err != nil {
133                 ret.Err = fmt.Errorf("error getting ip: %s", err)
134                 return
135         }
136         me.t.cl.rLock()
137         req := me.t.announceRequest(event)
138         me.t.cl.rUnlock()
139         // The default timeout works well as backpressure on concurrent access to the tracker. Since
140         // we're passing our own Context now, we will include that timeout ourselves to maintain similar
141         // behavior to previously, albeit with this context now being cancelled when the Torrent is
142         // closed.
143         ctx, cancel := context.WithTimeout(ctx, tracker.DefaultTrackerAnnounceTimeout)
144         defer cancel()
145         me.t.logger.WithDefaultLevel(log.Debug).Printf("announcing to %q: %#v", me.u.String(), req)
146         res, err := tracker.Announce{
147                 Context:    ctx,
148                 HTTPProxy:  me.t.cl.config.HTTPProxy,
149                 UserAgent:  me.t.cl.config.HTTPUserAgent,
150                 TrackerUrl: me.trackerUrl(ip),
151                 Request:    req,
152                 HostHeader: me.u.Host,
153                 ServerName: me.u.Hostname(),
154                 UdpNetwork: me.u.Scheme,
155                 ClientIp4:  krpc.NodeAddr{IP: me.t.cl.config.PublicIp4},
156                 ClientIp6:  krpc.NodeAddr{IP: me.t.cl.config.PublicIp6},
157         }.Do()
158         me.t.logger.WithDefaultLevel(log.Debug).Printf("announce to %q returned %#v: %v", me.u.String(), res, err)
159         if err != nil {
160                 ret.Err = fmt.Errorf("announcing: %w", err)
161                 return
162         }
163         me.t.AddPeers(peerInfos(nil).AppendFromTracker(res.Peers))
164         ret.NumPeers = len(res.Peers)
165         ret.Interval = time.Duration(res.Interval) * time.Second
166         return
167 }
168
169 // Returns whether we can shorten the interval, and sets notify to a channel that receives when we
170 // might change our mind, or leaves it if we won't.
171 func (me *trackerScraper) canIgnoreInterval(notify *<-chan struct{}) bool {
172         gotInfo := me.t.GotInfo()
173         select {
174         case <-gotInfo:
175                 // Private trackers really don't like us announcing more than they specify. They're also
176                 // tracking us very carefully, so it's best to comply.
177                 private := me.t.info.Private
178                 return private == nil || !*private
179         default:
180                 *notify = gotInfo
181                 return false
182         }
183 }
184
185 func (me *trackerScraper) Run() {
186
187         defer me.announceStopped()
188
189         ctx, cancel := context.WithCancel(context.Background())
190         defer cancel()
191         go func() {
192                 defer cancel()
193                 select {
194                 case <-ctx.Done():
195                 case <-me.t.Closed():
196                 }
197         }()
198
199         // make sure first announce is a "started"
200         e := tracker.Started
201
202         for {
203                 ar := me.announce(ctx, e)
204                 // after first announce, get back to regular "none"
205                 e = tracker.None
206                 me.t.cl.lock()
207                 me.lastAnnounce = ar
208                 me.t.cl.unlock()
209
210         recalculate:
211                 // Make sure we don't announce for at least a minute since the last one.
212                 interval := ar.Interval
213                 if interval < time.Minute {
214                         interval = time.Minute
215                 }
216
217                 me.t.cl.lock()
218                 wantPeers := me.t.wantPeersEvent.C()
219                 me.t.cl.unlock()
220
221                 // If we want peers, reduce the interval to the minimum if it's appropriate.
222
223                 // A channel that receives when we should reconsider our interval. Starts as nil since that
224                 // never receives.
225                 var reconsider <-chan struct{}
226                 select {
227                 case <-wantPeers:
228                         if interval > time.Minute && me.canIgnoreInterval(&reconsider) {
229                                 interval = time.Minute
230                         }
231                 default:
232                         reconsider = wantPeers
233                 }
234
235                 select {
236                 case <-me.t.closed.Done():
237                         return
238                 case <-reconsider:
239                         // Recalculate the interval.
240                         goto recalculate
241                 case <-time.After(time.Until(ar.Completed.Add(interval))):
242                 }
243         }
244 }
245
246 func (me *trackerScraper) announceStopped() {
247         ctx, cancel := context.WithTimeout(context.Background(), tracker.DefaultTrackerAnnounceTimeout)
248         defer cancel()
249         me.announce(ctx, tracker.Stopped)
250 }