}
type AnnounceHandler struct {
- AnnounceTracker AnnounceTracker
+ AnnounceTracker AnnounceTracker
+
UpstreamTrackers []Client
UpstreamTrackerUrls []string
UpstreamAnnouncePeerId [20]byte
+ UpstreamAnnounceGate UpstreamAnnounceGater
mu sync.Mutex
// Operations are only removed when all the upstream peers have been tracked.
}
func (me *AnnounceHandler) augmentPeersFromUpstream(infoHash [20]byte) augmentationOperation {
- announceCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
+ const announceTimeout = time.Minute
+ announceCtx, cancel := context.WithTimeout(context.Background(), announceTimeout)
subReq := AnnounceRequest{
InfoHash: infoHash,
PeerId: me.UpstreamAnnouncePeerId,
url := me.UpstreamTrackerUrls[i]
pendingUpstreams.Add(1)
go func() {
+ started, err := me.UpstreamAnnounceGate.Start(announceCtx, url, infoHash, announceTimeout)
+ if err != nil {
+ log.Printf("error reserving announce for %x to %v: %v", infoHash, url, err)
+ }
+ if err != nil || !started {
+ peersChan <- nil
+ return
+ }
+ log.Printf("announcing %x upstream to %v", infoHash, url)
resp, err := client.Announce(announceCtx, subReq, AnnounceOpt{
UserAgent: "aragorn",
})
+ interval := resp.Interval
+ go func() {
+ if interval < 5*60 {
+ // This is as much to reduce load on upstream trackers in the event of errors,
+ // as it is to reduce load on our peer store.
+ interval = 5 * 60
+ }
+ err := me.UpstreamAnnounceGate.Completed(context.Background(), url, infoHash, interval)
+ if err != nil {
+ log.Printf("error recording completed announce for %x to %v: %v", infoHash, url, err)
+ }
+ }()
peersChan <- resp.Peers
if err != nil {
log.Levelf(log.Warning, "error announcing to upstream %q: %v", url, err)
--- /dev/null
+package tracker
+
+import (
+ "context"
+ "time"
+)
+
+type UpstreamAnnounceGater interface {
+ Start(ctx context.Context, tracker string, infoHash InfoHash,
+ // How long the announce block remains before discarding it.
+ timeout time.Duration,
+ ) (bool, error)
+ Completed(
+ ctx context.Context, tracker string, infoHash InfoHash,
+ // Num of seconds reported by tracker, or some suitable value the caller has chosen.
+ interval int32,
+ ) error
+}