]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Support upstream trackers
authorMatt Joiner <anacrolix@gmail.com>
Tue, 6 Dec 2022 14:54:38 +0000 (01:54 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 6 Dec 2022 23:45:12 +0000 (10:45 +1100)
tracker/http/peer.go
tracker/http/server/server.go
tracker/server.go
tracker/udp/server/server.go

index 363ba6d3c7b9f7d317b0f2785a3e5338c16df49e..b0deee0b3e540772669d74ceed32e43c497ed601 100644 (file)
@@ -3,16 +3,24 @@ package httpTracker
 import (
        "fmt"
        "net"
+       "net/netip"
 
        "github.com/anacrolix/dht/v2/krpc"
 )
 
+// TODO: Use netip.Addr and Option[[20]byte].
 type Peer struct {
        IP   net.IP `bencode:"ip"`
        Port int    `bencode:"port"`
        ID   []byte `bencode:"peer id"`
 }
 
+func (p Peer) ToNetipAddrPort() (addrPort netip.AddrPort, ok bool) {
+       addr, ok := netip.AddrFromSlice(p.IP)
+       addrPort = netip.AddrPortFrom(addr, uint16(p.Port))
+       return
+}
+
 func (p Peer) String() string {
        loc := net.JoinHostPort(p.IP.String(), fmt.Sprintf("%d", p.Port))
        if len(p.ID) != 0 {
index 0840fe96cf0a205e46bd78817513381f52014de7..10713a1bd35212e5e9f89a39958b54f4bddff1bd 100644 (file)
@@ -14,11 +14,10 @@ import (
        "github.com/anacrolix/torrent/bencode"
        "github.com/anacrolix/torrent/tracker"
        httpTracker "github.com/anacrolix/torrent/tracker/http"
-       udpTrackerServer "github.com/anacrolix/torrent/tracker/udp/server"
 )
 
 type Handler struct {
-       AnnounceTracker udpTrackerServer.AnnounceTracker
+       Announce tracker.AnnounceHandler
        // Called to derive an announcer's IP if non-nil. If not specified, the Request.RemoteAddr is
        // used. Necessary for instances running behind reverse proxies for example.
        RequestHost func(r *http.Request) (netip.Addr, error)
@@ -74,21 +73,15 @@ func (me Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        }
        portU64, err := strconv.ParseUint(vs.Get("port"), 0, 16)
        addrPort := netip.AddrPortFrom(addr, uint16(portU64))
-       err = me.AnnounceTracker.TrackAnnounce(r.Context(), tracker.AnnounceRequest{
+       peers, err := me.Announce.Serve(r.Context(), tracker.AnnounceRequest{
                InfoHash: infoHash,
                PeerId:   peerId,
                Event:    event,
                Port:     addrPort.Port(),
        }, addrPort)
        if err != nil {
-               log.Printf("error tracking announce: %v", err)
-               http.Error(w, "error tracking announce", http.StatusInternalServerError)
-               return
-       }
-       peers, err := me.AnnounceTracker.GetPeers(r.Context(), infoHash, tracker.GetPeersOpts{})
-       if err != nil {
-               log.Printf("error getting peers: %v", err)
-               http.Error(w, "error getting peers", http.StatusInternalServerError)
+               log.Printf("error serving announce: %v", err)
+               http.Error(w, "error handling announce", http.StatusInternalServerError)
                return
        }
        var resp httpTracker.HttpResponse
index 077d32445534eb2f9053a8d19bed816106a22fea..2b483436f109d6c208a3c70c8e5b64dc4ee3eebd 100644 (file)
@@ -3,7 +3,11 @@ package tracker
 import (
        "context"
        "net/netip"
+       "sync"
+       "time"
 
+       "github.com/anacrolix/generics"
+       "github.com/anacrolix/log"
        "github.com/anacrolix/torrent/tracker/udp"
 )
 
@@ -25,11 +29,201 @@ type AnnounceTracker interface {
        GetPeers(ctx context.Context, infoHash InfoHash, opts GetPeersOpts) ([]PeerInfo, error)
 }
 
-//
-//type Server struct {
-//     AnnounceTracker AnnounceTracker
-//}
-//
-//func (me Server) HandleAnnounce(req udp.AnnounceRequest, sourceAddr AnnounceAddr) error {
-//
-//}
+type AnnounceHandler struct {
+       AnnounceTracker        AnnounceTracker
+       UpstreamTrackers       []Client
+       UpstreamTrackerUrls    []string
+       UpstreamAnnouncePeerId [20]byte
+
+       mu sync.Mutex
+       // Operations are only removed when all the upstream peers have been tracked.
+       ongoingUpstreamAugmentations map[InfoHash]augmentationOperation
+}
+
+type peerSet = map[PeerInfo]struct{}
+
+type augmentationOperation struct {
+       // Closed when no more announce responses are pending. finalPeers will contain all the peers
+       // seen.
+       doneAnnouncing chan struct{}
+       // This receives the latest peerSet until doneAnnouncing is closed.
+       curPeers chan peerSet
+       // This contains the final peerSet after doneAnnouncing is closed.
+       finalPeers peerSet
+}
+
+func (me augmentationOperation) getCurPeers() (ret peerSet) {
+       ret, _ = me.getCurPeersAndDone()
+       return
+}
+
+func (me augmentationOperation) getCurPeersAndDone() (ret peerSet, done bool) {
+       select {
+       case ret = <-me.curPeers:
+       case <-me.doneAnnouncing:
+               ret = me.finalPeers
+               done = true
+       }
+       return
+}
+
+// Adds peers from new that aren't in orig. Modifies both arguments.
+func addMissing(orig []PeerInfo, new peerSet) {
+       for _, peer := range orig {
+               delete(new, peer)
+       }
+       for peer := range new {
+               orig = append(orig, peer)
+       }
+}
+
+func (me *AnnounceHandler) Serve(
+       ctx context.Context, req AnnounceRequest, addr AnnounceAddr,
+) (peers []PeerInfo, err error) {
+       err = me.AnnounceTracker.TrackAnnounce(ctx, req, addr)
+       if err != nil {
+               return
+       }
+       infoHash := req.InfoHash
+       var op generics.Option[augmentationOperation]
+       // Grab a handle to any augmentations that are already running.
+       me.mu.Lock()
+       op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
+       me.mu.Unlock()
+       peers, err = me.AnnounceTracker.GetPeers(ctx, infoHash, GetPeersOpts{})
+       if err != nil {
+               return
+       }
+       // Take whatever peers it has ready. If it's finished, it doesn't matter if we do this inside
+       // the mutex or not.
+       if op.Ok {
+               curPeers, done := op.Value.getCurPeersAndDone()
+               addMissing(peers, curPeers)
+               if done {
+                       // It doesn't get any better with this operation. Forget it.
+                       op.Ok = false
+               }
+       }
+       me.mu.Lock()
+       // If we didn't have an operation, and don't have enough peers, start one.
+       if !op.Ok && len(peers) <= 1 {
+               op.Value, op.Ok = me.ongoingUpstreamAugmentations[infoHash]
+               if !op.Ok {
+                       op.Set(me.augmentPeersFromUpstream(req))
+                       generics.MakeMapIfNilAndSet(&me.ongoingUpstreamAugmentations, infoHash, op.Value)
+               }
+       }
+       me.mu.Unlock()
+       // Wait a while for the current operation.
+       if op.Ok {
+               // Force the augmentation to return with whatever it has if it hasn't completed in a
+               // reasonable time.
+               ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+               select {
+               case <-ctx.Done():
+               case <-op.Value.doneAnnouncing:
+               }
+               cancel()
+               addMissing(peers, op.Value.getCurPeers())
+       }
+       return
+}
+
+func (me *AnnounceHandler) augmentPeersFromUpstream(req AnnounceRequest) augmentationOperation {
+       announceCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
+       subReq := AnnounceRequest{
+               InfoHash: req.InfoHash,
+               PeerId:   me.UpstreamAnnouncePeerId,
+               Event:    None,
+               Key:      0,
+               NumWant:  -1,
+               Port:     0,
+       }
+       peersChan := make(chan []Peer)
+       var pendingUpstreams sync.WaitGroup
+       for i := range me.UpstreamTrackers {
+               client := me.UpstreamTrackers[i]
+               url := me.UpstreamTrackerUrls[i]
+               pendingUpstreams.Add(1)
+               go func() {
+                       resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{
+                               UserAgent: "aragorn",
+                       })
+                       peersChan <- resp.Peers
+                       if err != nil {
+                               log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
+                       }
+               }()
+       }
+       peersToTrack := make(map[string]Peer)
+       go func() {
+               pendingUpstreams.Wait()
+               cancel()
+               close(peersChan)
+               log.Levelf(log.Debug, "adding %v distinct peers from upstream trackers")
+               for _, peer := range peersToTrack {
+                       addrPort, ok := peer.ToNetipAddrPort()
+                       if !ok {
+                               continue
+                       }
+                       trackReq := AnnounceRequest{
+                               InfoHash: req.InfoHash,
+                               Event:    Started,
+                               Port:     uint16(peer.Port),
+                       }
+                       copy(trackReq.PeerId[:], peer.ID)
+                       err := me.AnnounceTracker.TrackAnnounce(context.TODO(), trackReq, addrPort)
+                       if err != nil {
+                               log.Levelf(log.Error, "error tracking upstream peer: %v", err)
+                       }
+               }
+               me.mu.Lock()
+               delete(me.ongoingUpstreamAugmentations, req.InfoHash)
+               me.mu.Unlock()
+       }()
+       curPeersChan := make(chan map[PeerInfo]struct{})
+       doneChan := make(chan struct{})
+       retPeers := make(map[PeerInfo]struct{})
+       go func() {
+               for {
+                       select {
+                       case peers, ok := <-peersChan:
+                               if !ok {
+                                       return
+                               }
+                               voldemort(peers, peersToTrack, retPeers)
+                               pendingUpstreams.Done()
+                       case curPeersChan <- copyPeerSet(retPeers):
+                       }
+               }
+       }()
+       // Take return references.
+       return augmentationOperation{
+               curPeers:       curPeersChan,
+               finalPeers:     retPeers,
+               doneAnnouncing: doneChan,
+       }
+}
+
+func copyPeerSet(orig peerSet) (ret peerSet) {
+       ret = make(peerSet, len(orig))
+       for k, v := range orig {
+               ret[k] = v
+       }
+       return
+}
+
+// Adds peers to trailing containers.
+func voldemort(peers []Peer, toTrack map[string]Peer, sets ...map[PeerInfo]struct{}) {
+       for _, protoPeer := range peers {
+               toTrack[protoPeer.String()] = protoPeer
+               addr, ok := netip.AddrFromSlice(protoPeer.IP)
+               if !ok {
+                       continue
+               }
+               handlerPeer := PeerInfo{netip.AddrPortFrom(addr, uint16(protoPeer.Port))}
+               for _, set := range sets {
+                       set[handlerPeer] = struct{}{}
+               }
+       }
+}
index 68abb1061aca1b8b9a6e882bf74057b8dd8ac723..95e7a5e7a44d5bc8fe03e8838757578d251a1d45 100644 (file)
@@ -29,9 +29,9 @@ type InfoHash = [20]byte
 type AnnounceTracker = tracker.AnnounceTracker
 
 type Server struct {
-       ConnTracker     ConnectionTracker
-       SendResponse    func(data []byte, addr net.Addr) (int, error)
-       AnnounceTracker AnnounceTracker
+       ConnTracker  ConnectionTracker
+       SendResponse func(data []byte, addr net.Addr) (int, error)
+       Announce     tracker.AnnounceHandler
 }
 
 type RequestSourceAddr = net.Addr
@@ -72,6 +72,9 @@ func (me *Server) handleAnnounce(
        tid udp.TransactionId,
        r *bytes.Reader,
 ) error {
+       // Should we set a timeout of 10s or something for the entire response, so that we give up if a
+       // retry is imminent?
+
        ok, err := me.ConnTracker.Check(ctx, source.String(), connId)
        if err != nil {
                err = fmt.Errorf("checking conn id: %w", err)
@@ -91,11 +94,7 @@ func (me *Server) handleAnnounce(
                err = fmt.Errorf("converting source net.Addr to AnnounceAddr: %w", err)
                return err
        }
-       err = me.AnnounceTracker.TrackAnnounce(ctx, req, announceAddr)
-       if err != nil {
-               return err
-       }
-       peers, err := me.AnnounceTracker.GetPeers(ctx, req.InfoHash, tracker.GetPeersOpts{})
+       peers, err := me.Announce.Serve(ctx, req, announceAddr)
        if err != nil {
                return err
        }