From cbea87aaf3f9c7ad5d63bb32eb7e71d743b0aaf5 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 13 Dec 2022 16:41:08 +1100 Subject: [PATCH] Add upstream announce gating --- tracker/server.go | 28 ++++++++++++++++++++++++++-- tracker/upstream-announcing.go | 18 ++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) create mode 100644 tracker/upstream-announcing.go diff --git a/tracker/server.go b/tracker/server.go index 0d5068d4..c74cbd24 100644 --- a/tracker/server.go +++ b/tracker/server.go @@ -38,10 +38,12 @@ type AnnounceTracker interface { } type AnnounceHandler struct { - AnnounceTracker AnnounceTracker + AnnounceTracker AnnounceTracker + UpstreamTrackers []Client UpstreamTrackerUrls []string UpstreamAnnouncePeerId [20]byte + UpstreamAnnounceGate UpstreamAnnounceGater mu sync.Mutex // Operations are only removed when all the upstream peers have been tracked. @@ -174,7 +176,8 @@ func (me *AnnounceHandler) Serve( } func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentationOperation { - announceCtx, cancel := context.WithTimeout(context.Background(), time.Minute) + const announceTimeout = time.Minute + announceCtx, cancel := context.WithTimeout(context.Background(), announceTimeout) subReq := AnnounceRequest{ InfoHash: infoHash, PeerId: me.UpstreamAnnouncePeerId, @@ -190,9 +193,30 @@ func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentat url := me.UpstreamTrackerUrls[i] pendingUpstreams.Add(1) go func() { + started, err := me.UpstreamAnnounceGate.Start(announceCtx, url, infoHash, announceTimeout) + if err != nil { + log.Printf("error reserving announce for %x to %v: %v", infoHash, url, err) + } + if err != nil || !started { + peersChan <- nil + return + } + log.Printf("announcing %x upstream to %v", infoHash, url) resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{ UserAgent: "aragorn", }) + interval := resp.Interval + go func() { + if interval < 5*60 { + // This is as much to reduce load on upstream trackers in the event of errors, + // as it is to reduce load on our peer store. + interval = 5 * 60 + } + err := me.UpstreamAnnounceGate.Completed(context.Background(), url, infoHash, interval) + if err != nil { + log.Printf("error recording completed announce for %x to %v: %v", infoHash, url, err) + } + }() peersChan <- resp.Peers if err != nil { log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err) diff --git a/tracker/upstream-announcing.go b/tracker/upstream-announcing.go new file mode 100644 index 00000000..ab5a5fb3 --- /dev/null +++ b/tracker/upstream-announcing.go @@ -0,0 +1,18 @@ +package tracker + +import ( + "context" + "time" +) + +type UpstreamAnnounceGater interface { + Start(ctx context.Context, tracker string, infoHash InfoHash, + // How long the announce block remains before discarding it. + timeout time.Duration, + ) (bool, error) + Completed( + ctx context.Context, tracker string, infoHash InfoHash, + // Num of seconds reported by tracker, or some suitable value the caller has chosen. + interval int32, + ) error +} -- 2.48.1