11 "github.com/anacrolix/generics"
12 "github.com/anacrolix/log"
13 "github.com/anacrolix/torrent/tracker"
14 "go.opentelemetry.io/otel"
15 "go.opentelemetry.io/otel/attribute"
16 "go.opentelemetry.io/otel/codes"
17 "go.opentelemetry.io/otel/trace"
19 "github.com/anacrolix/torrent/tracker/udp"
22 // This is reserved for stuff like filtering by IP version, avoiding an announcer's IP or key,
23 // limiting return count, etc.
24 type GetPeersOpts struct {
25 // Negative numbers are not allowed.
26 MaxCount generics.Option[uint]
// InfoHash is the 20-byte identifier that scrapes and ongoing upstream augmentations are keyed by.
29 type InfoHash = [20]byte
// PeerInfo describes one peer. Values are comparable: the type is the key of peerSet, and values
// are built elsewhere in this file from a single netip.AddrPort.
31 type PeerInfo struct {
// AnnounceAddr is the IP address and port that an announce is attributed to.
35 type AnnounceAddr = netip.AddrPort
// AnnounceTracker is the peer store that AnnounceHandler sits in front of: it is handed
// announces, answers scrapes and is queried for the peers of an info hash.
37 type AnnounceTracker interface {
// TrackAnnounce is handed each announce together with the address it is attributed to.
38 TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr AnnounceAddr) error
// Scrape reports scrape results for the given info hashes.
39 Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error)
45 ) ServerAnnounceResult
// ServerAnnounceResult is the outcome of a peer lookup, and is what AnnounceHandler.Serve
// returns.
48 type ServerAnnounceResult struct {
// Interval, Leechers and Seeders are optional; each carries a value only when its Option is set.
51 Interval generics.Option[int32]
52 Leechers generics.Option[int32]
53 Seeders generics.Option[int32]
// AnnounceHandler answers announces from an AnnounceTracker and, when that yields too few peers,
// supplements the answer with peers obtained by announcing to upstream trackers.
56 type AnnounceHandler struct {
// The peer store that announces are passed to and peers are looked up in.
57 AnnounceTracker AnnounceTracker
// UpstreamTrackers and UpstreamTrackerUrls are parallel: element i of each describes the same
// upstream, so UpstreamTrackerUrls needs an entry for every element of UpstreamTrackers.
59 UpstreamTrackers []Client
60 UpstreamTrackerUrls []string
// Peer ID sent in the announces made to upstream trackers.
61 UpstreamAnnouncePeerId [20]byte
// Asked (Start) before each upstream announce and told (Completed) about finished ones.
62 UpstreamAnnounceGate UpstreamAnnounceGater
65 // Operations are only removed when all the upstream peers have been tracked.
66 ongoingUpstreamAugmentations map[InfoHash]augmentationOperation
// peerSet is a set of peers, keyed by PeerInfo.
69 type peerSet = map[PeerInfo]struct{}
// augmentationOperation is the handle to one round of upstream announces for an info hash. It is
// stored and passed by value; the channels and peer set it refers to are shared between copies.
71 type augmentationOperation struct {
72 // Closed when no more announce responses are pending. finalPeers will contain all the peers
74 doneAnnouncing chan struct{}
75 // This receives the latest peerSet until doneAnnouncing is closed.
77 // This contains the final peerSet after doneAnnouncing is closed.
81 func (me augmentationOperation) getCurPeers() (ret peerSet) {
82 ret, _ = me.getCurPeersAndDone()
// getCurPeersAndDone obtains the operation's peers: either the latest set received from curPeers,
// or, once doneAnnouncing is closed, whatever the second case assigns.
// NOTE(review): done presumably reports that the doneAnnouncing case was taken — confirm.
86 func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) {
88 case ret = <-me.curPeers:
89 case <-me.doneAnnouncing:
96 // Adds peers from new that aren't in orig. Modifies both arguments.
97 func addMissing(orig []PeerInfo, new peerSet) {
98 for _, peer := range orig {
101 for peer := range new {
102 orig = append(orig, peer)
// tracer creates the OpenTelemetry spans started in this file.
106 var tracer = otel.Tracer("torrent.tracker.udp")
// Serve handles one announce. It passes the announce to AnnounceTracker, folds req.NumWant into
// opts.MaxCount, looks up peers for the info hash and, when the lookup yields at most one peer,
// turns to an upstream augmentation — reusing one already registered for the info hash or
// starting a new one — and waits up to 10 seconds for it. Failures are reported through ret.Err.
// The call is traced as the "AnnounceHandler.Serve" span.
108 func (me *AnnounceHandler) Serve(
109 ctx context.Context, req AnnounceRequest, addr AnnounceAddr, opts GetPeersOpts,
110 ) (ret ServerAnnounceResult) {
111 ctx, span := tracer.Start(
113 "AnnounceHandler.Serve",
114 trace.WithAttributes(
115 attribute.Int64("announce.request.num_want", int64(req.NumWant)),
116 attribute.Int("announce.request.port", int(req.Port)),
117 attribute.String("announce.request.info_hash", hex.EncodeToString(req.InfoHash[:])),
118 attribute.String("announce.request.event", req.Event.String()),
119 attribute.Int64("announce.get_peers.opts.max_count_value", int64(opts.MaxCount.Value)),
120 attribute.Bool("announce.get_peers.opts.max_count_ok", opts.MaxCount.Ok),
121 attribute.String("announce.source.addr.ip", addr.Addr().String()),
122 attribute.Int("announce.source.addr.port", int(addr.Port())),
127 span.SetAttributes(attribute.Int("announce.get_peers.len", len(ret.Peers)))
129 span.SetStatus(codes.Error, ret.Err.Error())
// From here on the announce is attributed to the source IP combined with the port advertised in
// the request, rather than the port the request arrived from.
134 addr = netip.AddrPortFrom(addr.Addr(), req.Port)
136 ret.Err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr)
138 ret.Err = fmt.Errorf("tracking announce: %w", ret.Err)
141 infoHash := req.InfoHash
142 var op generics.Option[augmentationOperation]
143 // Grab a handle to any augmentations that are already running.
145 op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
147 // Apply num_want limit to max count. I really can't tell if this is the right place to do it,
148 // but it seems the most flexible.
149 if req.NumWant != -1 {
// NOTE(review): only -1 is excluded above; any other negative NumWant converts to a very large
// uint here — confirm such values cannot reach this point.
150 newCount := uint(req.NumWant)
151 if opts.MaxCount.Ok {
152 if newCount < opts.MaxCount.Value {
153 opts.MaxCount.Value = newCount
156 opts.MaxCount = generics.Some(newCount)
159 ret = me.AnnounceTracker.GetPeers(ctx, infoHash, opts, addr)
163 // Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside
166 curPeers, done := op.Value.getCurPeersAndDone()
// NOTE(review): ret.Peers is passed by value and nothing is assigned back, so peers appended
// inside addMissing are not visible in ret.Peers. The call at the end of this function has the
// same shape.
167 addMissing(ret.Peers, curPeers)
169 // It doesn't get any better with this operation. Forget it.
174 // If we didn't have an operation, and don't have enough peers, start one. Allowing 1 is
175 // assuming the announcing peer might be that one. Really we should record a value to prevent
176 // duplicate announces. Also don't announce upstream if we got no peers because the caller asked
178 if !op.Ok && len(ret.Peers) <= 1 && opts.MaxCount.UnwrapOr(1) > 0 {
// Look the operation up again before creating one.
179 op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
181 op.Set(me.augmentPeersFromUpstream(req.InfoHash))
182 generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value)
186 // Wait a while for the current operation.
188 // Force the augmentation to return with whatever it has if it hasn't completed in a
190 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
193 case <-op.Value.doneAnnouncing:
196 addMissing(ret.Peers, op.Value.getCurPeers())
// augmentPeersFromUpstream announces infoHash to the configured upstream trackers and returns a
// handle through which the peers they report can be read. The upstream announces share a
// one-minute deadline derived from context.Background(), so they are not tied to any request
// context. Each upstream is first put to UpstreamAnnounceGate.Start. Peers from the responses are
// gathered into the returned operation's peer set and are also passed to AnnounceTracker as
// Started announces; the entry for infoHash is then deleted from ongoingUpstreamAugmentations.
201 func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentationOperation {
202 const announceTimeout = time.Minute
203 announceCtx, cancel := context.WithTimeout(context.Background(), announceTimeout)
204 subReq := AnnounceRequest{
206 PeerId: me.UpstreamAnnouncePeerId,
// Carries each upstream response's peer list to the collecting select further down.
212 peersChan := make(chan []Peer)
// Incremented once per upstream below; the collecting select calls Done after it has merged a
// received batch of peers.
213 var pendingUpstreams sync.WaitGroup
// UpstreamTrackers and UpstreamTrackerUrls are indexed in lockstep; a shorter UpstreamTrackerUrls
// would panic on the index below.
214 for i := range me.UpstreamTrackers {
215 client := me.UpstreamTrackers[i]
216 url := me.UpstreamTrackerUrls[i]
217 pendingUpstreams.Add(1)
219 started, err := me.UpstreamAnnounceGate.Start(announceCtx, url, infoHash, announceTimeout)
221 log.Printf("error reserving announce for %x to %v: %v", infoHash, url, err)
223 if err != nil || !started {
227 log.Printf("announcing %x upstream to %v", infoHash, url)
228 resp, err := client.Announce(announceCtx, subReq, tracker.AnnounceOpt{
229 UserAgent: "aragorn",
231 interval := resp.Interval
234 // This is as much to reduce load on upstream trackers in the event of errors,
235 // as it is to reduce load on our peer store.
// The completion is recorded with a fresh background context rather than announceCtx.
238 err := me.UpstreamAnnounceGate.Completed(context.Background(), url, infoHash, interval)
240 log.Printf("error recording completed announce for %x to %v: %v", infoHash, url, err)
243 peersChan <- resp.Peers
245 log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
// Upstream peers keyed by their String() form (filled in by voldemort), which de-duplicates peers
// reported by more than one upstream.
249 peersToTrack := make(map[string]Peer)
251 pendingUpstreams.Wait()
254 log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers", len(peersToTrack))
255 for _, peer := range peersToTrack {
256 addrPort, ok := peer.ToNetipAddrPort()
260 trackReq := AnnounceRequest{
262 Event: tracker.Started,
263 Port: uint16(peer.Port),
265 copy(trackReq.PeerId[:], peer.ID)
// NOTE(review): context.TODO() means this call is bound neither by announceCtx nor by any
// deadline — confirm that is intended.
266 err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort)
268 log.Levelf(log.Error, "error tracking upstream peer: %v", err)
272 delete(me.ongoingUpstreamAugmentations, infoHash)
275 curPeersChan := make(chan map[PeerInfo]struct{})
276 doneChan := make(chan struct{})
277 retPeers := make(map[PeerInfo]struct{})
279 defer close(doneChan)
282 case peers, ok := <-peersChan:
286 voldemort(peers, peersToTrack, retPeers)
287 pendingUpstreams.Done()
// A copy is offered so that receivers never share retPeers, which the case above mutates.
288 case curPeersChan <- copyPeerSet(retPeers):
292 // Take return references.
293 return augmentationOperation{
294 curPeers: curPeersChan,
295 finalPeers: retPeers,
296 doneAnnouncing: doneChan,
300 func copyPeerSet(orig peerSet) (ret peerSet) {
301 ret = make(peerSet, len(orig))
302 for k, v := range orig {
308 // Adds peers to trailing containers.
309 func voldemort(peers []Peer, toTrack map[string]Peer, sets ...map[PeerInfo]struct{}) {
310 for _, protoPeer := range peers {
311 toTrack[protoPeer.String()] = protoPeer
312 addr, ok := netip.AddrFromSlice(protoPeer.IP)
316 handlerPeer := PeerInfo{netip.AddrPortFrom(addr, uint16(protoPeer.Port))}
317 for _, set := range sets {
318 set[handlerPeer] = struct{}{}