package trackerServer

import (
	"context"
	"encoding/hex"
	"fmt"
	"net/netip"
	"sync"
	"time"

	"github.com/anacrolix/generics"
	"github.com/anacrolix/log"
	"github.com/anacrolix/torrent/tracker"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"

	"github.com/anacrolix/torrent/tracker/udp"
)

// GetPeersOpts is reserved for stuff like filtering by IP version, avoiding an announcer's IP or
// key, limiting return count, etc.
type GetPeersOpts struct {
	// Negative numbers are not allowed.
	MaxCount generics.Option[uint]
}

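// InfoHash is a BitTorrent v1 infohash: a 20-byte SHA-1 digest identifying a torrent.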
type InfoHash = [20]byte

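// PeerInfo is a peer as returned from an AnnounceTracker. For now it's just the peer's announce
// address.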
type PeerInfo struct {
	AnnounceAddr
}

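// AnnounceAddr is the address an announce is recorded against, and the address handed out to
// other peers in the swarm.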
type AnnounceAddr = netip.AddrPort

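// AnnounceTracker is the storage backend for announces: it records announces, answers scrapes,
// and returns peers for a swarm.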
type AnnounceTracker interface {
	TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr AnnounceAddr) error
	Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error)
	GetPeers(
		ctx context.Context,
		infoHash InfoHash,
		opts GetPeersOpts,
		remote AnnounceAddr,
	) ServerAnnounceResult
}

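// ServerAnnounceResult is what the tracker returns for an announce. Optional fields may be left
// unset by the AnnounceTracker.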
type ServerAnnounceResult struct {
	Err      error
	Peers    []PeerInfo
	Interval generics.Option[int32]
	Leechers generics.Option[int32]
	Seeders  generics.Option[int32]
}

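// AnnounceHandler serves announces from an AnnounceTracker, and can augment sparse swarms by
// announcing on the swarm's behalf to upstream trackers.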
type AnnounceHandler struct {
	AnnounceTracker AnnounceTracker

	UpstreamTrackers       []Client
	UpstreamTrackerUrls    []string
	UpstreamAnnouncePeerId [20]byte
	UpstreamAnnounceGate   UpstreamAnnounceGater

	mu sync.Mutex
	// Operations are only removed when all the upstream peers have been tracked.
	ongoingUpstreamAugmentations map[InfoHash]augmentationOperation
}

type peerSet = map[PeerInfo]struct{}

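// augmentationOperation is a single in-flight round of upstream announces for one infohash.
// Consumers poll curPeers until doneAnnouncing is closed, after which finalPeers holds the
// complete result.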
type augmentationOperation struct {
	// Closed when no more announce responses are pending. finalPeers will contain all the peers
	// seen.
	doneAnnouncing chan struct{}
	// This receives the latest peerSet until doneAnnouncing is closed.
	curPeers chan peerSet
	// This contains the final peerSet after doneAnnouncing is closed.
	finalPeers peerSet
}

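// getCurPeers returns the best peer set available right now, without reporting whether the
// operation has completed.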
func (me augmentationOperation) getCurPeers() (ret peerSet) {
	ret, _ = me.getCurPeersAndDone()
	return
}

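// getCurPeersAndDone returns the current peer set, and whether announcing has finished (in which
// case the returned set is final).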
func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) {
	select {
	case ret = <-me.curPeers:
	case <-me.doneAnnouncing:
		ret = me.finalPeers
		done = true
	}
	return
}

// addMissing returns orig extended with the peers from new that weren't already present. It
// consumes new: peers already in orig are deleted from it. Callers must use the returned slice,
// since append may reallocate; previously the appends were silently lost.
func addMissing(orig []PeerInfo, new peerSet) []PeerInfo {
	for _, peer := range orig {
		delete(new, peer)
	}
	for peer := range new {
		orig = append(orig, peer)
	}
	return orig
}

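// tracer is this package's OpenTelemetry tracer.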
var tracer = otel.Tracer("torrent.tracker.udp")

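// Serve handles an announce: it records the announce with the AnnounceTracker, fetches peers for
// the swarm, and, if the swarm looks empty, starts (or joins) an upstream augmentation and waits
// briefly for its results. A minimal usage sketch (newMemoryTracker is a hypothetical
// AnnounceTracker constructor, not part of this package):
//
//	handler := AnnounceHandler{AnnounceTracker: newMemoryTracker()}
//	res := handler.Serve(ctx, req, remoteAddr, GetPeersOpts{MaxCount: generics.Some[uint](50)})
//	if res.Err != nil {
//		// handle error
//	}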
func (me *AnnounceHandler) Serve(
	ctx context.Context, req AnnounceRequest, addr AnnounceAddr, opts GetPeersOpts,
) (ret ServerAnnounceResult) {
	ctx, span := tracer.Start(
		ctx,
		"AnnounceHandler.Serve",
		trace.WithAttributes(
			attribute.Int64("announce.request.num_want", int64(req.NumWant)),
			attribute.Int("announce.request.port", int(req.Port)),
			attribute.String("announce.request.info_hash", hex.EncodeToString(req.InfoHash[:])),
			attribute.String("announce.request.event", req.Event.String()),
			attribute.Int64("announce.get_peers.opts.max_count_value", int64(opts.MaxCount.Value)),
			attribute.Bool("announce.get_peers.opts.max_count_ok", opts.MaxCount.Ok),
			attribute.String("announce.source.addr.ip", addr.Addr().String()),
			attribute.Int("announce.source.addr.port", int(addr.Port())),
		),
	)
	defer span.End()
	defer func() {
		span.SetAttributes(attribute.Int("announce.get_peers.len", len(ret.Peers)))
		if ret.Err != nil {
			span.SetStatus(codes.Error, ret.Err.Error())
		}
	}()

	if req.Port != 0 {
		addr = netip.AddrPortFrom(addr.Addr(), req.Port)
	}
	ret.Err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr)
	if ret.Err != nil {
		ret.Err = fmt.Errorf("tracking announce: %w", ret.Err)
		return
	}
	infoHash := req.InfoHash
	var op generics.Option[augmentationOperation]
	// Grab a handle to any augmentation that is already running.
	me.mu.Lock()
	op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
	me.mu.Unlock()
	// Apply the num_want limit to the max count. I really can't tell if this is the right place
	// to do it, but it seems the most flexible. Guard against all negative values, not just -1:
	// converting a negative num_want to uint would wrap to a huge count.
	if req.NumWant >= 0 {
		newCount := uint(req.NumWant)
		if opts.MaxCount.Ok {
			if newCount < opts.MaxCount.Value {
				opts.MaxCount.Value = newCount
			}
		} else {
			opts.MaxCount = generics.Some(newCount)
		}
	}
	ret = me.AnnounceTracker.GetPeers(ctx, infoHash, opts, addr)
	if ret.Err != nil {
		return
	}
	// Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside
	// the mutex or not.
	if op.Ok {
		curPeers, done := op.Value.getCurPeersAndDone()
		ret.Peers = addMissing(ret.Peers, curPeers)
		if done {
			// It doesn't get any better with this operation. Forget it.
			op.Ok = false
		}
	}
	me.mu.Lock()
	// If we didn't have an operation, and don't have enough peers, start one. Allowing 1 assumes
	// the announcing peer might be the only one present. Really we should record a value to
	// prevent duplicate announces. Also don't announce upstream if we got no peers because the
	// caller asked for none.
	if !op.Ok && len(ret.Peers) <= 1 && opts.MaxCount.UnwrapOr(1) > 0 {
		op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
		if !op.Ok {
			op.Set(me.augmentPeersFromUpstream(req.InfoHash))
			generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value)
		}
	}
	me.mu.Unlock()
	// Wait a while for the current operation.
	if op.Ok {
		// Force the augmentation to return with whatever it has if it hasn't completed in a
		// reasonable time.
		ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
		select {
		case <-ctx.Done():
		case <-op.Value.doneAnnouncing:
		}
		cancel()
		ret.Peers = addMissing(ret.Peers, op.Value.getCurPeers())
	}
	return
}

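// augmentPeersFromUpstream announces the infohash to all configured upstream trackers, streaming
// the union of returned peers to consumers via the returned augmentationOperation. When all
// upstream announces have completed, the discovered peers are tracked locally and the operation
// is removed from the handler.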
func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentationOperation {
	const announceTimeout = time.Minute
	announceCtx, cancel := context.WithTimeout(context.Background(), announceTimeout)
	subReq := AnnounceRequest{
		InfoHash: infoHash,
		PeerId:   me.UpstreamAnnouncePeerId,
		Event:    tracker.None,
		Key:      0,
		NumWant:  -1,
		Port:     0,
	}
	peersChan := make(chan []Peer)
	var pendingUpstreams sync.WaitGroup
	for i := range me.UpstreamTrackers {
		client := me.UpstreamTrackers[i]
		url := me.UpstreamTrackerUrls[i]
		pendingUpstreams.Add(1)
		go func() {
			started, err := me.UpstreamAnnounceGate.Start(announceCtx, url, infoHash, announceTimeout)
			if err != nil {
				log.Printf("error reserving announce for %x to %v: %v", infoHash, url, err)
			}
			if err != nil || !started {
				peersChan <- nil
				return
			}
			log.Printf("announcing %x upstream to %v", infoHash, url)
			resp, err := client.Announce(announceCtx, subReq, tracker.AnnounceOpt{
				UserAgent: "aragorn",
			})
			if err != nil {
				log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
			}
			interval := resp.Interval
			go func() {
				if interval < 5*60 {
					// This is as much to reduce load on upstream trackers in the event of
					// errors, as it is to reduce load on our peer store.
					interval = 5 * 60
				}
				err := me.UpstreamAnnounceGate.Completed(context.Background(), url, infoHash, interval)
				if err != nil {
					log.Printf("error recording completed announce for %x to %v: %v", infoHash, url, err)
				}
			}()
			peersChan <- resp.Peers
		}()
	}
	peersToTrack := make(map[string]Peer)
	go func() {
		pendingUpstreams.Wait()
		cancel()
		close(peersChan)
		log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers", len(peersToTrack))
		for _, peer := range peersToTrack {
			addrPort, ok := peer.ToNetipAddrPort()
			if !ok {
				continue
			}
			trackReq := AnnounceRequest{
				InfoHash: infoHash,
				Event:    tracker.Started,
				Port:     uint16(peer.Port),
			}
			copy(trackReq.PeerId[:], peer.ID)
			err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort)
			if err != nil {
				log.Levelf(log.Error, "error tracking upstream peer: %v", err)
			}
		}
		me.mu.Lock()
		delete(me.ongoingUpstreamAugmentations, infoHash)
		me.mu.Unlock()
	}()
	curPeersChan := make(chan map[PeerInfo]struct{})
	doneChan := make(chan struct{})
	retPeers := make(map[PeerInfo]struct{})
	go func() {
		defer close(doneChan)
		for {
			select {
			case peers, ok := <-peersChan:
				if !ok {
					return
				}
				voldemort(peers, peersToTrack, retPeers)
				// Each upstream announce counts as pending until its result has been folded in
				// here.
				pendingUpstreams.Done()
			case curPeersChan <- copyPeerSet(retPeers):
			}
		}
	}()
	// Hand out the channels and the final set; finalPeers is safe to read once doneAnnouncing
	// closes.
	return augmentationOperation{
		curPeers:       curPeersChan,
		finalPeers:     retPeers,
		doneAnnouncing: doneChan,
	}
}

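// copyPeerSet returns a shallow copy of orig, so a snapshot can be handed out while the original
// continues to be mutated.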
func copyPeerSet(orig peerSet) (ret peerSet) {
	ret = make(peerSet, len(orig))
	for k, v := range orig {
		ret[k] = v
	}
	return
}

// voldemort adds protocol-level peers to toTrack (keyed by their string form), and their
// PeerInfo equivalents to each of the trailing sets. Peers whose IP can't be parsed are still
// tracked, but omitted from the sets.
func voldemort(peers []Peer, toTrack map[string]Peer, sets ...map[PeerInfo]struct{}) {
	for _, protoPeer := range peers {
		toTrack[protoPeer.String()] = protoPeer
		addr, ok := netip.AddrFromSlice(protoPeer.IP)
		if !ok {
			continue
		}
		handlerPeer := PeerInfo{netip.AddrPortFrom(addr, uint16(protoPeer.Port))}
		for _, set := range sets {
			set[handlerPeer] = struct{}{}
		}
	}
}
321 }