From: Matt Joiner Date: Tue, 13 Dec 2022 05:41:08 +0000 (+1100) Subject: Add upstream announce gating X-Git-Tag: v1.49.0~7^2~17 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=cbea87aaf3f9c7ad5d63bb32eb7e71d743b0aaf5;p=btrtrc.git Add upstream announce gating --- diff --git a/tracker/server.go b/tracker/server.go index 0d5068d4..c74cbd24 100644 --- a/tracker/server.go +++ b/tracker/server.go @@ -38,10 +38,12 @@ type AnnounceTracker interface { } type AnnounceHandler struct { - AnnounceTracker AnnounceTracker + AnnounceTracker AnnounceTracker + UpstreamTrackers []Client UpstreamTrackerUrls []string UpstreamAnnouncePeerId [20]byte + UpstreamAnnounceGate UpstreamAnnounceGater mu sync.Mutex // Operations are only removed when all the upstream peers have been tracked. @@ -174,7 +176,8 @@ func (me *AnnounceHandler) Serve( } func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentationOperation { - announceCtx, cancel := context.WithTimeout(context.Background(), time.Minute) + const announceTimeout = time.Minute + announceCtx, cancel := context.WithTimeout(context.Background(), announceTimeout) subReq := AnnounceRequest{ InfoHash: infoHash, PeerId: me.UpstreamAnnouncePeerId, @@ -190,9 +193,30 @@ func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentat url := me.UpstreamTrackerUrls[i] pendingUpstreams.Add(1) go func() { + started, err := me.UpstreamAnnounceGate.Start(announceCtx, url, infoHash, announceTimeout) + if err != nil { + log.Printf("error reserving announce for %x to %v: %v", infoHash, url, err) + } + if err != nil || !started { + peersChan <- nil + return + } + log.Printf("announcing %x upstream to %v", infoHash, url) resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{ UserAgent: "aragorn", }) + interval := resp.Interval + go func() { + if interval < 5*60 { + // This is as much to reduce load on upstream trackers in the event of errors, + // as it is to reduce load on our peer store. + interval = 5 * 60 + } + err := me.UpstreamAnnounceGate.Completed(context.Background(), url, infoHash, interval) + if err != nil { + log.Printf("error recording completed announce for %x to %v: %v", infoHash, url, err) + } + }() peersChan <- resp.Peers if err != nil { log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err) diff --git a/tracker/upstream-announcing.go b/tracker/upstream-announcing.go new file mode 100644 index 00000000..ab5a5fb3 --- /dev/null +++ b/tracker/upstream-announcing.go @@ -0,0 +1,18 @@ +package tracker + +import ( + "context" + "time" +) + +type UpstreamAnnounceGater interface { + Start(ctx context.Context, tracker string, infoHash InfoHash, + // How long the announce block remains before discarding it. + timeout time.Duration, + ) (bool, error) + Completed( + ctx context.Context, tracker string, infoHash InfoHash, + // Num of seconds reported by tracker, or some suitable value the caller has chosen. + interval int32, + ) error +}