"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/tracker"
httpTracker "github.com/anacrolix/torrent/tracker/http"
- udpTrackerServer "github.com/anacrolix/torrent/tracker/udp/server"
)
type Handler struct {
- AnnounceTracker udpTrackerServer.AnnounceTracker
+ Announce *tracker.AnnounceHandler
// If non-nil, called to derive the announcer's IP. Otherwise Request.RemoteAddr is used.
// Necessary for instances running behind reverse proxies, for example.
RequestHost func(r *http.Request) (netip.Addr, error)
}
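+
+// A minimal wiring sketch. The X-Forwarded-For parsing and myAnnounceTracker
+// are illustrative assumptions, not part of this package:
+//
+//	handler := Handler{
+//		Announce: &tracker.AnnounceHandler{AnnounceTracker: myAnnounceTracker},
+//		RequestHost: func(r *http.Request) (netip.Addr, error) {
+//			return netip.ParseAddr(r.Header.Get("X-Forwarded-For"))
+//		},
+//	}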
portU64, err := strconv.ParseUint(vs.Get("port"), 10, 16)
addrPort := netip.AddrPortFrom(addr, uint16(portU64))
- err = me.AnnounceTracker.TrackAnnounce(r.Context(), tracker.AnnounceRequest{
+ peers, err := me.Announce.Serve(r.Context(), tracker.AnnounceRequest{
InfoHash: infoHash,
PeerId: peerId,
Event: event,
Port: addrPort.Port(),
}, addrPort)
if err != nil {
- log.Printf("error tracking announce: %v", err)
- http.Error(w, "error tracking announce", http.StatusInternalServerError)
- return
- }
- peers, err := me.AnnounceTracker.GetPeers(r.Context(), infoHash, tracker.GetPeersOpts{})
- if err != nil {
- log.Printf("error getting peers: %v", err)
- http.Error(w, "error getting peers", http.StatusInternalServerError)
+ log.Printf("error serving announce: %v", err)
+ http.Error(w, "error handling announce", http.StatusInternalServerError)
return
}
var resp httpTracker.HttpResponse
import (
"context"
"net/netip"
+ "sync"
+ "time"
+ "github.com/anacrolix/generics"
+ "github.com/anacrolix/log"
"github.com/anacrolix/torrent/tracker/udp"
)
GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ([]PeerInfo, error)
}
-//
-//type Server struct {
-// AnnounceTracker AnnounceTracker
-//}
-//
-//func (me Server) HandleAnnounce(req udp.AnnounceRequest, sourceAddr AnnounceAddr) error {
-//
-//}
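+// AnnounceHandler implements announce logic shared by tracker frontends: it
+// records announces with AnnounceTracker, and when the local swarm is sparse,
+// augments responses with peers discovered from upstream trackers.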
+type AnnounceHandler struct {
+ AnnounceTracker AnnounceTracker
+ UpstreamTrackers []Client
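+ // Must be index-aligned with UpstreamTrackers; used here only in log messages.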
+ UpstreamTrackerUrls []string
+ UpstreamAnnouncePeerId [20]byte
+
+ mu sync.Mutex
+ // Operations are only removed when all the upstream peers have been tracked.
+ ongoingUpstreamAugmentations map[InfoHash]augmentationOperation
+}
+
+type peerSet = map[PeerInfo]struct{}
+
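+// augmentationOperation is one in-flight round of upstream announces for an
+// infohash. Read curPeers for a live snapshot until doneAnnouncing is closed,
+// after which finalPeers holds the complete result.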
+type augmentationOperation struct {
+ // Closed when no more announce responses are pending. finalPeers will contain all the peers
+ // seen.
+ doneAnnouncing chan struct{}
+ // This receives the latest peerSet until doneAnnouncing is closed.
+ curPeers chan peerSet
+ // This contains the final peerSet after doneAnnouncing is closed.
+ finalPeers peerSet
+}
+
+func (me augmentationOperation) getCurPeers() (ret peerSet) {
+ ret, _ = me.getCurPeersAndDone()
+ return
+}
+
+func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) {
+ select {
+ case ret = <-me.curPeers:
+ case <-me.doneAnnouncing:
+ // Copy: callers consume the returned set (see addMissing), and finalPeers is shared.
+ ret = copyPeerSet(me.finalPeers)
+ done = true
+ }
+ return
+}
+
+// Returns orig with the peers from new that aren't already present appended. Consumes new.
+func addMissing(orig []PeerInfo, new peerSet) []PeerInfo {
+ for _, peer := range orig {
+ delete(new, peer)
+ }
+ for peer := range new {
+ orig = append(orig, peer)
+ }
+ return orig
+}
+
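+// Serve records an announce and returns the peers to respond with, waiting a
+// bounded time on upstream augmentation when few local peers are known.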
+func (me *AnnounceHandler) Serve(
+ ctx context.Context, req AnnounceRequest, addr AnnounceAddr,
+) (peers []PeerInfo, err error) {
+ err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr)
+ if err != nil {
+ return
+ }
+ infoHash := req.InfoHash
+ var op generics.Option[augmentationOperation]
+ // Grab a handle to any augmentations that are already running.
+ me.mu.Lock()
+ op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
+ me.mu.Unlock()
+ peers, err = me.AnnounceTracker.GetPeers(ctx, infoHash, GetPeersOpts{})
+ if err != nil {
+ return
+ }
+ // Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside
+ // the mutex or not.
+ if op.Ok {
+ curPeers, done := op.Value.getCurPeersAndDone()
+ peers = addMissing(peers, curPeers)
+ if done {
+ // It doesn't get any better with this operation. Forget it.
+ op.Ok = false
+ }
+ }
+ me.mu.Lock()
+ // If we didn't have an operation, and don't have enough peers, start one.
+ if !op.Ok && len(peers) <= 1 {
+ op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
+ if !op.Ok {
+ op.Set(me.augmentPeersFromUpstream(req))
+ generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value)
+ }
+ }
+ me.mu.Unlock()
+ // Wait a while for the current operation.
+ if op.Ok {
+ // Force the augmentation to return with whatever it has if it hasn't completed in a
+ // reasonable time.
+ ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+ select {
+ case <-ctx.Done():
+ case <-op.Value.doneAnnouncing:
+ }
+ cancel()
+ peers = addMissing(peers, op.Value.getCurPeers())
+ }
+ return
+}
+
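+// augmentPeersFromUpstream announces to all configured upstream trackers in
+// parallel and returns an operation exposing the peers gathered so far.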
+func (me *AnnounceHandler) augmentPeersFromUpstream(req AnnounceRequest) augmentationOperation {
+ announceCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
+ subReq := AnnounceRequest{
+ InfoHash: req.InfoHash,
+ PeerId: me.UpstreamAnnouncePeerId,
+ Event: None,
+ Key: 0,
+ NumWant: -1,
+ Port: 0,
+ }
+ peersChan := make(chan []Peer)
+ var pendingUpstreams sync.WaitGroup
+ for i := range me.UpstreamTrackers {
+ client := me.UpstreamTrackers[i]
+ url := me.UpstreamTrackerUrls[i]
+ pendingUpstreams.Add(1)
+ go func() {
+ resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{
+ UserAgent: "aragorn",
+ })
+ peersChan <- resp.Peers
+ if err != nil {
+ log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
+ }
+ }()
+ }
+ peersToTrack := make(map[string]Peer)
+ go func() {
+ pendingUpstreams.Wait()
+ cancel()
+ close(peersChan)
+ log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers")
+ for _, peer := range peersToTrack {
+ addrPort, ok := peer.ToNetipAddrPort()
+ if !ok {
+ continue
+ }
+ trackReq := AnnounceRequest{
+ InfoHash: req.InfoHash,
+ Event: Started,
+ Port: uint16(peer.Port),
+ }
+ copy(trackReq.PeerId[:], peer.ID)
+ err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort)
+ if err != nil {
+ log.Levelf(log.Error, "error tracking upstream peer: %v", err)
+ }
+ }
+ me.mu.Lock()
+ delete(me.ongoingUpstreamAugmentations, req.InfoHash)
+ me.mu.Unlock()
+ }()
+ curPeersChan := make(chan peerSet)
+ doneChan := make(chan struct{})
+ retPeers := make(peerSet)
+ go func() {
+ for {
+ select {
+ case peers, ok := <-peersChan:
+ if !ok {
+ // All upstreams are done and peersChan is closed; retPeers is now final.
+ close(doneChan)
+ return
+ }
+ voldemort(peers, peersToTrack, retPeers)
+ pendingUpstreams.Done()
+ case curPeersChan <- copyPeerSet(retPeers):
+ }
+ }
+ }()
+ // Return references that the goroutines above will service.
+ return augmentationOperation{
+ curPeers: curPeersChan,
+ finalPeers: retPeers,
+ doneAnnouncing: doneChan,
+ }
+}
+
+func copyPeerSet(orig peerSet) (ret peerSet) {
+ ret = make(peerSet, len(orig))
+ for k, v := range orig {
+ ret[k] = v
+ }
+ return
+}
+
+// Adds the raw tracker peers to toTrack (keyed by string form for deduplication), and their
+// PeerInfo equivalents to each of the trailing sets.
+func voldemort(peers []Peer, toTrack map[string]Peer, sets ...peerSet) {
+ for _, protoPeer := range peers {
+ toTrack[protoPeer.String()] = protoPeer
+ addr, ok := netip.AddrFromSlice(protoPeer.IP)
+ if !ok {
+ continue
+ }
+ handlerPeer := PeerInfo{netip.AddrPortFrom(addr, uint16(protoPeer.Port))}
+ for _, set := range sets {
+ set[handlerPeer] = struct{}{}
+ }
+ }
+}