11 "github.com/anacrolix/generics"
12 "github.com/anacrolix/log"
13 "go.opentelemetry.io/otel"
14 "go.opentelemetry.io/otel/attribute"
15 "go.opentelemetry.io/otel/codes"
16 "go.opentelemetry.io/otel/trace"
18 "github.com/anacrolix/torrent/tracker/udp"
// GetPeersOpts carries optional constraints for a peer lookup. It is accepted by
// AnnounceTracker.GetPeers and by AnnounceHandler.Serve.
//
21 // This is reserved for stuff like filtering by IP version, avoiding an announcer's IP or key,
22 // limiting return count, etc.
23 type GetPeersOpts struct {
// MaxCount is an optional cap on the number of peers. Its value type is unsigned, so a negative
// count cannot be represented. Serve folds the request's NumWant into it when NumWant is not -1.
24 // Negative numbers are not allowed.
25 MaxCount generics.Option[uint]
// InfoHash is a 20-byte array. In this file it keys announces, scrapes, peer lookups and the
// table of ongoing upstream augmentations.
28 type InfoHash = [20]byte
// PeerInfo describes a single peer. It is comparable (peerSet uses it as a map key) and is built
// from a netip.AddrPort where this file constructs it.
30 type PeerInfo struct {
// AnnounceAddr is the IP address and port an announce is attributed to. Serve replaces the port
// with the one carried in the announce request before tracking it.
34 type AnnounceAddr = netip.AddrPort
// AnnounceTracker is the backing store AnnounceHandler delegates to for tracking announces and
// answering scrape and peer queries.
36 type AnnounceTracker interface {
// TrackAnnounce is handed each announce request together with the address it is attributed to.
// Implementations are presumably expected to record the peer; confirm against implementations.
37 TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr AnnounceAddr) error
// Scrape reports scrape results for the given info hashes. Presumably one result per hash, in
// request order; confirm against implementations.
38 Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error)
// GetPeers looks up peers for infoHash subject to opts. There is no separate error result:
// failures travel inside the returned ServerAnnounceResult (Serve reads its Err field).
39 GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ServerAnnounceResult
// ServerAnnounceResult is the value returned by AnnounceTracker.GetPeers and AnnounceHandler.Serve.
// Besides the optional counters below, code in this file reads its Peers and Err fields.
42 type ServerAnnounceResult struct {
// Optional figures. Presumably left unset when the tracker has no value to report; confirm
// against AnnounceTracker implementations.
45 Interval generics.Option[int32]
46 Leechers generics.Option[int32]
47 Seeders generics.Option[int32]
// AnnounceHandler serves announces from a local AnnounceTracker and, when that yields few peers,
// augments the result by announcing to the configured upstream trackers.
50 type AnnounceHandler struct {
// AnnounceTracker is the local store: Serve tracks each announce in it and queries it for peers.
51 AnnounceTracker AnnounceTracker
// UpstreamTrackers and UpstreamTrackerUrls are indexed in parallel by augmentPeersFromUpstream:
// element i of each describes the same upstream. UpstreamTrackerUrls must therefore be at least
// as long as UpstreamTrackers, otherwise indexing it fails at run time.
53 UpstreamTrackers []Client
54 UpstreamTrackerUrls []string
// UpstreamAnnouncePeerId is the peer ID placed in announce requests sent upstream.
55 UpstreamAnnouncePeerId [20]byte
// UpstreamAnnounceGate has its Start method called before an upstream announce is attempted,
// and its Completed method called with the interval taken from the upstream response.
56 UpstreamAnnounceGate UpstreamAnnounceGater
// ongoingUpstreamAugmentations maps an info hash to its in-flight augmentation operation.
// NOTE(review): Serve reads and writes it, and augmentPeersFromUpstream deletes from it.
// Presumably guarded by a mutex declared elsewhere in this struct; confirm.
59 // Operations are only removed when all the upstream peers have been tracked.
60 ongoingUpstreamAugmentations map[InfoHash]augmentationOperation
// peerSet is a set of PeerInfo values, stored as map keys with empty-struct values.
63 type peerSet = map[PeerInfo]struct{}
// augmentationOperation is the handle returned by augmentPeersFromUpstream for one round of
// upstream announces. Callers read progress through its channels: curPeers while the round is
// running, and doneAnnouncing/finalPeers once it has finished.
65 type augmentationOperation struct {
66 // Closed when no more announce responses are pending. finalPeers will contain all the peers
68 doneAnnouncing chan struct{}
69 // This receives the latest peerSet until doneAnnouncing is closed.
71 // This contains the final peerSet after doneAnnouncing is closed.
// getCurPeers returns the peer set reported by getCurPeersAndDone and discards the flag that
// says whether the operation has finished.
75 func (me augmentationOperation) getCurPeers() (ret peerSet) {
76 ret, _ = me.getCurPeersAndDone()
// getCurPeersAndDone selects between the operation's two channels: it either receives the latest
// peer set from curPeers, or observes that doneAnnouncing has been closed. Presumably done is
// true only in the second case, with ret then taken from finalPeers; TODO confirm.
80 func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) {
82 case ret = <-me.curPeers:
83 case <-me.doneAnnouncing:
90 // Adds peers from new that aren't in orig. Modifies both arguments.
//
// NOTE(review): orig is a slice passed by value and the function has no result. Elements appended
// in the second loop therefore never become visible through the caller's slice, whose length is
// unchanged. Callers such as Serve that pass ret.Peers will not see the added peers. Returning
// the extended slice (or accepting *[]PeerInfo) and updating the call sites would fix this.
91 func addMissing(orig []PeerInfo, new peerSet) {
92 for _, peer := range orig {
95 for peer := range new {
96 orig = append(orig, peer)
// tracer is the OpenTelemetry tracer, named "torrent.tracker.udp", that Serve uses to start its
// span.
100 var tracer = otel.Tracer("torrent.tracker.udp")
// Serve handles a single announce. It tracks the announce in the local AnnounceTracker, asks the
// tracker for peers for the request's info hash, and merges in peers from an upstream
// augmentation operation: one already running for that info hash, or a new one started when the
// local lookup returned at most one peer.
//
// req is the announce request, addr is the address it arrived from (its port is replaced by
// req.Port), and opts constrains the peer lookup. A tracking failure is reported in ret.Err,
// wrapped with "tracking announce".
102 func (me *AnnounceHandler) Serve(
103 ctx context.Context, req AnnounceRequest, addr AnnounceAddr, opts GetPeersOpts,
104 ) (ret ServerAnnounceResult) {
// Open a span that records the request parameters, lookup options and source address.
105 ctx, span := tracer.Start(
107 "AnnounceHandler.Serve",
108 trace.WithAttributes(
109 attribute.Int64("announce.request.num_want", int64(req.NumWant)),
110 attribute.Int("announce.request.port", int(req.Port)),
111 attribute.String("announce.request.info_hash", hex.EncodeToString(req.InfoHash[:])),
112 attribute.String("announce.request.event", req.Event.String()),
113 attribute.Int64("announce.get_peers.opts.max_count_value", int64(opts.MaxCount.Value)),
114 attribute.Bool("announce.get_peers.opts.max_count_ok", opts.MaxCount.Ok),
115 attribute.String("announce.source.addr.ip", addr.Addr().String()),
116 attribute.Int("announce.source.addr.port", int(addr.Port())),
// The span also records how many peers are returned, and an error status built from ret.Err.
// Presumably this happens in a deferred function, and only when ret.Err is non-nil; confirm.
121 span.SetAttributes(attribute.Int("announce.get_peers.len", len(ret.Peers)))
123 span.SetStatus(codes.Error, ret.Err.Error())
// Keep the source IP but use the port the peer declared in its announce request.
128 addr = netip.AddrPortFrom(addr.Addr(), req.Port)
130 ret.Err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr)
132 ret.Err = fmt.Errorf("tracking announce: %w", ret.Err)
135 infoHash := req.InfoHash
136 var op generics.Option[augmentationOperation]
137 // Grab a handle to any augmentations that are already running.
139 op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
// NumWant of -1 is treated as "no preference". NOTE(review): any other negative NumWant wraps
// to a very large value in the uint conversion below, so it acts as no limit at all; confirm
// whether such requests should be rejected or clamped instead.
141 // Apply num_want limit to max count. I really can't tell if this is the right place to do it,
142 // but it seems the most flexible.
143 if req.NumWant != -1 {
144 newCount := uint(req.NumWant)
145 if opts.MaxCount.Ok {
146 if newCount < opts.MaxCount.Value {
147 opts.MaxCount.Value = newCount
150 opts.MaxCount = generics.Some(newCount)
153 ret = me.AnnounceTracker.GetPeers(ctx, infoHash, opts)
// NOTE(review): addMissing takes the slice by value and returns nothing, so peers it appends
// cannot show up in ret.Peers at this call or at the one near the end of the function.
157 // Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside
160 curPeers, done := op.Value.getCurPeersAndDone()
161 addMissing(ret.Peers, curPeers)
163 // It doesn't get any better with this operation. Forget it.
// The table is consulted again before starting a new operation, presumably because another
// request may have started one in the meantime; confirm against the locking around this section.
168 // If we didn't have an operation, and don't have enough peers, start one. Allowing 1 is
169 // assuming the announcing peer might be that one. Really we should record a value to prevent
170 // duplicate announces. Also don't announce upstream if we got no peers because the caller asked
172 if !op.Ok && len(ret.Peers) <= 1 && opts.MaxCount.UnwrapOr(1) > 0 {
173 op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
175 op.Set(me.augmentPeersFromUpstream(req.InfoHash))
176 generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value)
// The wait for the operation is bounded by a 10 second timeout derived from the request context.
180 // Wait a while for the current operation.
182 // Force the augmentation to return with whatever it has if it hasn't completed in a
184 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
187 case <-op.Value.doneAnnouncing:
190 addMissing(ret.Peers, op.Value.getCurPeers())
// augmentPeersFromUpstream announces infoHash to the configured upstream trackers and returns a
// handle through which callers can read the peers gathered so far (curPeers) and learn when all
// announces have finished (doneAnnouncing, finalPeers). Once every upstream has reported, the
// distinct peers collected are tracked in the local AnnounceTracker and the operation is removed
// from ongoingUpstreamAugmentations.
195 func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentationOperation {
// The announces share one context with a one minute timeout, derived from context.Background
// rather than from any caller's context.
196 const announceTimeout = time.Minute
197 announceCtx, cancel := context.WithTimeout(context.Background(), announceTimeout)
198 subReq := AnnounceRequest{
200 PeerId: me.UpstreamAnnouncePeerId,
// peersChan is unbuffered: each send waits for the collecting loop further down to receive it.
206 peersChan := make(chan []Peer)
207 var pendingUpstreams sync.WaitGroup
// UpstreamTrackers and UpstreamTrackerUrls are read at the same index, so they must line up.
// The counter is incremented once per upstream here and decremented once per batch received
// from peersChan below, so each upstream is presumably expected to send exactly one batch
// (possibly empty) even when it is skipped or fails; confirm.
208 for i := range me.UpstreamTrackers {
209 client := me.UpstreamTrackers[i]
210 url := me.UpstreamTrackerUrls[i]
211 pendingUpstreams.Add(1)
// Ask the gate whether this announce may go ahead. The branch below is taken when the gate
// returned an error or declined to start.
213 started, err := me.UpstreamAnnounceGate.Start(announceCtx, url, infoHash, announceTimeout)
215 log.Printf("error reserving announce for %x to %v: %v", infoHash, url, err)
217 if err != nil || !started {
221 log.Printf("announcing %x upstream to %v", infoHash, url)
222 resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{
223 UserAgent: "aragorn",
225 interval := resp.Interval
228 // This is as much to reduce load on upstream trackers in the event of errors,
229 // as it is to reduce load on our peer store.
// Completion is recorded with context.Background rather than announceCtx.
232 err := me.UpstreamAnnounceGate.Completed(context.Background(), url, infoHash, interval)
234 log.Printf("error recording completed announce for %x to %v: %v", infoHash, url, err)
237 peersChan <- resp.Peers
239 log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
// peersToTrack holds the distinct upstream peers, keyed by each peer's String() form.
243 peersToTrack := make(map[string]Peer)
// Once every upstream has reported, feed the collected peers into the local tracker.
245 pendingUpstreams.Wait()
248 log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers", len(peersToTrack))
249 for _, peer := range peersToTrack {
250 addrPort, ok := peer.ToNetipAddrPort()
254 trackReq := AnnounceRequest{
257 Port: uint16(peer.Port),
259 copy(trackReq.PeerId[:], peer.ID)
// context.TODO carries no deadline or cancellation into this call.
260 err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort)
262 log.Levelf(log.Error, "error tracking upstream peer: %v", err)
266 delete(me.ongoingUpstreamAugmentations, infoHash)
269 curPeersChan := make(chan map[PeerInfo]struct{})
270 doneChan := make(chan struct{})
271 retPeers := make(map[PeerInfo]struct{})
// Collector: each batch received from peersChan is merged into peersToTrack and retPeers, and
// counts one upstream as finished. Otherwise a copy of the current set is offered to whoever is
// reading curPeers. doneChan is closed when the collector exits.
273 defer close(doneChan)
276 case peers, ok := <-peersChan:
280 voldemort(peers, peersToTrack, retPeers)
281 pendingUpstreams.Done()
282 case curPeersChan <- copyPeerSet(retPeers):
286 // Take return references.
287 return augmentationOperation{
288 curPeers: curPeersChan,
289 finalPeers: retPeers,
290 doneAnnouncing: doneChan,
// copyPeerSet allocates a new peerSet sized for orig and walks orig's entries, presumably
// storing each one in the result so callers get a set they can modify independently; TODO
// confirm. The allocation is unconditional, so the result is non-nil even when orig is nil.
294 func copyPeerSet(orig peerSet) (ret peerSet) {
295 ret = make(peerSet, len(orig))
296 for k, v := range orig {
302 // Adds peers to trailing containers.
//
// Each peer is first stored in toTrack under its String() form. Its IP is then converted with
// netip.AddrFromSlice, and a PeerInfo built from that address and the port (narrowed to uint16)
// is inserted into every set in sets.
// NOTE(review): toTrack is written before the IP is validated, so a peer whose IP fails the
// conversion is still in toTrack. Presumably such a peer is skipped for sets; confirm.
303 func voldemort(peers []Peer, toTrack map[string]Peer, sets ...map[PeerInfo]struct{}) {
304 for _, protoPeer := range peers {
305 toTrack[protoPeer.String()] = protoPeer
306 addr, ok := netip.AddrFromSlice(protoPeer.IP)
310 handlerPeer := PeerInfo{netip.AddrPortFrom(addr, uint16(protoPeer.Port))}
311 for _, set := range sets {
312 set[handlerPeer] = struct{}{}