Sergey Matveev's repositories - btrtrc.git/blob - tracker/server.go
Propagate announce interval, particularly for UDP
package tracker

import (
        "context"
        "encoding/hex"
        "net/netip"
        "sync"
        "time"

        "github.com/anacrolix/generics"
        "github.com/anacrolix/log"
        "go.opentelemetry.io/otel"
        "go.opentelemetry.io/otel/attribute"
        "go.opentelemetry.io/otel/trace"

        "github.com/anacrolix/torrent/tracker/udp"
)

// Reserved for announce-handling options such as filtering by IP version, avoiding an announcer's
// own IP or key, limiting the returned peer count, etc.
type GetPeersOpts struct {
        // Negative numbers are not allowed.
        MaxCount generics.Option[uint]
}

type InfoHash = [20]byte

type PeerInfo struct {
        AnnounceAddr
}

type AnnounceAddr = netip.AddrPort

type AnnounceTracker interface {
        TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr AnnounceAddr) error
        Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error)
        GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ServerAnnounceResult
}
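
// For illustration only: a minimal in-memory AnnounceTracker sketch, showing one way the interface
// above could be satisfied. It is an assumption, not part of the server: it has no peer expiry,
// returns zero-valued scrape results, and assumes the Stopped event constant is available
// unqualified like Started and None below.
type inMemoryAnnounceTracker struct {
        mu    sync.Mutex
        peers map[InfoHash]map[AnnounceAddr]struct{}
}

func (me *inMemoryAnnounceTracker) TrackAnnounce(
        ctx context.Context, req udp.AnnounceRequest, addr AnnounceAddr,
) error {
        me.mu.Lock()
        defer me.mu.Unlock()
        if req.Event == Stopped {
                // Deleting from a nil inner map is a no-op.
                delete(me.peers[req.InfoHash], addr)
                return nil
        }
        if me.peers == nil {
                me.peers = make(map[InfoHash]map[AnnounceAddr]struct{})
        }
        if me.peers[req.InfoHash] == nil {
                me.peers[req.InfoHash] = make(map[AnnounceAddr]struct{})
        }
        me.peers[req.InfoHash][addr] = struct{}{}
        return nil
}

func (me *inMemoryAnnounceTracker) Scrape(
        ctx context.Context, infoHashes []InfoHash,
) ([]udp.ScrapeInfohashResult, error) {
        // Zero-valued results; a real implementation would count seeders and leechers per infohash.
        return make([]udp.ScrapeInfohashResult, len(infoHashes)), nil
}

func (me *inMemoryAnnounceTracker) GetPeers(
        ctx context.Context, infoHash InfoHash, opts GetPeersOpts,
) (ret ServerAnnounceResult) {
        me.mu.Lock()
        defer me.mu.Unlock()
        for addr := range me.peers[infoHash] {
                if opts.MaxCount.Ok && uint(len(ret.Peers)) >= opts.MaxCount.Value {
                        break
                }
                ret.Peers = append(ret.Peers, PeerInfo{addr})
        }
        return
}

var _ AnnounceTracker = (*inMemoryAnnounceTracker)(nil)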

type ServerAnnounceResult struct {
        Err   error
        Peers []PeerInfo
        // Announce interval, in seconds.
        Interval generics.Option[int32]
        Leechers generics.Option[int32]
        Seeders  generics.Option[int32]
}

type AnnounceHandler struct {
        AnnounceTracker AnnounceTracker

        UpstreamTrackers       []Client
        UpstreamTrackerUrls    []string
        UpstreamAnnouncePeerId [20]byte
        UpstreamAnnounceGate   UpstreamAnnounceGater

        mu sync.Mutex
        // Operations are only removed when all the upstream peers have been tracked.
        ongoingUpstreamAugmentations map[InfoHash]augmentationOperation
}

type peerSet = map[PeerInfo]struct{}

type augmentationOperation struct {
        // Closed when no more announce responses are pending. finalPeers will contain all the peers
        // seen.
        doneAnnouncing chan struct{}
        // This receives the latest peerSet until doneAnnouncing is closed.
        curPeers chan peerSet
        // This contains the final peerSet after doneAnnouncing is closed.
        finalPeers peerSet
}

func (me augmentationOperation) getCurPeers() (ret peerSet) {
        ret, _ = me.getCurPeersAndDone()
        return
}

func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) {
        select {
        case ret = <-me.curPeers:
        case <-me.doneAnnouncing:
                ret = me.finalPeers
                done = true
        }
        return
}

// Returns orig with the peers from newPeers that weren't already present appended. (Returning the
// slice is required for the appends to be visible to the caller.) Consumes newPeers: entries
// already in orig are deleted from it.
func addMissing(orig []PeerInfo, newPeers peerSet) []PeerInfo {
        for _, peer := range orig {
                delete(newPeers, peer)
        }
        for peer := range newPeers {
                orig = append(orig, peer)
        }
        return orig
}
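
// A quick illustration of addMissing with made-up addresses: the duplicate is skipped and only the
// genuinely new peer is appended.
func exampleAddMissing() {
        known := []PeerInfo{{netip.MustParseAddrPort("192.0.2.1:6881")}}
        fresh := peerSet{
                {netip.MustParseAddrPort("192.0.2.1:6881")}: {},
                {netip.MustParseAddrPort("192.0.2.2:6881")}: {},
        }
        known = addMissing(known, fresh)
        // known now contains both addresses exactly once.
        log.Printf("have %v peers", len(known))
}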

var tracer = otel.Tracer("torrent.tracker.udp")

func (me *AnnounceHandler) Serve(
        ctx context.Context, req AnnounceRequest, addr AnnounceAddr, opts GetPeersOpts,
) (ret ServerAnnounceResult) {
        ctx, span := tracer.Start(
                ctx,
                "AnnounceHandler.Serve",
                trace.WithAttributes(
                        attribute.Int64("announce.request.num_want", int64(req.NumWant)),
                        attribute.Int("announce.request.port", int(req.Port)),
                        attribute.String("announce.request.info_hash", hex.EncodeToString(req.InfoHash[:])),
                        attribute.String("announce.request.event", req.Event.String()),
                        attribute.Int64("announce.get_peers.opts.max_count_value", int64(opts.MaxCount.Value)),
                        attribute.Bool("announce.get_peers.opts.max_count_ok", opts.MaxCount.Ok),
                        attribute.String("announce.source.addr.ip", addr.Addr().String()),
                        attribute.Int("announce.source.addr.port", int(addr.Port())),
                ),
        )
        defer span.End()
        defer func() {
                span.SetAttributes(attribute.Int("announce.get_peers.len", len(ret.Peers)))
        }()

        ret.Err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr)
        if ret.Err != nil {
                return
        }
        infoHash := req.InfoHash
        var op generics.Option[augmentationOperation]
        // Grab a handle to any augmentation that is already running.
        me.mu.Lock()
        op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
        me.mu.Unlock()
        // Clamp the max peer count to the request's num_want. It's not clear this is the best place
        // to apply the limit, but it seems the most flexible.
        if req.NumWant != -1 {
                newCount := uint(req.NumWant)
                if opts.MaxCount.Ok {
                        if newCount < opts.MaxCount.Value {
                                opts.MaxCount.Value = newCount
                        }
                } else {
                        opts.MaxCount = generics.Some(newCount)
                }
        }
        ret = me.AnnounceTracker.GetPeers(ctx, infoHash, opts)
        if ret.Err != nil {
                return
        }
        // Take whatever peers the operation has ready. If it's finished, it doesn't matter whether
        // we do this inside the mutex or not.
        if op.Ok {
                curPeers, done := op.Value.getCurPeersAndDone()
                ret.Peers = addMissing(ret.Peers, curPeers)
                if done {
                        // It doesn't get any better than the operation's final peers. Forget it.
                        op.Ok = false
                }
        }
        me.mu.Lock()
        // If we didn't have an operation and don't have enough peers, start one. Allowing up to 1
        // peer assumes that peer might be the announcer itself. Really we should record a value to
        // prevent duplicate announces. Also, don't announce upstream if we got no peers because the
        // caller asked for none.
        if !op.Ok && len(ret.Peers) <= 1 && opts.MaxCount.UnwrapOr(1) > 0 {
                op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
                if !op.Ok {
                        op.Set(me.augmentPeersFromUpstream(req.InfoHash))
                        generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value)
                }
        }
        me.mu.Unlock()
        // Wait a while for the current operation.
        if op.Ok {
                // Force the augmentation to return with whatever it has if it hasn't completed
                // within a reasonable time.
                ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
                select {
                case <-ctx.Done():
                case <-op.Value.doneAnnouncing:
                }
                cancel()
                ret.Peers = addMissing(ret.Peers, op.Value.getCurPeers())
        }
        return
}
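
// Hypothetical wiring of Serve, for illustration: the announcer address, infohash, and request
// values below are placeholders.
func exampleServe(ctx context.Context, handler *AnnounceHandler) {
        var req AnnounceRequest
        req.InfoHash = [20]byte{0: 1}
        req.NumWant = 50
        req.Port = 6881
        res := handler.Serve(
                ctx,
                req,
                netip.MustParseAddrPort("203.0.113.7:6881"),
                GetPeersOpts{MaxCount: generics.Some[uint](50)},
        )
        if res.Err != nil {
                log.Printf("announce failed: %v", res.Err)
                return
        }
        log.Printf("got %v peers, interval %v", len(res.Peers), res.Interval.UnwrapOr(0))
}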

func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentationOperation {
        const announceTimeout = time.Minute
        announceCtx, cancel := context.WithTimeout(context.Background(), announceTimeout)
        subReq := AnnounceRequest{
                InfoHash: infoHash,
                PeerId:   me.UpstreamAnnouncePeerId,
                Event:    None,
                Key:      0,
                NumWant:  -1,
                Port:     0,
        }
        peersChan := make(chan []Peer)
        var pendingUpstreams sync.WaitGroup
        for i := range me.UpstreamTrackers {
                client := me.UpstreamTrackers[i]
                url := me.UpstreamTrackerUrls[i]
                pendingUpstreams.Add(1)
                go func() {
                        started, err := me.UpstreamAnnounceGate.Start(announceCtx, url, infoHash, announceTimeout)
                        if err != nil {
                                log.Printf("error reserving announce for %x to %v: %v", infoHash, url, err)
                        }
                        if err != nil || !started {
                                peersChan <- nil
                                return
                        }
                        log.Printf("announcing %x upstream to %v", infoHash, url)
                        resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{
                                UserAgent: "aragorn",
                        })
                        // On announce error, resp is zero-valued and the minimum below applies.
                        interval := resp.Interval
                        go func() {
                                if interval < 5*60 {
                                        // This is as much to reduce load on upstream trackers in the event of errors
                                        // as it is to reduce load on our peer store.
                                        interval = 5 * 60
                                }
                                err := me.UpstreamAnnounceGate.Completed(context.Background(), url, infoHash, interval)
                                if err != nil {
                                        log.Printf("error recording completed announce for %x to %v: %v", infoHash, url, err)
                                }
                        }()
                        peersChan <- resp.Peers
                        if err != nil {
                                log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
                        }
                }()
        }
        peersToTrack := make(map[string]Peer)
        go func() {
                pendingUpstreams.Wait()
                cancel()
                close(peersChan)
                log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers", len(peersToTrack))
                for _, peer := range peersToTrack {
                        addrPort, ok := peer.ToNetipAddrPort()
                        if !ok {
                                continue
                        }
                        trackReq := AnnounceRequest{
                                InfoHash: infoHash,
                                Event:    Started,
                                Port:     uint16(peer.Port),
                        }
                        copy(trackReq.PeerId[:], peer.ID)
                        err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort)
                        if err != nil {
                                log.Levelf(log.Error, "error tracking upstream peer: %v", err)
                        }
                }
                me.mu.Lock()
                delete(me.ongoingUpstreamAugmentations, infoHash)
                me.mu.Unlock()
        }()
        curPeersChan := make(chan map[PeerInfo]struct{})
        doneChan := make(chan struct{})
        retPeers := make(map[PeerInfo]struct{})
        go func() {
                defer close(doneChan)
                for {
                        select {
                        case peers, ok := <-peersChan:
                                if !ok {
                                        return
                                }
                                voldemort(peers, peersToTrack, retPeers)
                                pendingUpstreams.Done()
                        case curPeersChan <- copyPeerSet(retPeers):
                        }
                }
        }()
        // Hand out references to the collector's channels and final peer set.
        return augmentationOperation{
                curPeers:       curPeersChan,
                finalPeers:     retPeers,
                doneAnnouncing: doneChan,
        }
}
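
// For illustration: an in-memory UpstreamAnnounceGater sketch. That interface is defined elsewhere
// in this package; the signatures below are inferred from the call sites above and may not match it
// exactly. It permits one announce per tracker URL and infohash until the reported interval (in
// seconds) elapses, and does not persist across restarts.
type inMemoryAnnounceGater struct {
        mu   sync.Mutex
        next map[string]time.Time
}

func (me *inMemoryAnnounceGater) Start(
        ctx context.Context, trackerUrl string, infoHash [20]byte, timeout time.Duration,
) (bool, error) {
        me.mu.Lock()
        defer me.mu.Unlock()
        key := trackerUrl + string(infoHash[:])
        if time.Now().Before(me.next[key]) {
                return false, nil
        }
        if me.next == nil {
                me.next = make(map[string]time.Time)
        }
        // Hold the slot for the duration of the announce attempt; Completed extends it.
        me.next[key] = time.Now().Add(timeout)
        return true, nil
}

func (me *inMemoryAnnounceGater) Completed(
        ctx context.Context, trackerUrl string, infoHash [20]byte, interval int32,
) error {
        me.mu.Lock()
        defer me.mu.Unlock()
        if me.next == nil {
                me.next = make(map[string]time.Time)
        }
        me.next[trackerUrl+string(infoHash[:])] = time.Now().Add(time.Duration(interval) * time.Second)
        return nil
}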

func copyPeerSet(orig peerSet) (ret peerSet) {
        ret = make(peerSet, len(orig))
        for k, v := range orig {
                ret[k] = v
        }
        return
}

// Adds the raw tracker peers to toTrack, keyed by their string form, and their handler-level
// equivalents to each of the trailing peer sets.
func voldemort(peers []Peer, toTrack map[string]Peer, sets ...map[PeerInfo]struct{}) {
        for _, protoPeer := range peers {
                toTrack[protoPeer.String()] = protoPeer
                addr, ok := netip.AddrFromSlice(protoPeer.IP)
                if !ok {
                        continue
                }
                // Unmap 4-in-16 addresses so they compare equal to natively announced IPv4 peers.
                handlerPeer := PeerInfo{netip.AddrPortFrom(addr.Unmap(), uint16(protoPeer.Port))}
                for _, set := range sets {
                        set[handlerPeer] = struct{}{}
                }
        }
}