]> Sergey Matveev's repositories - btrtrc.git/blobdiff - tracker_scraper.go
Rework active announce limiter to reduce contention
[btrtrc.git] / tracker_scraper.go
index 65673539524f9d9bea67ae204a9f78f9ff2da853..8db47b0eb1563a8da90bc9b0354eba416be7d243 100644 (file)
@@ -2,6 +2,7 @@ package torrent
 
 import (
        "bytes"
+       "context"
        "errors"
        "fmt"
        "net"
@@ -20,10 +21,6 @@ type trackerScraper struct {
        u            url.URL
        t            *Torrent
        lastAnnounce trackerAnnounceResult
-       allow        func()
-       // The slowdown argument lets us indicate if we think there should be some backpressure on
-       // access to the tracker. It doesn't necessarily have to be used.
-       done func(slowdown bool)
 }
 
 type torrentTrackerAnnouncer interface {
@@ -107,16 +104,30 @@ func (me *trackerScraper) trackerUrl(ip net.IP) string {
 
 // Return how long to wait before trying again. For most errors, we return 5
 // minutes, a relatively quick turn around for DNS changes.
-func (me *trackerScraper) announce(event tracker.AnnounceEvent) (ret trackerAnnounceResult) {
+func (me *trackerScraper) announce(ctx context.Context, event tracker.AnnounceEvent) (ret trackerAnnounceResult) {
+
        defer func() {
                ret.Completed = time.Now()
        }()
        ret.Interval = time.Minute
-       me.allow()
-       // We might pass true if we got an error. Currently we don't because timing out with a
-       // reasonably long timeout is its own form of backpressure (it remains to be seen if it's
-       // enough).
-       defer me.done(false)
+
+       // Limit concurrent use of the same tracker URL by the Client.
+       ref := me.t.cl.getAnnounceRef(me.u.String())
+       defer ref.Drop()
+       select {
+       case <-ctx.Done():
+               ret.Err = ctx.Err()
+               return
+       case ref.C() <- struct{}{}:
+       }
+       defer func() {
+               select {
+               case <-ref.C():
+               default:
+                       panic("should return immediately")
+               }
+       }()
+
        ip, err := me.getIp()
        if err != nil {
                ret.Err = fmt.Errorf("error getting ip: %s", err)
@@ -170,11 +181,24 @@ func (me *trackerScraper) canIgnoreInterval(notify *<-chan struct{}) bool {
 }
 
 func (me *trackerScraper) Run() {
+
        defer me.announceStopped()
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       go func() {
+               defer cancel()
+               select {
+               case <-ctx.Done():
+               case <-me.t.Closed():
+               }
+       }()
+
        // make sure first announce is a "started"
        e := tracker.Started
+
        for {
-               ar := me.announce(e)
+               ar := me.announce(ctx, e)
                // after first announce, get back to regular "none"
                e = tracker.None
                me.t.cl.lock()
@@ -219,5 +243,7 @@ func (me *trackerScraper) Run() {
 }
 
 func (me *trackerScraper) announceStopped() {
-       me.announce(tracker.Stopped)
+       ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+       defer cancel()
+       me.announce(ctx, tracker.Stopped)
 }