Sergey Matveev's repositories - btrtrc.git/blob - tracker/server.go
Add upstream announce gating
[btrtrc.git] / tracker / server.go
1 package tracker
2
3 import (
4         "context"
5         "encoding/hex"
6         "net/netip"
7         "sync"
8         "time"
9
10         "github.com/anacrolix/generics"
11         "github.com/anacrolix/log"
12         "go.opentelemetry.io/otel"
13         "go.opentelemetry.io/otel/attribute"
14         "go.opentelemetry.io/otel/trace"
15
16         "github.com/anacrolix/torrent/tracker/udp"
17 )
18
// GetPeersOpts carries the optional constraints passed to AnnounceTracker.GetPeers.
//
// This is reserved for stuff like filtering by IP version, avoiding an announcer's IP or key,
// limiting return count, etc.
type GetPeersOpts struct {
	// Optional limit on the number of peers wanted. AnnounceHandler.Serve lowers it to the
	// announce's num_want when that is smaller. The type is unsigned, so negative numbers cannot
	// be represented.
	MaxCount generics.Option[uint]
}
25
// InfoHash is the 20-byte value that announces and peer lookups are keyed by.
type InfoHash = [20]byte
27
// PeerInfo is a peer as returned to announcers. It currently consists solely of the embedded
// address, and is used as a map key by peerSet.
type PeerInfo struct {
	AnnounceAddr
}
31
// AnnounceAddr is the IP address and port associated with an announce.
type AnnounceAddr = netip.AddrPort
33
// AnnounceTracker is the peer store used by AnnounceHandler: announces are recorded in it and
// peers are served from it.
type AnnounceTracker interface {
	// TrackAnnounce records the announce req as having come from addr.
	TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr AnnounceAddr) error
	// Scrape returns scrape results for infoHashes.
	// NOTE(review): presumably one result per requested info hash, in order — confirm against
	// the implementations.
	Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error)
	// GetPeers returns peers known for infoHash, subject to opts.
	GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ([]PeerInfo, error)
}
39
// AnnounceHandler records announces in AnnounceTracker and answers them with peers from it. When
// too few peers are known for an info hash it also announces to the configured upstream trackers
// and tracks the peers they return.
type AnnounceHandler struct {
	// The peer store announces are recorded in and peers are fetched from.
	AnnounceTracker AnnounceTracker

	// Upstream announce configuration. UpstreamTrackers and UpstreamTrackerUrls are indexed in
	// parallel (entry i of each refers to the same tracker), so UpstreamTrackerUrls must be at
	// least as long as UpstreamTrackers. UpstreamAnnouncePeerId is the peer ID sent in upstream
	// announces. UpstreamAnnounceGate is consulted before each upstream announce and notified
	// when one completes; it is called unconditionally, so it must be set when UpstreamTrackers
	// is non-empty.
	UpstreamTrackers       []Client
	UpstreamTrackerUrls    []string
	UpstreamAnnouncePeerId [20]byte
	UpstreamAnnounceGate   UpstreamAnnounceGater

	// Guards ongoingUpstreamAugmentations.
	mu sync.Mutex
	// Operations are only removed when all the upstream peers have been tracked.
	ongoingUpstreamAugmentations map[InfoHash]augmentationOperation
}
52
// peerSet is a set of distinct peers.
type peerSet = map[PeerInfo]struct{}
54
// augmentationOperation is a handle to one round of upstream announces for a single info hash, as
// started by AnnounceHandler.augmentPeersFromUpstream. It is passed around by value; the channels
// and map it holds are shared between all copies.
type augmentationOperation struct {
	// Closed when no more announce responses are pending. finalPeers will contain all the peers
	// seen.
	doneAnnouncing chan struct{}
	// This receives the latest peerSet until doneAnnouncing is closed.
	curPeers chan peerSet
	// This contains the final peerSet after doneAnnouncing is closed. It is the collector's own
	// map, not a copy, so it must not be read before doneAnnouncing is closed.
	finalPeers peerSet
}
64
65 func (me augmentationOperation) getCurPeers() (ret peerSet) {
66         ret, _ = me.getCurPeersAndDone()
67         return
68 }
69
70 func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) {
71         select {
72         case ret = <-me.curPeers:
73         case <-me.doneAnnouncing:
74                 ret = me.finalPeers
75                 done = true
76         }
77         return
78 }
79
80 // Adds peers from new that aren't in orig. Modifies both arguments.
81 func addMissing(orig []PeerInfo, new peerSet) {
82         for _, peer := range orig {
83                 delete(new, peer)
84         }
85         for peer := range new {
86                 orig = append(orig, peer)
87         }
88 }
89
90 var tracer = otel.Tracer("torrent.tracker.udp")
91
92 func (me *AnnounceHandler) Serve(
93         ctx context.Context, req AnnounceRequest, addr AnnounceAddr, opts GetPeersOpts,
94 ) (peers []PeerInfo, err error) {
95         ctx, span := tracer.Start(
96                 ctx,
97                 "AnnounceHandler.Serve",
98                 trace.WithAttributes(
99                         attribute.Int64("announce.request.num_want", int64(req.NumWant)),
100                         attribute.Int("announce.request.port", int(req.Port)),
101                         attribute.String("announce.request.info_hash", hex.EncodeToString(req.InfoHash[:])),
102                         attribute.String("announce.request.event", req.Event.String()),
103                         attribute.Int64("announce.get_peers.opts.max_count_value", int64(opts.MaxCount.Value)),
104                         attribute.Bool("announce.get_peers.opts.max_count_ok", opts.MaxCount.Ok),
105                         attribute.String("announce.source.addr.ip", addr.Addr().String()),
106                         attribute.Int("announce.source.addr.port", int(addr.Port())),
107                 ),
108         )
109         defer span.End()
110         defer func() {
111                 span.SetAttributes(attribute.Int("announce.get_peers.len", len(peers)))
112         }()
113
114         err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr)
115         if err != nil {
116                 return
117         }
118         infoHash := req.InfoHash
119         var op generics.Option[augmentationOperation]
120         // Grab a handle to any augmentations that are already running.
121         me.mu.Lock()
122         op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
123         me.mu.Unlock()
124         // Apply num_want limit to max count. I really can't tell if this is the right place to do it,
125         // but it seems the most flexible.
126         if req.NumWant != -1 {
127                 newCount := uint(req.NumWant)
128                 if opts.MaxCount.Ok {
129                         if newCount < opts.MaxCount.Value {
130                                 opts.MaxCount.Value = newCount
131                         }
132                 } else {
133                         opts.MaxCount = generics.Some(newCount)
134                 }
135         }
136         peers, err = me.AnnounceTracker.GetPeers(ctx, infoHash, opts)
137         if err != nil {
138                 return
139         }
140         // Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside
141         // the mutex or not.
142         if op.Ok {
143                 curPeers, done := op.Value.getCurPeersAndDone()
144                 addMissing(peers, curPeers)
145                 if done {
146                         // It doesn't get any better with this operation. Forget it.
147                         op.Ok = false
148                 }
149         }
150         me.mu.Lock()
151         // If we didn't have an operation, and don't have enough peers, start one. Allowing 1 is
152         // assuming the announcing peer might be that one. Really we should record a value to prevent
153         // duplicate announces. Also don't announce upstream if we got no peers because the caller asked
154         // for none.
155         if !op.Ok && len(peers) <= 1 && opts.MaxCount.UnwrapOr(1) > 0 {
156                 op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
157                 if !op.Ok {
158                         op.Set(me.augmentPeersFromUpstream(req.InfoHash))
159                         generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value)
160                 }
161         }
162         me.mu.Unlock()
163         // Wait a while for the current operation.
164         if op.Ok {
165                 // Force the augmentation to return with whatever it has if it hasn't completed in a
166                 // reasonable time.
167                 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
168                 select {
169                 case <-ctx.Done():
170                 case <-op.Value.doneAnnouncing:
171                 }
172                 cancel()
173                 addMissing(peers, op.Value.getCurPeers())
174         }
175         return
176 }
177
// augmentPeersFromUpstream starts an announce for infoHash to every configured upstream tracker
// and returns immediately with a handle to the in-progress operation. Storing that handle in
// ongoingUpstreamAugmentations is left to the caller (see Serve); this function only removes it
// when the operation has finished.
//
// Three kinds of goroutine cooperate:
//   - one announcer per upstream tracker, which asks UpstreamAnnounceGate whether to proceed,
//     announces, and sends the resulting peers on peersChan (nil if the gate declined or failed);
//   - a collector, which merges received peers into retPeers and peersToTrack, and offers copies
//     of the current set on the returned curPeers channel until every upstream has reported;
//   - a finisher, which waits for all upstreams, records every collected peer with
//     AnnounceTracker, and finally deletes the operation from ongoingUpstreamAugmentations.
func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentationOperation {
	// Bounds the whole round of upstream announces, and is also passed to the gate's Start.
	const announceTimeout = time.Minute
	// Derived from Background rather than a request context, so the operation is not cancelled
	// when the Serve call that started it returns.
	announceCtx, cancel := context.WithTimeout(context.Background(), announceTimeout)
	subReq := AnnounceRequest{
		InfoHash: infoHash,
		PeerId:   me.UpstreamAnnouncePeerId,
		Event:    None,
		Key:      0,
		NumWant:  -1,
		Port:     0,
	}
	// Unbuffered: each announcer blocks on its send until the collector has received it.
	peersChan := make(chan []Peer)
	// Counts upstream results that the collector has not yet merged. Done is called by the
	// collector after merging, not by the announcer.
	var pendingUpstreams sync.WaitGroup
	for i := range me.UpstreamTrackers {
		client := me.UpstreamTrackers[i]
		// Panics if UpstreamTrackerUrls is shorter than UpstreamTrackers.
		url := me.UpstreamTrackerUrls[i]
		pendingUpstreams.Add(1)
		go func() {
			started, err := me.UpstreamAnnounceGate.Start(announceCtx, url, infoHash, announceTimeout)
			if err != nil {
				log.Printf("error reserving announce for %x to %v: %v", infoHash, url, err)
			}
			if err != nil || !started {
				// Still report a (nil) result so the collector accounts for this upstream.
				peersChan <- nil
				return
			}
			log.Printf("announcing %x upstream to %v", infoHash, url)
			resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{
				UserAgent: "aragorn",
			})
			// NOTE(review): resp.Interval and resp.Peers are used before err is checked, so on
			// failure they are whatever Announce returned (presumably zero values — confirm).
			interval := resp.Interval
			// Completion is reported to the gate on a separate goroutine with its own context, so
			// it delays neither the delivery of peers nor is it cut short by announceCtx.
			go func() {
				if interval < 5*60 {
					// This is as much to reduce load on upstream trackers in the event of errors,
					// as it is to reduce load on our peer store.
					interval = 5 * 60
				}
				err := me.UpstreamAnnounceGate.Completed(context.Background(), url, infoHash, interval)
				if err != nil {
					log.Printf("error recording completed announce for %x to %v: %v", infoHash, url, err)
				}
			}()
			peersChan <- resp.Peers
			if err != nil {
				log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
			}
		}()
	}
	// Distinct upstream peers keyed by their String form; written only by the collector and read
	// by the finisher once the collector has merged every result.
	peersToTrack := make(map[string]Peer)
	// Finisher.
	go func() {
		// Returns once the collector has merged a result from every upstream, after which the
		// collector no longer writes to peersToTrack.
		pendingUpstreams.Wait()
		cancel()
		// Makes the collector return, which in turn closes doneAnnouncing.
		close(peersChan)
		log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers", len(peersToTrack))
		for _, peer := range peersToTrack {
			addrPort, ok := peer.ToNetipAddrPort()
			if !ok {
				// No usable address; nothing to track.
				continue
			}
			trackReq := AnnounceRequest{
				InfoHash: infoHash,
				Event:    Started,
				Port:     uint16(peer.Port),
			}
			copy(trackReq.PeerId[:], peer.ID)
			err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort)
			if err != nil {
				log.Levelf(log.Error, "error tracking upstream peer: %v", err)
			}
		}
		// Only now that the peers are in the tracker is the operation forgotten.
		me.mu.Lock()
		delete(me.ongoingUpstreamAugmentations, infoHash)
		me.mu.Unlock()
	}()
	curPeersChan := make(chan map[PeerInfo]struct{})
	doneChan := make(chan struct{})
	retPeers := make(map[PeerInfo]struct{})
	// Collector.
	go func() {
		defer close(doneChan)
		for {
			// Either merge the next upstream result, or hand a snapshot to a waiting reader. The
			// copy is made each time the select is entered, so readers never share retPeers while
			// it is still being written.
			select {
			case peers, ok := <-peersChan:
				if !ok {
					return
				}
				voldemort(peers, peersToTrack, retPeers)
				pendingUpstreams.Done()
			case curPeersChan <- copyPeerSet(retPeers):
			}
		}
	}()
	// Take return references.
	return augmentationOperation{
		curPeers:       curPeersChan,
		finalPeers:     retPeers,
		doneAnnouncing: doneChan,
	}
}
276
277 func copyPeerSet(orig peerSet) (ret peerSet) {
278         ret = make(peerSet, len(orig))
279         for k, v := range orig {
280                 ret[k] = v
281         }
282         return
283 }
284
285 // Adds peers to trailing containers.
286 func voldemort(peers []Peer, toTrack map[string]Peer, sets ...map[PeerInfo]struct{}) {
287         for _, protoPeer := range peers {
288                 toTrack[protoPeer.String()] = protoPeer
289                 addr, ok := netip.AddrFromSlice(protoPeer.IP)
290                 if !ok {
291                         continue
292                 }
293                 handlerPeer := PeerInfo{netip.AddrPortFrom(addr, uint16(protoPeer.Port))}
294                 for _, set := range sets {
295                         set[handlerPeer] = struct{}{}
296                 }
297         }
298 }