]> Sergey Matveev's repositories - btrtrc.git/blob - tracker/server.go
Support upstream trackers
[btrtrc.git] / tracker / server.go
1 package tracker
2
3 import (
4         "context"
5         "net/netip"
6         "sync"
7         "time"
8
9         "github.com/anacrolix/generics"
10         "github.com/anacrolix/log"
11         "github.com/anacrolix/torrent/tracker/udp"
12 )
13
14 // This is reserved for stuff like filtering by IP version, avoiding an announcer's IP or key,
15 // limiting return count, etc.
16 type GetPeersOpts struct{}
17
18 type InfoHash = [20]byte
19
20 type PeerInfo struct {
21         AnnounceAddr
22 }
23
24 type AnnounceAddr = netip.AddrPort
25
26 type AnnounceTracker interface {
27         TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr AnnounceAddr) error
28         Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error)
29         GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ([]PeerInfo, error)
30 }
31
32 type AnnounceHandler struct {
33         AnnounceTracker        AnnounceTracker
34         UpstreamTrackers       []Client
35         UpstreamTrackerUrls    []string
36         UpstreamAnnouncePeerId [20]byte
37
38         mu sync.Mutex
39         // Operations are only removed when all the upstream peers have been tracked.
40         ongoingUpstreamAugmentations map[InfoHash]augmentationOperation
41 }
42
43 type peerSet = map[PeerInfo]struct{}
44
45 type augmentationOperation struct {
46         // Closed when no more announce responses are pending. finalPeers will contain all the peers
47         // seen.
48         doneAnnouncing chan struct{}
49         // This receives the latest peerSet until doneAnnouncing is closed.
50         curPeers chan peerSet
51         // This contains the final peerSet after doneAnnouncing is closed.
52         finalPeers peerSet
53 }
54
55 func (me augmentationOperation) getCurPeers() (ret peerSet) {
56         ret, _ = me.getCurPeersAndDone()
57         return
58 }
59
60 func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) {
61         select {
62         case ret = <-me.curPeers:
63         case <-me.doneAnnouncing:
64                 ret = me.finalPeers
65                 done = true
66         }
67         return
68 }
69
70 // Adds peers from new that aren't in orig. Modifies both arguments.
71 func addMissing(orig []PeerInfo, new peerSet) {
72         for _, peer := range orig {
73                 delete(new, peer)
74         }
75         for peer := range new {
76                 orig = append(orig, peer)
77         }
78 }
79
80 func (me *AnnounceHandler) Serve(
81         ctx context.Context, req AnnounceRequest, addr AnnounceAddr,
82 ) (peers []PeerInfo, err error) {
83         err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr)
84         if err != nil {
85                 return
86         }
87         infoHash := req.InfoHash
88         var op generics.Option[augmentationOperation]
89         // Grab a handle to any augmentations that are already running.
90         me.mu.Lock()
91         op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
92         me.mu.Unlock()
93         peers, err = me.AnnounceTracker.GetPeers(ctx, infoHash, GetPeersOpts{})
94         if err != nil {
95                 return
96         }
97         // Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside
98         // the mutex or not.
99         if op.Ok {
100                 curPeers, done := op.Value.getCurPeersAndDone()
101                 addMissing(peers, curPeers)
102                 if done {
103                         // It doesn't get any better with this operation. Forget it.
104                         op.Ok = false
105                 }
106         }
107         me.mu.Lock()
108         // If we didn't have an operation, and don't have enough peers, start one.
109         if !op.Ok && len(peers) <= 1 {
110                 op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
111                 if !op.Ok {
112                         op.Set(me.augmentPeersFromUpstream(req))
113                         generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value)
114                 }
115         }
116         me.mu.Unlock()
117         // Wait a while for the current operation.
118         if op.Ok {
119                 // Force the augmentation to return with whatever it has if it hasn't completed in a
120                 // reasonable time.
121                 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
122                 select {
123                 case <-ctx.Done():
124                 case <-op.Value.doneAnnouncing:
125                 }
126                 cancel()
127                 addMissing(peers, op.Value.getCurPeers())
128         }
129         return
130 }
131
132 func (me *AnnounceHandler) augmentPeersFromUpstream(req AnnounceRequest) augmentationOperation {
133         announceCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
134         subReq := AnnounceRequest{
135                 InfoHash: req.InfoHash,
136                 PeerId:   me.UpstreamAnnouncePeerId,
137                 Event:    None,
138                 Key:      0,
139                 NumWant:  -1,
140                 Port:     0,
141         }
142         peersChan := make(chan []Peer)
143         var pendingUpstreams sync.WaitGroup
144         for i := range me.UpstreamTrackers {
145                 client := me.UpstreamTrackers[i]
146                 url := me.UpstreamTrackerUrls[i]
147                 pendingUpstreams.Add(1)
148                 go func() {
149                         resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{
150                                 UserAgent: "aragorn",
151                         })
152                         peersChan <- resp.Peers
153                         if err != nil {
154                                 log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
155                         }
156                 }()
157         }
158         peersToTrack := make(map[string]Peer)
159         go func() {
160                 pendingUpstreams.Wait()
161                 cancel()
162                 close(peersChan)
163                 log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers")
164                 for _, peer := range peersToTrack {
165                         addrPort, ok := peer.ToNetipAddrPort()
166                         if !ok {
167                                 continue
168                         }
169                         trackReq := AnnounceRequest{
170                                 InfoHash: req.InfoHash,
171                                 Event:    Started,
172                                 Port:     uint16(peer.Port),
173                         }
174                         copy(trackReq.PeerId[:], peer.ID)
175                         err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort)
176                         if err != nil {
177                                 log.Levelf(log.Error, "error tracking upstream peer: %v", err)
178                         }
179                 }
180                 me.mu.Lock()
181                 delete(me.ongoingUpstreamAugmentations, req.InfoHash)
182                 me.mu.Unlock()
183         }()
184         curPeersChan := make(chan map[PeerInfo]struct{})
185         doneChan := make(chan struct{})
186         retPeers := make(map[PeerInfo]struct{})
187         go func() {
188                 for {
189                         select {
190                         case peers, ok := <-peersChan:
191                                 if !ok {
192                                         return
193                                 }
194                                 voldemort(peers, peersToTrack, retPeers)
195                                 pendingUpstreams.Done()
196                         case curPeersChan <- copyPeerSet(retPeers):
197                         }
198                 }
199         }()
200         // Take return references.
201         return augmentationOperation{
202                 curPeers:       curPeersChan,
203                 finalPeers:     retPeers,
204                 doneAnnouncing: doneChan,
205         }
206 }
207
208 func copyPeerSet(orig peerSet) (ret peerSet) {
209         ret = make(peerSet, len(orig))
210         for k, v := range orig {
211                 ret[k] = v
212         }
213         return
214 }
215
216 // Adds peers to trailing containers.
217 func voldemort(peers []Peer, toTrack map[string]Peer, sets ...map[PeerInfo]struct{}) {
218         for _, protoPeer := range peers {
219                 toTrack[protoPeer.String()] = protoPeer
220                 addr, ok := netip.AddrFromSlice(protoPeer.IP)
221                 if !ok {
222                         continue
223                 }
224                 handlerPeer := PeerInfo{netip.AddrPortFrom(addr, uint16(protoPeer.Port))}
225                 for _, set := range sets {
226                         set[handlerPeer] = struct{}{}
227                 }
228         }
229 }