10 "github.com/anacrolix/generics"
11 "github.com/anacrolix/log"
12 "go.opentelemetry.io/otel"
13 "go.opentelemetry.io/otel/attribute"
14 "go.opentelemetry.io/otel/trace"
16 "github.com/anacrolix/torrent/tracker/udp"
// This is reserved for stuff like filtering by IP version, avoiding an announcer's IP or key,
// limiting return count, etc.
type GetPeersOpts struct {
	// Negative numbers are not allowed.
	MaxCount generics.Option[uint]
}

type InfoHash = [20]byte

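// PeerInfo is a peer as stored by the tracker backend and returned to announcers.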
type PeerInfo struct {
	AnnounceAddr
}

type AnnounceAddr = netip.AddrPort

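// AnnounceTracker is the peer-store backend: it records announces, answers scrapes, and returns
// known peers for an infohash.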
type AnnounceTracker interface {
	TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr AnnounceAddr) error
	Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error)
	GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ([]PeerInfo, error)
}

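// AnnounceHandler serves announces from a local AnnounceTracker, and can augment sparse swarms by
// announcing to upstream trackers and folding their peers back in.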
type AnnounceHandler struct {
	AnnounceTracker AnnounceTracker

	UpstreamTrackers       []Client
	UpstreamTrackerUrls    []string
	UpstreamAnnouncePeerId [20]byte
	UpstreamAnnounceGate   UpstreamAnnounceGater

	// Guards ongoingUpstreamAugmentations, which is also mutated from augmentation goroutines.
	mu sync.Mutex
	// Operations are only removed when all the upstream peers have been tracked.
	ongoingUpstreamAugmentations map[InfoHash]augmentationOperation
}

type peerSet = map[PeerInfo]struct{}

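// augmentationOperation is a handle to one in-flight round of upstream announces for an infohash.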
type augmentationOperation struct {
	// Closed when no more announce responses are pending. finalPeers will contain all the peers
	// from the operation.
	doneAnnouncing chan struct{}
	// This receives the latest peerSet until doneAnnouncing is closed.
	curPeers chan peerSet
	// This contains the final peerSet after doneAnnouncing is closed.
	finalPeers peerSet
}

func (me augmentationOperation) getCurPeers() (ret peerSet) {
	ret, _ = me.getCurPeersAndDone()
	return
}

func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) {
	select {
	// A snapshot of the peers gathered so far, while the operation is still running.
	case ret = <-me.curPeers:
	// Announcing has finished: the final set can be read directly.
	case <-me.doneAnnouncing:
		ret = me.finalPeers
		done = true
	}
	return
}

// Returns orig extended with the peers from new that aren't already in it. Consumes both
// arguments.
func addMissing(orig []PeerInfo, new peerSet) []PeerInfo {
	for _, peer := range orig {
		delete(new, peer)
	}
	for peer := range new {
		orig = append(orig, peer)
	}
	return orig
}

var tracer = otel.Tracer("torrent.tracker.udp")

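// Serve handles a single announce: it records the announce, applies the request's num_want to the
// peer limit, fetches peers from the local tracker, and, if the swarm looks empty, merges in peers
// from an upstream augmentation, starting one if none is running.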
func (me *AnnounceHandler) Serve(
	ctx context.Context, req AnnounceRequest, addr AnnounceAddr, opts GetPeersOpts,
) (peers []PeerInfo, err error) {
	ctx, span := tracer.Start(
		ctx,
		"AnnounceHandler.Serve",
		trace.WithAttributes(
			attribute.Int64("announce.request.num_want", int64(req.NumWant)),
			attribute.Int("announce.request.port", int(req.Port)),
			attribute.String("announce.request.info_hash", hex.EncodeToString(req.InfoHash[:])),
			attribute.String("announce.request.event", req.Event.String()),
			attribute.Int64("announce.get_peers.opts.max_count_value", int64(opts.MaxCount.Value)),
			attribute.Bool("announce.get_peers.opts.max_count_ok", opts.MaxCount.Ok),
			attribute.String("announce.source.addr.ip", addr.Addr().String()),
			attribute.Int("announce.source.addr.port", int(addr.Port())),
		),
	)
	defer span.End()
	// Record how many peers ended up being returned.
	defer func() {
		span.SetAttributes(attribute.Int("announce.get_peers.len", len(peers)))
	}()

	err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr)
	if err != nil {
		return
	}
	infoHash := req.InfoHash
	var op generics.Option[augmentationOperation]
	// Grab a handle to any augmentations that are already running.
	me.mu.Lock()
	op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
	me.mu.Unlock()
	// Apply num_want limit to max count. I really can't tell if this is the right place to do it,
	// but it seems the most flexible.
	if req.NumWant != -1 {
		newCount := uint(req.NumWant)
		if opts.MaxCount.Ok {
			if newCount < opts.MaxCount.Value {
				opts.MaxCount.Value = newCount
			}
		} else {
			opts.MaxCount = generics.Some(newCount)
		}
	}
	peers, err = me.AnnounceTracker.GetPeers(ctx, infoHash, opts)
	if err != nil {
		return
	}
	// Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside
	// the mutex or not.
	if op.Ok {
		curPeers, done := op.Value.getCurPeersAndDone()
		peers = addMissing(peers, curPeers)
		if done {
			// It doesn't get any better with this operation. Forget it.
			op.Ok = false
		}
	}
	// If we didn't have an operation, and don't have enough peers, start one. Allowing 1 is
	// assuming the announcing peer might be that one. Really we should record a value to prevent
	// duplicate announces. Also don't announce upstream if we got no peers because the caller asked
	// for none.
	if !op.Ok && len(peers) <= 1 && opts.MaxCount.UnwrapOr(1) > 0 {
		me.mu.Lock()
		// Check again under the lock in case another announce started an augmentation meanwhile.
		op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
		if !op.Ok {
			op.Set(me.augmentPeersFromUpstream(req.InfoHash))
			generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value)
		}
		me.mu.Unlock()
	}
	// Wait a while for the current operation.
	if op.Ok {
		// Force the augmentation to return with whatever it has if it hasn't completed in a
		// reasonable time.
		ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
		defer cancel()
		select {
		case <-op.Value.doneAnnouncing:
		case <-ctx.Done():
		}
		peers = addMissing(peers, op.Value.getCurPeers())
	}
	return
}

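// augmentPeersFromUpstream announces infoHash to each configured upstream tracker, tracks the
// peers they return, and exposes them through the returned augmentationOperation.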
func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentationOperation {
	const announceTimeout = time.Minute
	announceCtx, cancel := context.WithTimeout(context.Background(), announceTimeout)
	subReq := AnnounceRequest{
		InfoHash: infoHash,
		PeerId:   me.UpstreamAnnouncePeerId,
		// -1 asks the upstream tracker for as many peers as it will return.
		NumWant: -1,
	}
	peersChan := make(chan []Peer)
	var pendingUpstreams sync.WaitGroup
	for i := range me.UpstreamTrackers {
		client := me.UpstreamTrackers[i]
		url := me.UpstreamTrackerUrls[i]
		pendingUpstreams.Add(1)
		go func() {
			started, err := me.UpstreamAnnounceGate.Start(announceCtx, url, infoHash, announceTimeout)
			if err != nil {
				log.Printf("error reserving announce for %x to %v: %v", infoHash, url, err)
			}
			if err != nil || !started {
				// Still send so the collector sees one result per upstream and the WaitGroup drains.
				peersChan <- nil
				return
			}
			log.Printf("announcing %x upstream to %v", infoHash, url)
			resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{
				UserAgent: "aragorn",
			})
			interval := resp.Interval
			go func() {
				if interval < 5*60 {
					// This is as much to reduce load on upstream trackers in the event of errors,
					// as it is to reduce load on our peer store.
					interval = 5 * 60
				}
				err := me.UpstreamAnnounceGate.Completed(context.Background(), url, infoHash, interval)
				if err != nil {
					log.Printf("error recording completed announce for %x to %v: %v", infoHash, url, err)
				}
			}()
			peersChan <- resp.Peers
			if err != nil {
				log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
			}
		}()
	}
	peersToTrack := make(map[string]Peer)
	// Once every upstream has reported, track the distinct peers locally and drop the operation.
	go func() {
		pendingUpstreams.Wait()
		cancel()
		close(peersChan)
		log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers", len(peersToTrack))
		for _, peer := range peersToTrack {
			addrPort, ok := peer.ToNetipAddrPort()
			if !ok {
				continue
			}
			trackReq := AnnounceRequest{
				InfoHash: infoHash,
				Port:     uint16(peer.Port),
			}
			copy(trackReq.PeerId[:], peer.ID)
			err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort)
			if err != nil {
				log.Levelf(log.Error, "error tracking upstream peer: %v", err)
			}
		}
		me.mu.Lock()
		delete(me.ongoingUpstreamAugmentations, infoHash)
		me.mu.Unlock()
	}()
	curPeersChan := make(chan map[PeerInfo]struct{})
	doneChan := make(chan struct{})
	retPeers := make(map[PeerInfo]struct{})
	// Collector: accumulate peers as upstream responses arrive, while also serving snapshots to
	// anyone reading curPeersChan.
	go func() {
		defer close(doneChan)
		for {
			select {
			case peers, ok := <-peersChan:
				if !ok {
					return
				}
				voldemort(peers, peersToTrack, retPeers)
				pendingUpstreams.Done()
			case curPeersChan <- copyPeerSet(retPeers):
			}
		}
	}()
	// Take return references.
	return augmentationOperation{
		curPeers:       curPeersChan,
		finalPeers:     retPeers,
		doneAnnouncing: doneChan,
	}
}

func copyPeerSet(orig peerSet) (ret peerSet) {
	ret = make(peerSet, len(orig))
	for k, v := range orig {
		ret[k] = v
	}
	return
}

// Records each protocol-level peer in toTrack, keyed by its string form, and adds the
// corresponding PeerInfo to every set in sets.
func voldemort(peers []Peer, toTrack map[string]Peer, sets ...map[PeerInfo]struct{}) {
	for _, protoPeer := range peers {
		toTrack[protoPeer.String()] = protoPeer
		addr, ok := netip.AddrFromSlice(protoPeer.IP)
		if !ok {
			continue
		}
		handlerPeer := PeerInfo{netip.AddrPortFrom(addr, uint16(protoPeer.Port))}
		for _, set := range sets {
			set[handlerPeer] = struct{}{}
		}
	}
}