]> Sergey Matveev's repositories - btrtrc.git/blob - tracker/server.go
dc95bce6841c4dc4adc77def60046ad12b524df3
[btrtrc.git] / tracker / server.go
1 package tracker
2
3 import (
4         "context"
5         "net/netip"
6         "sync"
7         "time"
8
9         "github.com/anacrolix/generics"
10         "github.com/anacrolix/log"
11
12         "github.com/anacrolix/torrent/tracker/udp"
13 )
14
15 // This is reserved for stuff like filtering by IP version, avoiding an announcer's IP or key,
16 // limiting return count, etc.
17 type GetPeersOpts struct{}
18
19 type InfoHash = [20]byte
20
21 type PeerInfo struct {
22         AnnounceAddr
23 }
24
25 type AnnounceAddr = netip.AddrPort
26
27 type AnnounceTracker interface {
28         TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr AnnounceAddr) error
29         Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error)
30         GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ([]PeerInfo, error)
31 }
32
33 type AnnounceHandler struct {
34         AnnounceTracker        AnnounceTracker
35         UpstreamTrackers       []Client
36         UpstreamTrackerUrls    []string
37         UpstreamAnnouncePeerId [20]byte
38
39         mu sync.Mutex
40         // Operations are only removed when all the upstream peers have been tracked.
41         ongoingUpstreamAugmentations map[InfoHash]augmentationOperation
42 }
43
44 type peerSet = map[PeerInfo]struct{}
45
46 type augmentationOperation struct {
47         // Closed when no more announce responses are pending. finalPeers will contain all the peers
48         // seen.
49         doneAnnouncing chan struct{}
50         // This receives the latest peerSet until doneAnnouncing is closed.
51         curPeers chan peerSet
52         // This contains the final peerSet after doneAnnouncing is closed.
53         finalPeers peerSet
54 }
55
56 func (me augmentationOperation) getCurPeers() (ret peerSet) {
57         ret, _ = me.getCurPeersAndDone()
58         return
59 }
60
61 func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) {
62         select {
63         case ret = <-me.curPeers:
64         case <-me.doneAnnouncing:
65                 ret = me.finalPeers
66                 done = true
67         }
68         return
69 }
70
71 // Adds peers from new that aren't in orig. Modifies both arguments.
72 func addMissing(orig []PeerInfo, new peerSet) {
73         for _, peer := range orig {
74                 delete(new, peer)
75         }
76         for peer := range new {
77                 orig = append(orig, peer)
78         }
79 }
80
81 func (me *AnnounceHandler) Serve(
82         ctx context.Context, req AnnounceRequest, addr AnnounceAddr,
83 ) (peers []PeerInfo, err error) {
84         err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr)
85         if err != nil {
86                 return
87         }
88         infoHash := req.InfoHash
89         var op generics.Option[augmentationOperation]
90         // Grab a handle to any augmentations that are already running.
91         me.mu.Lock()
92         op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
93         me.mu.Unlock()
94         peers, err = me.AnnounceTracker.GetPeers(ctx, infoHash, GetPeersOpts{})
95         if err != nil {
96                 return
97         }
98         // Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside
99         // the mutex or not.
100         if op.Ok {
101                 curPeers, done := op.Value.getCurPeersAndDone()
102                 addMissing(peers, curPeers)
103                 if done {
104                         // It doesn't get any better with this operation. Forget it.
105                         op.Ok = false
106                 }
107         }
108         me.mu.Lock()
109         // If we didn't have an operation, and don't have enough peers, start one.
110         if !op.Ok && len(peers) <= 1 {
111                 op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
112                 if !op.Ok {
113                         op.Set(me.augmentPeersFromUpstream(req.InfoHash))
114                         generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value)
115                 }
116         }
117         me.mu.Unlock()
118         // Wait a while for the current operation.
119         if op.Ok {
120                 // Force the augmentation to return with whatever it has if it hasn't completed in a
121                 // reasonable time.
122                 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
123                 select {
124                 case <-ctx.Done():
125                 case <-op.Value.doneAnnouncing:
126                 }
127                 cancel()
128                 addMissing(peers, op.Value.getCurPeers())
129         }
130         return
131 }
132
133 func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentationOperation {
134         announceCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
135         subReq := AnnounceRequest{
136                 InfoHash: infoHash,
137                 PeerId:   me.UpstreamAnnouncePeerId,
138                 Event:    None,
139                 Key:      0,
140                 NumWant:  -1,
141                 Port:     0,
142         }
143         peersChan := make(chan []Peer)
144         var pendingUpstreams sync.WaitGroup
145         for i := range me.UpstreamTrackers {
146                 client := me.UpstreamTrackers[i]
147                 url := me.UpstreamTrackerUrls[i]
148                 pendingUpstreams.Add(1)
149                 go func() {
150                         resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{
151                                 UserAgent: "aragorn",
152                         })
153                         peersChan <- resp.Peers
154                         if err != nil {
155                                 log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
156                         }
157                 }()
158         }
159         peersToTrack := make(map[string]Peer)
160         go func() {
161                 pendingUpstreams.Wait()
162                 cancel()
163                 close(peersChan)
164                 log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers", len(peersToTrack))
165                 for _, peer := range peersToTrack {
166                         addrPort, ok := peer.ToNetipAddrPort()
167                         if !ok {
168                                 continue
169                         }
170                         trackReq := AnnounceRequest{
171                                 InfoHash: infoHash,
172                                 Event:    Started,
173                                 Port:     uint16(peer.Port),
174                         }
175                         copy(trackReq.PeerId[:], peer.ID)
176                         err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort)
177                         if err != nil {
178                                 log.Levelf(log.Error, "error tracking upstream peer: %v", err)
179                         }
180                 }
181                 me.mu.Lock()
182                 delete(me.ongoingUpstreamAugmentations, infoHash)
183                 me.mu.Unlock()
184         }()
185         curPeersChan := make(chan map[PeerInfo]struct{})
186         doneChan := make(chan struct{})
187         retPeers := make(map[PeerInfo]struct{})
188         go func() {
189                 defer close(doneChan)
190                 for {
191                         select {
192                         case peers, ok := <-peersChan:
193                                 if !ok {
194                                         return
195                                 }
196                                 voldemort(peers, peersToTrack, retPeers)
197                                 pendingUpstreams.Done()
198                         case curPeersChan <- copyPeerSet(retPeers):
199                         }
200                 }
201         }()
202         // Take return references.
203         return augmentationOperation{
204                 curPeers:       curPeersChan,
205                 finalPeers:     retPeers,
206                 doneAnnouncing: doneChan,
207         }
208 }
209
210 func copyPeerSet(orig peerSet) (ret peerSet) {
211         ret = make(peerSet, len(orig))
212         for k, v := range orig {
213                 ret[k] = v
214         }
215         return
216 }
217
218 // Adds peers to trailing containers.
219 func voldemort(peers []Peer, toTrack map[string]Peer, sets ...map[PeerInfo]struct{}) {
220         for _, protoPeer := range peers {
221                 toTrack[protoPeer.String()] = protoPeer
222                 addr, ok := netip.AddrFromSlice(protoPeer.IP)
223                 if !ok {
224                         continue
225                 }
226                 handlerPeer := PeerInfo{netip.AddrPortFrom(addr, uint16(protoPeer.Port))}
227                 for _, set := range sets {
228                         set[handlerPeer] = struct{}{}
229                 }
230         }
231 }