9 "github.com/anacrolix/generics"
10 "github.com/anacrolix/log"
11 "github.com/anacrolix/torrent/tracker/udp"
14 // GetPeersOpts is the options argument of AnnounceTracker.GetPeers. It is reserved for stuff like
15 // filtering by IP version, avoiding an announcer's IP or key, limiting return count, etc.
16 type GetPeersOpts struct{}
// InfoHash is an alias for a 20-byte array. It is the info hash argument of
// AnnounceTracker.GetPeers and Scrape, and the key of the ongoing-augmentation map.
18 type InfoHash = [20]byte
// PeerInfo describes a peer as returned by AnnounceTracker.GetPeers; it is also the key type of
// peerSet, so it must remain comparable.
// NOTE(review): voldemort constructs values as PeerInfo{netip.AddrPort}, so this presumably
// holds a single address/port field — confirm against the field list.
20 type PeerInfo struct {
// AnnounceAddr is the address and port passed to AnnounceTracker.TrackAnnounce alongside an
// announce request.
24 type AnnounceAddr = netip.AddrPort
// AnnounceTracker is the backing store used by AnnounceHandler: announces are handed to it and
// peers are read back from it.
26 type AnnounceTracker interface {
// TrackAnnounce is called by AnnounceHandler.Serve for every incoming announce, and by the
// upstream augmentation for every peer learned from an upstream tracker.
27 TrackAnnounce(ctx context.Context, req udp.AnnounceRequest, addr AnnounceAddr) error
// Scrape returns scrape results for the given info hashes.
28 Scrape(ctx context.Context, infoHashes []InfoHash) ([]udp.ScrapeInfohashResult, error)
// GetPeers returns the peers for infoHash. opts is currently an empty struct.
29 GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ([]PeerInfo, error)
// AnnounceHandler serves announces using AnnounceTracker and, when too few peers are known for an
// info hash, augments them by announcing to the configured upstream trackers.
32 type AnnounceHandler struct {
// AnnounceTracker receives every announce and supplies the peers returned by Serve.
33 AnnounceTracker AnnounceTracker
// UpstreamTrackers and UpstreamTrackerUrls are indexed in parallel by augmentPeersFromUpstream,
// so UpstreamTrackerUrls must have at least as many entries as UpstreamTrackers.
34 UpstreamTrackers []Client
35 UpstreamTrackerUrls []string
// UpstreamAnnouncePeerId is the peer ID used in announces sent to the upstream trackers.
36 UpstreamAnnouncePeerId [20]byte
39 // Upstream augmentation operations keyed by info hash. Operations are only removed when all
// the upstream peers have been tracked.
40 ongoingUpstreamAugmentations map[InfoHash]augmentationOperation
// peerSet is a set of peers, represented as a map from PeerInfo to the empty struct.
43 type peerSet = map[PeerInfo]struct{}
// augmentationOperation is the handle returned by augmentPeersFromUpstream for one round of
// upstream announces. It exposes the peers gathered so far and a signal for completion.
45 type augmentationOperation struct {
46 // Closed when no more announce responses are pending. finalPeers will contain all the peers
48 doneAnnouncing chan struct{}
49 // curPeers: this receives a copy of the latest peerSet until doneAnnouncing is closed.
51 // finalPeers: this contains the final peerSet after doneAnnouncing is closed.
// getCurPeers returns the peer set reported by getCurPeersAndDone, discarding its done flag.
55 func (me augmentationOperation) getCurPeers() (ret peerSet) {
56 ret, _ = me.getCurPeersAndDone()
// getCurPeersAndDone selects between receiving a peer-set snapshot from curPeers and observing
// that doneAnnouncing has been closed.
// NOTE(review): presumably the doneAnnouncing case returns finalPeers with done set to true —
// confirm against the case bodies.
60 func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) {
// A snapshot is available: the operation is still collecting announce responses.
62 case ret = <-me.curPeers:
// No more announce responses are pending.
63 case <-me.doneAnnouncing:
70 // addMissing is intended to add peers from new that aren't in orig.
// NOTE(review): orig is a slice passed by value, so the append below only rebinds this
// function's local variable. The caller's slice keeps its original length and never observes
// the added peers. Fixing this requires returning the grown slice (or taking *[]PeerInfo) and
// updating the call sites in Serve.
71 func addMissing(orig []PeerInfo, new peerSet) {
// First pass over the peers the caller already has.
72 for _, peer := range orig {
// Second pass: append whatever is in new at this point.
75 for peer := range new {
76 orig = append(orig, peer)
// Serve handles a single announce. It passes req and addr to AnnounceTracker.TrackAnnounce,
// fetches the peers for req.InfoHash with GetPeers, and merges in whatever an already-running
// upstream augmentation has collected. If no operation was running and at most one peer is
// known, it starts an upstream augmentation and waits for it under a 10-second timeout.
//
// NOTE(review): both addMissing calls below discard the grown slice (addMissing appends to its
// own copy of the slice header), so upstream peers do not appear to reach the returned peers
// slice — confirm and fix together with addMissing.
80 func (me *AnnounceHandler) Serve(
81 ctx context.Context, req AnnounceRequest, addr AnnounceAddr,
82 ) (peers []PeerInfo, err error) {
// Record the announce first, so the announcer is tracked before peers are looked up.
83 err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr)
87 infoHash := req.InfoHash
88 var op generics.Option[augmentationOperation]
89 // Grab a handle to any augmentations that are already running.
91 op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
93 peers, err = me.AnnounceTracker.GetPeers(ctx, infoHash, GetPeersOpts{})
97 // Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside
100 curPeers, done := op.Value.getCurPeersAndDone()
101 addMissing(peers, curPeers)
103 // It doesn't get any better with this operation. Forget it.
108 // If we didn't have an operation, and don't have enough peers, start one.
109 if !op.Ok && len(peers) <= 1 {
// Look the operation up again before starting a new one, in case one was registered since the
// first lookup.
110 op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
112 op.Set(me.augmentPeersFromUpstream(req))
113 generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value)
117 // Wait a while for the current operation.
119 // Force the augmentation to return with whatever it has if it hasn't completed in a
121 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
124 case <-op.Value.doneAnnouncing:
127 addMissing(peers, op.Value.getCurPeers())
// augmentPeersFromUpstream announces req.InfoHash to every upstream tracker using
// UpstreamAnnouncePeerId, and returns a handle on that operation. The announces run under their
// own one-minute timeout derived from context.Background, not from the caller's context.
//
// Peers from the responses are accumulated into the operation's peer set. Once every upstream
// has been accounted for, the distinct peers are passed to AnnounceTracker.TrackAnnounce and the
// operation is removed from ongoingUpstreamAugmentations.
132 func (me *AnnounceHandler) augmentPeersFromUpstream(req AnnounceRequest) augmentationOperation {
133 announceCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
// The upstream announce carries the handler's own peer ID, not the client's.
134 subReq := AnnounceRequest{
135 InfoHash: req.InfoHash,
136 PeerId: me.UpstreamAnnouncePeerId,
142 peersChan := make(chan []Peer)
// Counts upstreams whose response has not yet been merged; Done is called by the collecting
// loop below, after the peers have been merged.
143 var pendingUpstreams sync.WaitGroup
144 for i := range me.UpstreamTrackers {
145 client := me.UpstreamTrackers[i]
146 url := me.UpstreamTrackerUrls[i]
147 pendingUpstreams.Add(1)
149 resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{
150 UserAgent: "aragorn",
152 peersChan <- resp.Peers
154 log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
// Distinct upstream peers keyed by their String() form; filled in by voldemort.
158 peersToTrack := make(map[string]Peer)
160 pendingUpstreams.Wait()
// Fixed: the format string has a %v verb, so the peer count must be passed as its argument.
163 log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers", len(peersToTrack))
164 for _, peer := range peersToTrack {
165 addrPort, ok := peer.ToNetipAddrPort()
// Track each upstream peer as though it had announced to this handler itself.
169 trackReq := AnnounceRequest{
170 InfoHash: req.InfoHash,
172 Port: uint16(peer.Port),
174 copy(trackReq.PeerId[:], peer.ID)
175 err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort)
177 log.Levelf(log.Error, "error tracking upstream peer: %v", err)
// All upstream peers have been handed to the tracker; the operation is no longer needed.
181 delete(me.ongoingUpstreamAugmentations, req.InfoHash)
184 curPeersChan := make(chan map[PeerInfo]struct{})
185 doneChan := make(chan struct{})
186 retPeers := make(map[PeerInfo]struct{})
190 case peers, ok := <-peersChan:
194 voldemort(peers, peersToTrack, retPeers)
195 pendingUpstreams.Done()
// Readers get a copy, so they never share the map this loop keeps mutating.
196 case curPeersChan <- copyPeerSet(retPeers):
200 // Take return references.
201 return augmentationOperation{
202 curPeers: curPeersChan,
203 finalPeers: retPeers,
204 doneAnnouncing: doneChan,
// copyPeerSet allocates a new peerSet sized for len(orig) and iterates over orig's entries.
// NOTE(review): presumably every entry of orig is copied into ret, which is then returned —
// confirm against the loop body.
208 func copyPeerSet(orig peerSet) (ret peerSet) {
209 ret = make(peerSet, len(orig))
210 for k, v := range orig {
216 // voldemort adds peers to the trailing containers: every peer is stored in toTrack keyed by
// its String() form, and the corresponding PeerInfo is inserted into each set in sets.
217 func voldemort(peers []Peer, toTrack map[string]Peer, sets ...map[PeerInfo]struct{}) {
218 for _, protoPeer := range peers {
// Recorded before the IP is parsed, so toTrack gets the peer regardless of the parse result.
219 toTrack[protoPeer.String()] = protoPeer
220 addr, ok := netip.AddrFromSlice(protoPeer.IP)
// NOTE(review): presumably peers whose IP fails to parse (ok == false) are skipped before this
// point — confirm.
224 handlerPeer := PeerInfo{netip.AddrPortFrom(addr, uint16(protoPeer.Port))}
225 for _, set := range sets {
226 set[handlerPeer] = struct{}{}