[btrtrc.git] / tracker / server.go
package tracker

import (
	"context"
	"encoding/hex"
	"fmt"
	"net/netip"
	"sync"
	"time"

	"github.com/anacrolix/generics"
	"github.com/anacrolix/log"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"

	"github.com/anacrolix/torrent/tracker/udp"
)

// This is reserved for stuff like filtering by IP version, avoiding an announcer's IP or key,
// limiting return count, etc.
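// For example, a caller that wants at most 50 peers back might pass
// GetPeersOpts{MaxCount: generics.Some[uint](50)}.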
type GetPeersOpts struct {
	// Negative numbers are not allowed.
	MaxCount generics.Option[uint]
}

type InfoHash = [20]byte

type PeerInfo struct {
	AnnounceAddr
}

type AnnounceAddr = netip.AddrPort

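// AnnounceTracker is the peer store behind AnnounceHandler: implementations record announces,
// answer scrapes, and return known peers for a swarm.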
type AnnounceTracker interface {
	TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr AnnounceAddr) error
	Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error)
	GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ServerAnnounceResult
}
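// A minimal in-memory sketch of the interface, assuming a hypothetical memoryTracker type that is
// not part of this package (a real store would also honour Stopped events, intervals and expiry):
//
//	type memoryTracker struct {
//		mu     sync.Mutex
//		swarms map[InfoHash]peerSet
//	}
//
//	func (m *memoryTracker) TrackAnnounce(_ context.Context, req udp.AnnounceRequest, addr AnnounceAddr) error {
//		m.mu.Lock()
//		defer m.mu.Unlock()
//		if m.swarms == nil {
//			m.swarms = make(map[InfoHash]peerSet)
//		}
//		if m.swarms[req.InfoHash] == nil {
//			m.swarms[req.InfoHash] = make(peerSet)
//		}
//		m.swarms[req.InfoHash][PeerInfo{addr}] = struct{}{}
//		return nil
//	}
//
//	func (m *memoryTracker) Scrape(_ context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error) {
//		// Zeroed stats only; a real implementation would count seeders and leechers.
//		return make([]udp.ScrapeInfohashResult, len(infoHashes)), nil
//	}
//
//	func (m *memoryTracker) GetPeers(_ context.Context, ih InfoHash, opts GetPeersOpts) (res ServerAnnounceResult) {
//		m.mu.Lock()
//		defer m.mu.Unlock()
//		for p := range m.swarms[ih] {
//			if opts.MaxCount.Ok && uint(len(res.Peers)) >= opts.MaxCount.Value {
//				break
//			}
//			res.Peers = append(res.Peers, p)
//		}
//		return
//	}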

type ServerAnnounceResult struct {
	Err      error
	Peers    []PeerInfo
	Interval generics.Option[int32]
	Leechers generics.Option[int32]
	Seeders  generics.Option[int32]
}

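// AnnounceHandler wraps a local AnnounceTracker and, when the local store knows too few peers for
// a swarm, augments its results by announcing to the configured upstream trackers.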
type AnnounceHandler struct {
	AnnounceTracker AnnounceTracker

	UpstreamTrackers       []Client
	UpstreamTrackerUrls    []string
	UpstreamAnnouncePeerId [20]byte
	UpstreamAnnounceGate   UpstreamAnnounceGater

	mu sync.Mutex
	// Operations are only removed when all the upstream peers have been tracked.
	ongoingUpstreamAugmentations map[InfoHash]augmentationOperation
}
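// A frontend (for example a UDP or HTTP tracker endpoint) would call Serve for each incoming
// announce. A hypothetical wiring sketch, assuming store implements AnnounceTracker and ctx, req
// and addr come from the wire:
//
//	handler := AnnounceHandler{AnnounceTracker: store}
//	res := handler.Serve(ctx, req, addr, GetPeersOpts{MaxCount: generics.Some[uint](200)})
//	if res.Err != nil {
//		// Report a failure to the announcing peer.
//	}
//	// Otherwise reply with res.Peers and, if set, res.Interval, res.Seeders and res.Leechers.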

type peerSet = map[PeerInfo]struct{}

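// An augmentationOperation follows one round of upstream announces for an infohash. Concurrent
// callers can poll curPeers for a snapshot while it runs, or read finalPeers once doneAnnouncing
// is closed.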
type augmentationOperation struct {
	// Closed when no more announce responses are pending. finalPeers will contain all the peers
	// seen.
	doneAnnouncing chan struct{}
	// This receives the latest peerSet until doneAnnouncing is closed.
	curPeers chan peerSet
	// This contains the final peerSet after doneAnnouncing is closed.
	finalPeers peerSet
}

func (me augmentationOperation) getCurPeers() (ret peerSet) {
	ret, _ = me.getCurPeersAndDone()
	return
}

func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) {
	select {
	case ret = <-me.curPeers:
	case <-me.doneAnnouncing:
		ret = me.finalPeers
		done = true
	}
	return
}

// Adds the peers from new that aren't already in orig, returning the extended slice. Consumes
// entries from the new set.
func addMissing(orig []PeerInfo, new peerSet) []PeerInfo {
	for _, peer := range orig {
		delete(new, peer)
	}
	for peer := range new {
		orig = append(orig, peer)
	}
	return orig
}

var tracer = otel.Tracer("torrent.tracker.udp")

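// Serve handles a single announce: it records the announce against the (possibly port-corrected)
// source address, fetches peers from the local store, and may start or join an upstream
// augmentation when too few peers are known.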
func (me *AnnounceHandler) Serve(
	ctx context.Context, req AnnounceRequest, addr AnnounceAddr, opts GetPeersOpts,
) (ret ServerAnnounceResult) {
	ctx, span := tracer.Start(
		ctx,
		"AnnounceHandler.Serve",
		trace.WithAttributes(
			attribute.Int64("announce.request.num_want", int64(req.NumWant)),
			attribute.Int("announce.request.port", int(req.Port)),
			attribute.String("announce.request.info_hash", hex.EncodeToString(req.InfoHash[:])),
			attribute.String("announce.request.event", req.Event.String()),
			attribute.Int64("announce.get_peers.opts.max_count_value", int64(opts.MaxCount.Value)),
			attribute.Bool("announce.get_peers.opts.max_count_ok", opts.MaxCount.Ok),
			attribute.String("announce.source.addr.ip", addr.Addr().String()),
			attribute.Int("announce.source.addr.port", int(addr.Port())),
		),
	)
	defer span.End()
	defer func() {
		span.SetAttributes(attribute.Int("announce.get_peers.len", len(ret.Peers)))
		if ret.Err != nil {
			span.SetStatus(codes.Error, ret.Err.Error())
		}
	}()

	if req.Port != 0 {
		addr = netip.AddrPortFrom(addr.Addr(), req.Port)
	}
	ret.Err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr)
	if ret.Err != nil {
		ret.Err = fmt.Errorf("tracking announce: %w", ret.Err)
		return
	}
	infoHash := req.InfoHash
	var op generics.Option[augmentationOperation]
	// Grab a handle to any augmentations that are already running.
	me.mu.Lock()
	op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
	me.mu.Unlock()
	// Apply num_want limit to max count. I really can't tell if this is the right place to do it,
	// but it seems the most flexible.
	if req.NumWant != -1 {
		newCount := uint(req.NumWant)
		if opts.MaxCount.Ok {
			if newCount < opts.MaxCount.Value {
				opts.MaxCount.Value = newCount
			}
		} else {
			opts.MaxCount = generics.Some(newCount)
		}
	}
	ret = me.AnnounceTracker.GetPeers(ctx, infoHash, opts)
	if ret.Err != nil {
		return
	}
	// Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside
	// the mutex or not.
	if op.Ok {
		curPeers, done := op.Value.getCurPeersAndDone()
		ret.Peers = addMissing(ret.Peers, curPeers)
		if done {
			// It doesn't get any better with this operation. Forget it.
			op.Ok = false
		}
	}
	me.mu.Lock()
	// If we didn't have an operation, and don't have enough peers, start one. Allowing 1 is
	// assuming the announcing peer might be that one. Really we should record a value to prevent
	// duplicate announces. Also don't announce upstream if we got no peers because the caller asked
	// for none.
	if !op.Ok && len(ret.Peers) <= 1 && opts.MaxCount.UnwrapOr(1) > 0 {
		op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
		if !op.Ok {
			op.Set(me.augmentPeersFromUpstream(req.InfoHash))
			generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value)
		}
	}
	me.mu.Unlock()
	// Wait a while for the current operation.
	if op.Ok {
		// Force the augmentation to return with whatever it has if it hasn't completed in a
		// reasonable time.
		ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
		select {
		case <-ctx.Done():
		case <-op.Value.doneAnnouncing:
		}
		cancel()
		ret.Peers = addMissing(ret.Peers, op.Value.getCurPeers())
	}
	return
}

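// augmentPeersFromUpstream announces infoHash to each configured upstream tracker (subject to the
// UpstreamAnnounceGate), streams the peers they return into a shared set, and records them in the
// local AnnounceTracker once all upstream announces have finished.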
func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentationOperation {
	const announceTimeout = time.Minute
	announceCtx, cancel := context.WithTimeout(context.Background(), announceTimeout)
	subReq := AnnounceRequest{
		InfoHash: infoHash,
		PeerId:   me.UpstreamAnnouncePeerId,
		Event:    None,
		Key:      0,
		NumWant:  -1,
		Port:     0,
	}
	peersChan := make(chan []Peer)
	var pendingUpstreams sync.WaitGroup
	for i := range me.UpstreamTrackers {
		client := me.UpstreamTrackers[i]
		url := me.UpstreamTrackerUrls[i]
		pendingUpstreams.Add(1)
		go func() {
			started, err := me.UpstreamAnnounceGate.Start(announceCtx, url, infoHash, announceTimeout)
			if err != nil {
				log.Printf("error reserving announce for %x to %v: %v", infoHash, url, err)
			}
			if err != nil || !started {
				peersChan <- nil
				return
			}
			log.Printf("announcing %x upstream to %v", infoHash, url)
			resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{
				UserAgent: "aragorn",
			})
			interval := resp.Interval
			go func() {
				if interval < 5*60 {
					// This is as much to reduce load on upstream trackers in the event of errors,
					// as it is to reduce load on our peer store.
					interval = 5 * 60
				}
				err := me.UpstreamAnnounceGate.Completed(context.Background(), url, infoHash, interval)
				if err != nil {
					log.Printf("error recording completed announce for %x to %v: %v", infoHash, url, err)
				}
			}()
			peersChan <- resp.Peers
			if err != nil {
				log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
			}
		}()
	}
	peersToTrack := make(map[string]Peer)
	go func() {
		pendingUpstreams.Wait()
		cancel()
		close(peersChan)
		log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers", len(peersToTrack))
		for _, peer := range peersToTrack {
			addrPort, ok := peer.ToNetipAddrPort()
			if !ok {
				continue
			}
			trackReq := AnnounceRequest{
				InfoHash: infoHash,
				Event:    Started,
				Port:     uint16(peer.Port),
			}
			copy(trackReq.PeerId[:], peer.ID)
			err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort)
			if err != nil {
				log.Levelf(log.Error, "error tracking upstream peer: %v", err)
			}
		}
		me.mu.Lock()
		delete(me.ongoingUpstreamAugmentations, infoHash)
		me.mu.Unlock()
	}()
	curPeersChan := make(chan map[PeerInfo]struct{})
	doneChan := make(chan struct{})
	retPeers := make(map[PeerInfo]struct{})
	go func() {
		defer close(doneChan)
		for {
			select {
			case peers, ok := <-peersChan:
				if !ok {
					return
				}
				voldemort(peers, peersToTrack, retPeers)
				pendingUpstreams.Done()
			case curPeersChan <- copyPeerSet(retPeers):
			}
		}
	}()
	// Return references to the shared containers that the goroutines above keep updating.
	return augmentationOperation{
		curPeers:       curPeersChan,
		finalPeers:     retPeers,
		doneAnnouncing: doneChan,
	}
}

func copyPeerSet(orig peerSet) (ret peerSet) {
	ret = make(peerSet, len(orig))
	for k, v := range orig {
		ret[k] = v
	}
	return
}

// Adds each peer to toTrack, keyed by its string form, and to every trailing peer set.
func voldemort(peers []Peer, toTrack map[string]Peer, sets ...map[PeerInfo]struct{}) {
	for _, protoPeer := range peers {
		toTrack[protoPeer.String()] = protoPeer
		addr, ok := netip.AddrFromSlice(protoPeer.IP)
		if !ok {
			continue
		}
		handlerPeer := PeerInfo{netip.AddrPortFrom(addr, uint16(protoPeer.Port))}
		for _, set := range sets {
			set[handlerPeer] = struct{}{}
		}
	}
}