]> Sergey Matveev's repositories - btrtrc.git/blob - tracker/server.go
Add some tracing
[btrtrc.git] / tracker / server.go
1 package tracker
2
3 import (
4         "context"
5         "encoding/hex"
6         "net/netip"
7         "sync"
8         "time"
9
10         "github.com/anacrolix/generics"
11         "github.com/anacrolix/log"
12         "go.opentelemetry.io/otel"
13         "go.opentelemetry.io/otel/attribute"
14         "go.opentelemetry.io/otel/trace"
15
16         "github.com/anacrolix/torrent/tracker/udp"
17 )
18
19 // This is reserved for stuff like filtering by IP version, avoiding an announcer's IP or key,
20 // limiting return count, etc.
21 type GetPeersOpts struct {
22         // Negative numbers are not allowed.
23         MaxCount generics.Option[uint]
24 }
25
26 type InfoHash = [20]byte
27
28 type PeerInfo struct {
29         AnnounceAddr
30 }
31
32 type AnnounceAddr = netip.AddrPort
33
34 type AnnounceTracker interface {
35         TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr AnnounceAddr) error
36         Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error)
37         GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ([]PeerInfo, error)
38 }
39
40 type AnnounceHandler struct {
41         AnnounceTracker        AnnounceTracker
42         UpstreamTrackers       []Client
43         UpstreamTrackerUrls    []string
44         UpstreamAnnouncePeerId [20]byte
45
46         mu sync.Mutex
47         // Operations are only removed when all the upstream peers have been tracked.
48         ongoingUpstreamAugmentations map[InfoHash]augmentationOperation
49 }
50
51 type peerSet = map[PeerInfo]struct{}
52
53 type augmentationOperation struct {
54         // Closed when no more announce responses are pending. finalPeers will contain all the peers
55         // seen.
56         doneAnnouncing chan struct{}
57         // This receives the latest peerSet until doneAnnouncing is closed.
58         curPeers chan peerSet
59         // This contains the final peerSet after doneAnnouncing is closed.
60         finalPeers peerSet
61 }
62
63 func (me augmentationOperation) getCurPeers() (ret peerSet) {
64         ret, _ = me.getCurPeersAndDone()
65         return
66 }
67
68 func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) {
69         select {
70         case ret = <-me.curPeers:
71         case <-me.doneAnnouncing:
72                 ret = me.finalPeers
73                 done = true
74         }
75         return
76 }
77
78 // Adds peers from new that aren't in orig. Modifies both arguments.
79 func addMissing(orig []PeerInfo, new peerSet) {
80         for _, peer := range orig {
81                 delete(new, peer)
82         }
83         for peer := range new {
84                 orig = append(orig, peer)
85         }
86 }
87
88 var tracer = otel.Tracer("torrent.tracker.udp")
89
90 func (me *AnnounceHandler) Serve(
91         ctx context.Context, req AnnounceRequest, addr AnnounceAddr, opts GetPeersOpts,
92 ) (peers []PeerInfo, err error) {
93         ctx, span := tracer.Start(
94                 ctx,
95                 "AnnounceHandler.Serve",
96                 trace.WithAttributes(
97                         attribute.Int64("announce.request.num_want", int64(req.NumWant)),
98                         attribute.Int("announce.request.port", int(req.Port)),
99                         attribute.String("announce.request.info_hash", hex.EncodeToString(req.InfoHash[:])),
100                         attribute.String("announce.request.event", req.Event.String()),
101                         attribute.Int64("announce.get_peers.opts.max_count_value", int64(opts.MaxCount.Value)),
102                         attribute.Bool("announce.get_peers.opts.max_count_ok", opts.MaxCount.Ok),
103                         attribute.String("announce.source.addr.ip", addr.Addr().String()),
104                         attribute.Int("announce.source.addr.port", int(addr.Port())),
105                 ),
106         )
107         defer span.End()
108         defer func() {
109                 span.SetAttributes(attribute.Int("announce.get_peers.len", len(peers)))
110         }()
111
112         err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr)
113         if err != nil {
114                 return
115         }
116         infoHash := req.InfoHash
117         var op generics.Option[augmentationOperation]
118         // Grab a handle to any augmentations that are already running.
119         me.mu.Lock()
120         op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
121         me.mu.Unlock()
122         // Apply num_want limit to max count. I really can't tell if this is the right place to do it,
123         // but it seems the most flexible.
124         if req.NumWant != -1 {
125                 newCount := uint(req.NumWant)
126                 if opts.MaxCount.Ok {
127                         if newCount < opts.MaxCount.Value {
128                                 opts.MaxCount.Value = newCount
129                         }
130                 } else {
131                         opts.MaxCount = generics.Some(newCount)
132                 }
133         }
134         peers, err = me.AnnounceTracker.GetPeers(ctx, infoHash, opts)
135         if err != nil {
136                 return
137         }
138         // Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside
139         // the mutex or not.
140         if op.Ok {
141                 curPeers, done := op.Value.getCurPeersAndDone()
142                 addMissing(peers, curPeers)
143                 if done {
144                         // It doesn't get any better with this operation. Forget it.
145                         op.Ok = false
146                 }
147         }
148         me.mu.Lock()
149         // If we didn't have an operation, and don't have enough peers, start one. Allowing 1 is
150         // assuming the announcing peer might be that one. Really we should record a value to prevent
151         // duplicate announces. Also don't announce upstream if we got no peers because the caller asked
152         // for none.
153         if !op.Ok && len(peers) <= 1 && opts.MaxCount.UnwrapOr(1) > 0 {
154                 op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
155                 if !op.Ok {
156                         op.Set(me.augmentPeersFromUpstream(req.InfoHash))
157                         generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value)
158                 }
159         }
160         me.mu.Unlock()
161         // Wait a while for the current operation.
162         if op.Ok {
163                 // Force the augmentation to return with whatever it has if it hasn't completed in a
164                 // reasonable time.
165                 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
166                 select {
167                 case <-ctx.Done():
168                 case <-op.Value.doneAnnouncing:
169                 }
170                 cancel()
171                 addMissing(peers, op.Value.getCurPeers())
172         }
173         return
174 }
175
176 func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentationOperation {
177         announceCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
178         subReq := AnnounceRequest{
179                 InfoHash: infoHash,
180                 PeerId:   me.UpstreamAnnouncePeerId,
181                 Event:    None,
182                 Key:      0,
183                 NumWant:  -1,
184                 Port:     0,
185         }
186         peersChan := make(chan []Peer)
187         var pendingUpstreams sync.WaitGroup
188         for i := range me.UpstreamTrackers {
189                 client := me.UpstreamTrackers[i]
190                 url := me.UpstreamTrackerUrls[i]
191                 pendingUpstreams.Add(1)
192                 go func() {
193                         resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{
194                                 UserAgent: "aragorn",
195                         })
196                         peersChan <- resp.Peers
197                         if err != nil {
198                                 log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
199                         }
200                 }()
201         }
202         peersToTrack := make(map[string]Peer)
203         go func() {
204                 pendingUpstreams.Wait()
205                 cancel()
206                 close(peersChan)
207                 log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers", len(peersToTrack))
208                 for _, peer := range peersToTrack {
209                         addrPort, ok := peer.ToNetipAddrPort()
210                         if !ok {
211                                 continue
212                         }
213                         trackReq := AnnounceRequest{
214                                 InfoHash: infoHash,
215                                 Event:    Started,
216                                 Port:     uint16(peer.Port),
217                         }
218                         copy(trackReq.PeerId[:], peer.ID)
219                         err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort)
220                         if err != nil {
221                                 log.Levelf(log.Error, "error tracking upstream peer: %v", err)
222                         }
223                 }
224                 me.mu.Lock()
225                 delete(me.ongoingUpstreamAugmentations, infoHash)
226                 me.mu.Unlock()
227         }()
228         curPeersChan := make(chan map[PeerInfo]struct{})
229         doneChan := make(chan struct{})
230         retPeers := make(map[PeerInfo]struct{})
231         go func() {
232                 defer close(doneChan)
233                 for {
234                         select {
235                         case peers, ok := <-peersChan:
236                                 if !ok {
237                                         return
238                                 }
239                                 voldemort(peers, peersToTrack, retPeers)
240                                 pendingUpstreams.Done()
241                         case curPeersChan <- copyPeerSet(retPeers):
242                         }
243                 }
244         }()
245         // Take return references.
246         return augmentationOperation{
247                 curPeers:       curPeersChan,
248                 finalPeers:     retPeers,
249                 doneAnnouncing: doneChan,
250         }
251 }
252
253 func copyPeerSet(orig peerSet) (ret peerSet) {
254         ret = make(peerSet, len(orig))
255         for k, v := range orig {
256                 ret[k] = v
257         }
258         return
259 }
260
261 // Adds peers to trailing containers.
262 func voldemort(peers []Peer, toTrack map[string]Peer, sets ...map[PeerInfo]struct{}) {
263         for _, protoPeer := range peers {
264                 toTrack[protoPeer.String()] = protoPeer
265                 addr, ok := netip.AddrFromSlice(protoPeer.IP)
266                 if !ok {
267                         continue
268                 }
269                 handlerPeer := PeerInfo{netip.AddrPortFrom(addr, uint16(protoPeer.Port))}
270                 for _, set := range sets {
271                         set[handlerPeer] = struct{}{}
272                 }
273         }
274 }