]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add upstream announce gating
authorMatt Joiner <anacrolix@gmail.com>
Tue, 13 Dec 2022 05:41:08 +0000 (16:41 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 13 Dec 2022 05:41:08 +0000 (16:41 +1100)
tracker/server.go
tracker/upstream-announcing.go [new file with mode: 0644]

index 0d5068d48bf2ffd19f08fd61c89039e8823acb88..c74cbd245f3c7617b404fd8ae5c197def27168bd 100644 (file)
@@ -38,10 +38,12 @@ type AnnounceTracker interface {
 }
 
 type AnnounceHandler struct {
-       AnnounceTracker        AnnounceTracker
+       AnnounceTracker AnnounceTracker
+
        UpstreamTrackers       []Client
        UpstreamTrackerUrls    []string
        UpstreamAnnouncePeerId [20]byte
+       UpstreamAnnounceGate   UpstreamAnnounceGater
 
        mu sync.Mutex
        // Operations are only removed when all the upstream peers have been tracked.
@@ -174,7 +176,8 @@ func (me *AnnounceHandler) Serve(
 }
 
 func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentationOperation {
-       announceCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
+       const announceTimeout = time.Minute
+       announceCtx, cancel := context.WithTimeout(context.Background(), announceTimeout)
        subReq := AnnounceRequest{
                InfoHash: infoHash,
                PeerId:   me.UpstreamAnnouncePeerId,
@@ -190,9 +193,30 @@ func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentat
                url := me.UpstreamTrackerUrls[i]
                pendingUpstreams.Add(1)
                go func() {
+                       started, err := me.UpstreamAnnounceGate.Start(announceCtx, url, infoHash, announceTimeout)
+                       if err != nil {
+                               log.Printf("error reserving announce for %x to %v: %v", infoHash, url, err)
+                       }
+                       if err != nil || !started {
+                               peersChan <- nil
+                               return
+                       }
+                       log.Printf("announcing %x upstream to %v", infoHash, url)
                        resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{
                                UserAgent: "aragorn",
                        })
+                       interval := resp.Interval
+                       go func() {
+                               if interval < 5*60 {
+                                       // This is as much to reduce load on upstream trackers in the event of errors,
+                                       // as it is to reduce load on our peer store.
+                                       interval = 5 * 60
+                               }
+                               err := me.UpstreamAnnounceGate.Completed(context.Background(), url, infoHash, interval)
+                               if err != nil {
+                                       log.Printf("error recording completed announce for %x to %v: %v", infoHash, url, err)
+                               }
+                       }()
                        peersChan <- resp.Peers
                        if err != nil {
                                log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
diff --git a/tracker/upstream-announcing.go b/tracker/upstream-announcing.go
new file mode 100644 (file)
index 0000000..ab5a5fb
--- /dev/null
@@ -0,0 +1,18 @@
+package tracker
+
+import (
+       "context"
+       "time"
+)
+
+type UpstreamAnnounceGater interface {
+       Start(ctx context.Context, tracker string, infoHash InfoHash,
+       // How long the announce block remains before discarding it.
+               timeout time.Duration,
+       ) (bool, error)
+       Completed(
+               ctx context.Context, tracker string, infoHash InfoHash,
+       // Num of seconds reported by tracker, or some suitable value the caller has chosen.
+               interval int32,
+       ) error
+}