10 "github.com/anacrolix/generics"
11 "github.com/anacrolix/log"
12 "go.opentelemetry.io/otel"
13 "go.opentelemetry.io/otel/attribute"
14 "go.opentelemetry.io/otel/trace"
16 "github.com/anacrolix/torrent/tracker/udp"
19 // This is reserved for stuff like filtering by IP version, avoiding an announcer's IP or key,
20 // limiting return count, etc.
// GetPeersOpts is the options value passed to AnnounceTracker.GetPeers and to
// AnnounceHandler.Serve.
21 type GetPeersOpts struct {
22 // Negative numbers are not allowed.
// Optional limit on the number of peers. Serve lowers it to the announce's num_want when
// num_want is not -1, and treats an unset value as non-zero when deciding whether to announce
// upstream.
23 MaxCount generics.Option[uint]
// InfoHash is the 20-byte info hash used to key peer lookups, scrapes and the handler's
// ongoing upstream augmentations.
26 type InfoHash = [20]byte
// PeerInfo identifies one peer handed back to announcers. It is used as a map key (peerSet) and
// is constructed from a single netip.AddrPort in voldemort.
// NOTE(review): the field list is not visible in this excerpt — confirm against the full source.
28 type PeerInfo struct {
// AnnounceAddr is the IP address and port an announce is attributed to.
32 type AnnounceAddr = netip.AddrPort
// AnnounceTracker is the peer store behind AnnounceHandler: it is told about announces and is
// queried for the peers of an info hash.
34 type AnnounceTracker interface {
// TrackAnnounce is called by Serve for each incoming announce, and by
// augmentPeersFromUpstream for peers learned from upstream trackers.
35 TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr AnnounceAddr) error
// Scrape is not called anywhere in this excerpt; presumably it yields one result per requested
// info hash — confirm against implementations.
36 Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error)
// GetPeers returns peers for infoHash, subject to opts.
37 GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ([]PeerInfo, error)
// AnnounceHandler answers announces from its AnnounceTracker and, when too few peers are known
// locally, also announces to the configured upstream trackers to find more.
40 type AnnounceHandler struct {
// Store that announces are recorded in and that peers are read from.
41 AnnounceTracker AnnounceTracker
// Upstream tracker clients and their URLs. augmentPeersFromUpstream indexes both slices with
// the same index, so UpstreamTrackerUrls must be at least as long as UpstreamTrackers. The URL
// is only used in the warning logged when an upstream announce fails.
42 UpstreamTrackers []Client
43 UpstreamTrackerUrls []string
// Peer ID placed in the announce request sent to upstream trackers.
44 UpstreamAnnouncePeerId [20]byte
47 // Operations are only removed when all the upstream peers have been tracked.
// Keyed by info hash. Read and set in Serve, deleted from in augmentPeersFromUpstream.
// NOTE(review): no locking is visible in this excerpt — confirm every access is guarded.
48 ongoingUpstreamAugmentations map[InfoHash]augmentationOperation
// peerSet is a set of peers keyed by PeerInfo value.
51 type peerSet = map[PeerInfo]struct{}
// augmentationOperation is a handle on one round of upstream announces for a single info hash.
// It is created by augmentPeersFromUpstream and shared between Serve calls through
// AnnounceHandler.ongoingUpstreamAugmentations.
53 type augmentationOperation struct {
54 // Closed when no more announce responses are pending. finalPeers will contain all the peers
56 doneAnnouncing chan struct{}
57 // This receives the latest peerSet until doneAnnouncing is closed.
// (Field referenced elsewhere as curPeers; its declaration is not visible in this excerpt.)
59 // This contains the final peerSet after doneAnnouncing is closed.
// (Field referenced elsewhere as finalPeers; its declaration is not visible in this excerpt.)
// getCurPeers returns the peer set reported by getCurPeersAndDone and discards its done flag.
63 func (me augmentationOperation) getCurPeers() (ret peerSet) {
64 ret, _ = me.getCurPeersAndDone()
// getCurPeersAndDone chooses between two channel operations: receiving the latest peerSet from
// curPeers, or observing that doneAnnouncing has been closed.
// NOTE(review): the select header and the body of the doneAnnouncing case are not visible in
// this excerpt; presumably that case returns the final peers and sets done — confirm.
68 func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) {
// Operation still running: take the snapshot it offers.
70 case ret = <-me.curPeers:
// Operation finished.
71 case <-me.doneAnnouncing:
78 // Adds peers from new that aren't in orig. Modifies both arguments.
// NOTE(review): orig is a slice passed by value and the function returns nothing, so the append
// below only updates this function's local slice header. Callers such as Serve keep their
// original length and never observe the appended peers. Returning the grown slice (and
// assigning it at the call sites) or taking *[]PeerInfo would be needed for the added peers to
// reach the caller.
79 func addMissing(orig []PeerInfo, new peerSet) {
// First pass over the peers already present. NOTE(review): the loop body is not visible in this
// excerpt; presumably it removes each such peer from new — confirm.
80 for _, peer := range orig {
// Second pass: every peer in new is appended to the local slice.
83 for peer := range new {
84 orig = append(orig, peer)
// tracer is the OpenTelemetry tracer AnnounceHandler.Serve starts its span from.
88 var tracer = otel.Tracer("torrent.tracker.udp")
// Serve handles a single announce. It records the announce in AnnounceTracker, asks it for the
// peers of req.InfoHash, and consults an upstream augmentation operation: one that is already
// registered for the info hash, or a new one started when at most one local peer is known.
//
// opts.MaxCount is capped by req.NumWant when NumWant is not -1. The span started here carries
// the request fields, the caller's limit, the source address and the number of peers found.
//
// NOTE(review): several lines of this function (error checks, locking, select headers) are not
// visible in this excerpt; the comments below describe only the visible statements.
90 func (me *AnnounceHandler) Serve(
91 ctx context.Context, req AnnounceRequest, addr AnnounceAddr, opts GetPeersOpts,
92 ) (peers []PeerInfo, err error) {
// Start the span and attach the request, limit and source-address attributes.
93 ctx, span := tracer.Start(
95 "AnnounceHandler.Serve",
97 attribute.Int64("announce.request.num_want", int64(req.NumWant)),
98 attribute.Int("announce.request.port", int(req.Port)),
99 attribute.String("announce.request.info_hash", hex.EncodeToString(req.InfoHash[:])),
100 attribute.String("announce.request.event", req.Event.String()),
101 attribute.Int64("announce.get_peers.opts.max_count_value", int64(opts.MaxCount.Value)),
102 attribute.Bool("announce.get_peers.opts.max_count_ok", opts.MaxCount.Ok),
103 attribute.String("announce.source.addr.ip", addr.Addr().String()),
104 attribute.Int("announce.source.addr.port", int(addr.Port())),
// Record how many peers this call ended up with.
109 span.SetAttributes(attribute.Int("announce.get_peers.len", len(peers)))
// Record the announce in the local store.
112 err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr)
116 infoHash := req.InfoHash
117 var op generics.Option[augmentationOperation]
118 // Grab a handle to any augmentations that are already running.
120 op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
122 // Apply num_want limit to max count. I really can't tell if this is the right place to do it,
123 // but it seems the most flexible.
// NOTE(review): only -1 is excluded here; any other negative NumWant would wrap to a very large
// uint in the conversion below — confirm NumWant cannot be below -1.
124 if req.NumWant != -1 {
125 newCount := uint(req.NumWant)
// A caller-supplied limit is only ever lowered, never raised.
126 if opts.MaxCount.Ok {
127 if newCount < opts.MaxCount.Value {
128 opts.MaxCount.Value = newCount
// Otherwise (presumably when the caller supplied no limit) num_want becomes the limit.
131 opts.MaxCount = generics.Some(newCount)
134 peers, err = me.AnnounceTracker.GetPeers(ctx, infoHash, opts)
138 // Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside
141 curPeers, done := op.Value.getCurPeersAndDone()
// NOTE(review): addMissing appends to its own copy of the slice header and returns nothing, so
// peers is not extended by this call.
142 addMissing(peers, curPeers)
144 // It doesn't get any better with this operation. Forget it.
149 // If we didn't have an operation, and don't have enough peers, start one. Allowing 1 is
150 // assuming the announcing peer might be that one. Really we should record a value to prevent
151 // duplicate announces. Also don't announce upstream if we got no peers because the caller asked
// An unset MaxCount is treated as 1 here, i.e. as "some peers wanted".
153 if !op.Ok && len(peers) <= 1 && opts.MaxCount.UnwrapOr(1) > 0 {
// Look the operation up again before creating one. NOTE(review): presumably this guards against
// another Serve having registered one in the meantime; the surrounding lines are not visible.
154 op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
// Start a new operation and register it under the info hash, creating the map if needed.
156 op.Set(me.augmentPeersFromUpstream(req.InfoHash))
157 generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value)
161 // Wait a while for the current operation.
163 // Force the augmentation to return with whatever it has if it hasn't completed in a
// The wait is bounded to 10 seconds on top of whatever deadline the caller's ctx already has.
165 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
168 case <-op.Value.doneAnnouncing:
// NOTE(review): as above, the peers appended inside addMissing do not reach this function's
// peers slice.
171 addMissing(peers, op.Value.getCurPeers())
// augmentPeersFromUpstream announces infoHash to each configured upstream tracker and returns a
// handle through which callers can read the peers collected so far and learn when announcing
// has finished. Peers learned upstream are also fed into AnnounceTracker, after which the
// operation is removed from ongoingUpstreamAugmentations.
//
// NOTE(review): the goroutine launches, loop/select headers and some error checks are not
// visible in this excerpt; the comments below describe only the visible statements.
176 func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentationOperation {
// The upstream announces get their own one minute deadline, rooted at context.Background()
// rather than at any request context.
// NOTE(review): the call to cancel is not visible in this excerpt — confirm it is invoked.
177 announceCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
// Request sent to every upstream tracker, announced under the handler's configured peer ID.
178 subReq := AnnounceRequest{
180 PeerId: me.UpstreamAnnouncePeerId,
// Upstream responses deliver their peers on this unbuffered channel.
186 peersChan := make(chan []Peer)
// Incremented once per upstream tracker; Done is called after a response's peers have been
// merged (see the receive case further down).
187 var pendingUpstreams sync.WaitGroup
188 for i := range me.UpstreamTrackers {
189 client := me.UpstreamTrackers[i]
// Indexed in step with UpstreamTrackers; a shorter UpstreamTrackerUrls would panic here.
190 url := me.UpstreamTrackerUrls[i]
191 pendingUpstreams.Add(1)
193 resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{
194 UserAgent: "aragorn",
196 peersChan <- resp.Peers
198 log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
// Upstream peers to record locally, keyed by Peer.String() (filled in by voldemort).
202 peersToTrack := make(map[string]Peer)
// Block until every upstream has been accounted for.
204 pendingUpstreams.Wait()
207 log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers", len(peersToTrack))
// Record each upstream peer in the local store via a synthesized announce request.
208 for _, peer := range peersToTrack {
209 addrPort, ok := peer.ToNetipAddrPort()
213 trackReq := AnnounceRequest{
216 Port: uint16(peer.Port),
218 copy(trackReq.PeerId[:], peer.ID)
// NOTE(review): context.TODO() means this call is not bound by announceCtx's deadline.
219 err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort)
221 log.Levelf(log.Error, "error tracking upstream peer: %v", err)
// All upstream peers have been handed to the tracker; drop the operation from the handler.
225 delete(me.ongoingUpstreamAugmentations, infoHash)
// Channels and peer set exposed through the returned augmentationOperation.
228 curPeersChan := make(chan map[PeerInfo]struct{})
229 doneChan := make(chan struct{})
230 retPeers := make(map[PeerInfo]struct{})
// Closing doneChan is what signals doneAnnouncing to holders of the operation.
232 defer close(doneChan)
235 case peers, ok := <-peersChan:
// Merge the response into both the to-track map and the returned set, then mark one upstream
// as consumed.
239 voldemort(peers, peersToTrack, retPeers)
240 pendingUpstreams.Done()
// Offer a copy of the current set to whoever is receiving from curPeers.
241 case curPeersChan <- copyPeerSet(retPeers):
245 // Take return references.
246 return augmentationOperation{
247 curPeers: curPeersChan,
// retPeers itself, not a copy, is exposed as finalPeers.
248 finalPeers: retPeers,
249 doneAnnouncing: doneChan,
// copyPeerSet allocates a new peerSet with capacity for len(orig) entries and iterates over
// orig. NOTE(review): the loop body is not visible in this excerpt; presumably it copies each
// entry into ret — confirm.
253 func copyPeerSet(orig peerSet) (ret peerSet) {
254 ret = make(peerSet, len(orig))
255 for k, v := range orig {
261 // Adds peers to trailing containers.
// Each peer is stored in toTrack under its String() form, then converted to a PeerInfo (IP via
// netip.AddrFromSlice, port truncated to uint16) and inserted into every set in sets.
// NOTE(review): the toTrack insertion happens before the IP is validated, so a peer whose IP
// fails AddrFromSlice still lands in toTrack. The handling of !ok is not visible in this
// excerpt — confirm such peers are skipped for sets.
262 func voldemort(peers []Peer, toTrack map[string]Peer, sets ...map[PeerInfo]struct{}) {
263 for _, protoPeer := range peers {
264 toTrack[protoPeer.String()] = protoPeer
265 addr, ok := netip.AddrFromSlice(protoPeer.IP)
269 handlerPeer := PeerInfo{netip.AddrPortFrom(addr, uint16(protoPeer.Port))}
270 for _, set := range sets {
271 set[handlerPeer] = struct{}{}