10 "github.com/anacrolix/generics"
11 "github.com/anacrolix/log"
12 "go.opentelemetry.io/otel"
13 "go.opentelemetry.io/otel/attribute"
14 "go.opentelemetry.io/otel/trace"
16 "github.com/anacrolix/torrent/tracker/udp"
19 // This is reserved for stuff like filtering by IP version, avoiding an announcer's IP or key,
20 // limiting return count, etc.
21 type GetPeersOpts struct {
22 // Negative numbers are not allowed.
// MaxCount caps how many peers GetPeers may return; when unset, no cap is
// applied here (presumably the store's own limit applies — verify in impls).
23 MaxCount generics.Option[uint]
// NOTE(review): original lines 24-25 (struct close / blank) are elided in this view.

// InfoHash is the raw 20-byte BitTorrent info-hash used to key torrents.
26 type InfoHash = [20]byte
// PeerInfo identifies a single peer; used as the element type of peerSet.
28 type PeerInfo struct {
// NOTE(review): the struct body (original lines 29-31) is elided here. voldemort
// builds PeerInfo{netip.AddrPortFrom(addr, port)}, so it presumably embeds a
// netip.AddrPort as its sole (unnamed-field-position) member — confirm in full file.

// AnnounceAddr is the observed source address+port of an announcing peer.
32 type AnnounceAddr = netip.AddrPort
// AnnounceTracker is the persistence backend for announces: it records peers,
// answers scrapes, and serves peer lists. Implementations are called from
// AnnounceHandler.Serve and augmentPeersFromUpstream.
34 type AnnounceTracker interface {
// TrackAnnounce records an announce request as seen from addr.
35 TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr AnnounceAddr) error
// Scrape reports swarm statistics for the given info-hashes.
36 Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error)
// GetPeers returns known peers for infoHash, honouring opts (e.g. MaxCount).
37 GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ServerAnnounceResult
// NOTE(review): interface close (original lines 38-39) elided in this view.
// ServerAnnounceResult is what an announce produces for the response:
// peer list plus optional swarm counters and re-announce interval.
40 type ServerAnnounceResult struct {
// NOTE(review): original lines 41-42 are elided; Serve assigns ret.Err and reads
// ret.Peers, so an Err error field and a Peers slice field presumably live here.
// All three counters are optional so a backend may omit what it doesn't track.
43 Interval generics.Option[int32]
44 Leechers generics.Option[int32]
45 Seeders generics.Option[int32]
// AnnounceHandler services announce requests against a local AnnounceTracker,
// optionally augmenting sparse peer lists by announcing to upstream trackers.
48 type AnnounceHandler struct {
// Local peer store used for tracking and serving peers.
49 AnnounceTracker AnnounceTracker
// Parallel slices: UpstreamTrackers[i] announces to UpstreamTrackerUrls[i].
51 UpstreamTrackers []Client
52 UpstreamTrackerUrls []string
// Peer ID this handler presents when announcing upstream on behalf of a swarm.
53 UpstreamAnnouncePeerId [20]byte
// Rate-limits/coordinates upstream announces (Start/Completed reservations).
54 UpstreamAnnounceGate UpstreamAnnounceGater
57 // Operations are only removed when all the upstream peers have been tracked.
// NOTE(review): original lines 55-56 are elided; this map is read and deleted from
// multiple goroutines, so a guarding mutex field is presumably declared there — confirm.
58 ongoingUpstreamAugmentations map[InfoHash]augmentationOperation
// peerSet is a set of distinct peers (map-as-set, zero-width values).
61 type peerSet = map[PeerInfo]struct{}

// augmentationOperation is a handle onto an in-flight upstream announce fan-out,
// letting concurrent Serve calls sample its peers before/after completion.
63 type augmentationOperation struct {
64 // Closed when no more announce responses are pending. finalPeers will contain all the peers
66 doneAnnouncing chan struct{}
67 // This receives the latest peerSet until doneAnnouncing is closed.
// NOTE(review): the curPeers channel field (original line 68) is elided here; the
// struct literal in augmentPeersFromUpstream sets curPeers, finalPeers, doneAnnouncing.
69 // This contains the final peerSet after doneAnnouncing is closed.
// NOTE(review): the finalPeers field (original line 70) and struct close are elided.
// getCurPeers returns the operation's best-known peer set right now,
// discarding the done flag from getCurPeersAndDone.
73 func (me augmentationOperation) getCurPeers() (ret peerSet) {
74 ret, _ = me.getCurPeersAndDone()
// NOTE(review): the return/closing lines (original 75-76) are elided in this view.
// getCurPeersAndDone non-deterministically either receives the latest interim
// peer set from curPeers, or — once doneAnnouncing is closed — takes the final
// set; done reports whether the operation has finished.
78 func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) {
// NOTE(review): the enclosing select statement opener (original line 79) is elided.
80 case ret = <-me.curPeers:
81 case <-me.doneAnnouncing:
// NOTE(review): the done-branch body (original lines 82-86, presumably
// ret = me.finalPeers; done = true) and function close are elided here.
88 // Adds peers from new that aren't in orig. Modifies both arguments.
89 func addMissing(orig []PeerInfo, new peerSet) {
// First pass: drop from `new` anything already present in orig (the body of this
// loop, original lines 91-92, is elided — presumably delete(new, peer)).
90 for _, peer := range orig {
93 for peer := range new {
// NOTE(review): append reassigns only the *local* slice header; unless it writes
// into spare capacity shared with the caller's slice, callers of
// addMissing(ret.Peers, …) won't observe appended peers. Looks like a bug —
// confirm against the full file before relying on the doc comment above.
94 orig = append(orig, peer)
// NOTE(review): loop/function closes (original lines 95-97) are elided.
// Package-level OpenTelemetry tracer for announce-handling spans.
98 var tracer = otel.Tracer("torrent.tracker.udp")
// Serve handles one announce: records it, fetches peers from the local store
// (capped by num_want/opts), merges in peers from any in-flight upstream
// augmentation, and — if the swarm looks empty — kicks off or briefly waits on
// an upstream augmentation. Errors are reported via ret.Err, not a second value.
// NOTE(review): this excerpt elides many original lines (104, 115-118, 120-121,
// 123-125, 129, 131, 139-140, 142-143, 145-147, 149-150, 153, 155-158, 162, 165,
// 168-170, 172, 174, 176-177, 179-180, 182-185) — locks, braces, defers and
// select scaffolding presumably live there; comments below hedge accordingly.
100 func (me *AnnounceHandler) Serve(
101 ctx context.Context, req AnnounceRequest, addr AnnounceAddr, opts GetPeersOpts,
102 ) (ret ServerAnnounceResult) {
// Open a span tagged with the full request so traces are self-describing.
103 ctx, span := tracer.Start(
105 "AnnounceHandler.Serve",
106 trace.WithAttributes(
107 attribute.Int64("announce.request.num_want", int64(req.NumWant)),
108 attribute.Int("announce.request.port", int(req.Port)),
109 attribute.String("announce.request.info_hash", hex.EncodeToString(req.InfoHash[:])),
110 attribute.String("announce.request.event", req.Event.String()),
// Value and Ok recorded separately since MaxCount is an Option.
111 attribute.Int64("announce.get_peers.opts.max_count_value", int64(opts.MaxCount.Value)),
112 attribute.Bool("announce.get_peers.opts.max_count_ok", opts.MaxCount.Ok),
113 attribute.String("announce.source.addr.ip", addr.Addr().String()),
114 attribute.Int("announce.source.addr.port", int(addr.Port())),
// Presumably inside a deferred func (elided lines 115-118): record how many
// peers the response ended up with.
119 span.SetAttributes(attribute.Int("announce.get_peers.len", len(ret.Peers)))
// Record the announce first; a failure here short-circuits via ret.Err
// (the early-return check on elided lines 123-125, presumably).
122 ret.Err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr)
126 infoHash := req.InfoHash
127 var op generics.Option[augmentationOperation]
128 // Grab a handle to any augmentations that are already running.
// NOTE(review): this map read is presumably under a handler mutex acquired on
// elided line 129 and released on 131 — confirm in full file.
130 op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
132 // Apply num_want limit to max count. I really can't tell if this is the right place to do it,
133 // but it seems the most flexible.
// -1 is the UDP-protocol "no preference" sentinel for num_want.
134 if req.NumWant != -1 {
135 newCount := uint(req.NumWant)
136 if opts.MaxCount.Ok {
// Keep the stricter of the caller's cap and num_want.
137 if newCount < opts.MaxCount.Value {
138 opts.MaxCount.Value = newCount
// else-branch (elided lines 139-140): no existing cap, so adopt num_want.
141 opts.MaxCount = generics.Some(newCount)
// Fetch peers from the local store under the (possibly tightened) cap.
144 ret = me.AnnounceTracker.GetPeers(ctx, infoHash, opts)
148 // Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside
// (continuation of the comment above is on elided lines 149-150.)
151 curPeers, done := op.Value.getCurPeersAndDone()
// Merge interim upstream peers into the local result.
152 addMissing(ret.Peers, curPeers)
154 // It doesn't get any better with this operation. Forget it.
// (The `if done { op = None }`-style cleanup presumably sits on elided 153/155-158.)
159 // If we didn't have an operation, and don't have enough peers, start one. Allowing 1 is
160 // assuming the announcing peer might be that one. Really we should record a value to prevent
161 // duplicate announces. Also don't announce upstream if we got no peers because the caller asked
163 if !op.Ok && len(ret.Peers) <= 1 && opts.MaxCount.UnwrapOr(1) > 0 {
// Re-check under lock (double-checked): another Serve may have started one.
164 op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
// Presumably inside an `if !op.Ok` (elided line 165): start a fresh augmentation
// and publish it for other concurrent announces to join.
166 op.Set(me.augmentPeersFromUpstream(req.InfoHash))
167 generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value)
171 // Wait a while for the current operation.
173 // Force the augmentation to return with whatever it has if it hasn't completed in a
// Bounded wait: don't hold the announcer hostage to slow upstream trackers.
175 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
// Part of a select (opener elided, line 177): either the fan-out finishes…
178 case <-op.Value.doneAnnouncing:
// …or the timeout fires (elided ctx.Done case, presumably lines 179-180), then
// take whatever peers the operation has accumulated so far.
181 addMissing(ret.Peers, op.Value.getCurPeers())
// augmentPeersFromUpstream fans an announce out to every configured upstream
// tracker, streams the responses into a shared peer set that concurrent Serve
// calls can sample via the returned augmentationOperation, and finally records
// all distinct upstream peers into the local AnnounceTracker.
// NOTE(review): many scaffolding lines are elided from this excerpt (190, 192-196,
// 203, 205, 207, 209-211, 215, 217-218, 221-222, 224, 226-227, 229, 231-233, 235,
// 237-238, 242-244, 246-247, 249, 252, 254-256, 258-259, 263, 265-266, 268-270,
// 274-276) — goroutine launches, braces, struct fields and select plumbing
// presumably live there; comments below hedge accordingly.
186 func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentationOperation {
// One overall budget for the whole upstream fan-out.
187 const announceTimeout = time.Minute
188 announceCtx, cancel := context.WithTimeout(context.Background(), announceTimeout)
// Shared request sent to every upstream, announcing as ourselves.
189 subReq := AnnounceRequest{
191 PeerId: me.UpstreamAnnouncePeerId,
// Unbuffered: each upstream goroutine hands its peer list to the collector below.
197 peersChan := make(chan []Peer)
198 var pendingUpstreams sync.WaitGroup
199 for i := range me.UpstreamTrackers {
// Capture per-iteration copies for the goroutine (pre-Go-1.22 style).
200 client := me.UpstreamTrackers[i]
201 url := me.UpstreamTrackerUrls[i]
202 pendingUpstreams.Add(1)
// Presumably inside a per-upstream goroutine (launch elided, line 203):
// reserve the announce slot with the gate before hitting the tracker.
204 started, err := me.UpstreamAnnounceGate.Start(announceCtx, url, infoHash, announceTimeout)
206 log.Printf("error reserving announce for %x to %v: %v", infoHash, url, err)
// Skip this upstream if reservation failed or was declined.
// NOTE(review): the WaitGroup/peersChan bookkeeping for this early-out path is
// on elided lines 209-211 — confirm Done() is still called on this branch.
208 if err != nil || !started {
212 log.Printf("announcing %x upstream to %v", infoHash, url)
213 resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{
214 UserAgent: "aragorn",
216 interval := resp.Interval
219 // This is as much to reduce load on upstream trackers in the event of errors,
220 // as it is to reduce load on our peer store.
// Record completion with the gate so the interval throttles re-announces.
223 err := me.UpstreamAnnounceGate.Completed(context.Background(), url, infoHash, interval)
225 log.Printf("error recording completed announce for %x to %v: %v", infoHash, url, err)
// Success path: deliver the peers to the collector goroutine.
228 peersChan <- resp.Peers
// Error path (branch structure elided): announce itself failed.
230 log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
// Keyed by Peer.String() so duplicates across upstreams collapse to one entry.
234 peersToTrack := make(map[string]Peer)
// Presumably inside a cleanup goroutine (elided line 235): wait out the fan-out,
// then persist everything that was collected.
236 pendingUpstreams.Wait()
239 log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers", len(peersToTrack))
240 for _, peer := range peersToTrack {
// Skip peers whose address can't be represented (ok checked on elided 242-244).
241 addrPort, ok := peer.ToNetipAddrPort()
// Synthesize a local announce so the upstream peer enters our own store.
245 trackReq := AnnounceRequest{
248 Port: uint16(peer.Port),
250 copy(trackReq.PeerId[:], peer.ID)
// context.TODO(): announceCtx may already be cancelled by now — presumably
// deliberate, but worth a second look.
251 err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort)
253 log.Levelf(log.Error, "error tracking upstream peer: %v", err)
// Finished tracking: retire the operation so future announces can start anew.
// Presumably under the handler lock (elided lines 254-256) — confirm.
257 delete(me.ongoingUpstreamAugmentations, infoHash)
// Channels backing the returned handle: curPeersChan serves interim snapshots,
// doneChan signals completion, retPeers accumulates (and becomes finalPeers).
260 curPeersChan := make(chan map[PeerInfo]struct{})
261 doneChan := make(chan struct{})
262 retPeers := make(map[PeerInfo]struct{})
// Presumably inside the collector goroutine (elided line 263): close doneChan
// when collection ends so getCurPeersAndDone unblocks.
264 defer close(doneChan)
// Collector select loop (opener elided): fold in each upstream's peers…
267 case peers, ok := <-peersChan:
// voldemort dedupes into peersToTrack and adds to retPeers.
271 voldemort(peers, peersToTrack, retPeers)
272 pendingUpstreams.Done()
// …or serve a snapshot to a concurrent Serve call. Copy so the receiver can't
// race with later mutation of retPeers.
273 case curPeersChan <- copyPeerSet(retPeers):
277 // Take return references.
278 return augmentationOperation{
279 curPeers: curPeersChan,
280 finalPeers: retPeers,
281 doneAnnouncing: doneChan,
// copyPeerSet returns a shallow copy of orig so callers can hand the set across
// a channel without racing with subsequent mutation of the original.
285 func copyPeerSet(orig peerSet) (ret peerSet) {
286 ret = make(peerSet, len(orig))
287 for k, v := range orig {
// NOTE(review): the copy assignment and closes (original lines 288-291,
// presumably ret[k] = v) are elided from this view.
293 // Adds peers to trailing containers.
// For each protocol-level Peer: record it in toTrack (deduped by String()) and,
// when its IP parses, add the derived PeerInfo to every set in `sets`.
294 func voldemort(peers []Peer, toTrack map[string]Peer, sets ...map[PeerInfo]struct{}) {
295 for _, protoPeer := range peers {
// Dedupe key is the peer's string form; later duplicates overwrite harmlessly.
296 toTrack[protoPeer.String()] = protoPeer
297 addr, ok := netip.AddrFromSlice(protoPeer.IP)
// The !ok skip (original lines 298-300) is elided — unparseable IPs are
// presumably dropped from the sets but still kept in toTrack above.
301 handlerPeer := PeerInfo{netip.AddrPortFrom(addr, uint16(protoPeer.Port))}
302 for _, set := range sets {
303 set[handlerPeer] = struct{}{}