]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Factor out internal/string_limiter v1.19.0
authorMatt Joiner <anacrolix@gmail.com>
Thu, 17 Dec 2020 01:22:02 +0000 (12:22 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 17 Dec 2020 01:22:02 +0000 (12:22 +1100)
client.go
internal/string-limiter/string-limiter.go [new file with mode: 0644]
tracker_scraper.go

index 93ab385715688823ccfbbd1e74744f4ed733b72a..1379b6929a03c8f5f983f1f279c040b43908147c 100644 (file)
--- a/client.go
+++ b/client.go
@@ -24,6 +24,7 @@ import (
        "github.com/anacrolix/missinggo/slices"
        "github.com/anacrolix/missinggo/v2/pproffd"
        "github.com/anacrolix/sync"
+       "github.com/anacrolix/torrent/internal/string-limiter"
        "github.com/anacrolix/torrent/tracker"
        "github.com/anacrolix/torrent/webtorrent"
        "github.com/davecgh/go-spew/spew"
@@ -79,55 +80,7 @@ type Client struct {
 
        websocketTrackers websocketTrackers
 
-       activeAnnouncesMu sync.Mutex
-       // Limits concurrent use of a trackers by URL. Push into the channel to use a slot, and receive
-       // to free up a slot.
-       activeAnnounces map[string]*activeAnnouncesValueType
-}
-
-type activeAnnouncesValueType struct {
-       ch   chan struct{}
-       refs int
-}
-
-type activeAnnouncesValueRef struct {
-       r   *activeAnnouncesValueType
-       url string
-       cl  *Client
-}
-
-func (me activeAnnouncesValueRef) C() chan struct{} {
-       return me.r.ch
-}
-
-func (me activeAnnouncesValueRef) Drop() {
-       me.cl.activeAnnouncesMu.Lock()
-       defer me.cl.activeAnnouncesMu.Unlock()
-       me.r.refs--
-       if me.r.refs == 0 {
-               delete(me.cl.activeAnnounces, me.url)
-       }
-}
-
-func (cl *Client) getAnnounceRef(url string) activeAnnouncesValueRef {
-       cl.activeAnnouncesMu.Lock()
-       defer cl.activeAnnouncesMu.Unlock()
-       if cl.activeAnnounces == nil {
-               cl.activeAnnounces = make(map[string]*activeAnnouncesValueType)
-       }
-       v, ok := cl.activeAnnounces[url]
-       if !ok {
-               v = &activeAnnouncesValueType{
-                       ch: make(chan struct{}, 2),
-               }
-               cl.activeAnnounces[url] = v
-       }
-       v.refs++
-       return activeAnnouncesValueRef{
-               r:   v,
-               url: url,
-               cl:  cl,
-       }
+       activeAnnounceLimiter string_limiter.Instance
 }
 
 type ipStr string
@@ -234,6 +187,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
                torrents:          make(map[metainfo.Hash]*Torrent),
                dialRateLimiter:   rate.NewLimiter(10, 10),
        }
+       cl.activeAnnounceLimiter.SlotsPerKey = 2
        go cl.acceptLimitClearer()
        cl.initLogger()
        defer func() {
diff --git a/internal/string-limiter/string-limiter.go b/internal/string-limiter/string-limiter.go
new file mode 100644 (file)
index 0000000..8a8f91d
--- /dev/null
@@ -0,0 +1,62 @@
+package string_limiter
+
+import "sync"
+
+// Manages resources with a limited number of concurrent slots for use keyed by a string.
+type Instance struct {
+       SlotsPerKey int
+
+       mu sync.Mutex
+       // Limits concurrent use of a resource. Push into the channel to use a slot, and receive to free
+       // up a slot.
+       active map[string]*activeValueType
+}
+
+type activeValueType struct {
+       ch   chan struct{}
+       refs int
+}
+
+type ActiveValueRef struct {
+       v *activeValueType
+       k string
+       i *Instance
+}
+
+// Returns the limiting channel. Send to it to obtain a slot, and receive to release the slot.
+func (me ActiveValueRef) C() chan struct{} {
+       return me.v.ch
+}
+
+// Drop the reference to a key, this allows keys to be reclaimed when they're no longer in use.
+func (me ActiveValueRef) Drop() {
+       me.i.mu.Lock()
+       defer me.i.mu.Unlock()
+       me.v.refs--
+       if me.v.refs == 0 {
+               delete(me.i.active, me.k)
+       }
+}
+
+// Get a reference to the values for a key. You should make sure to call Drop exactly once on the
+// returned value when done.
+func (i *Instance) GetRef(key string) ActiveValueRef {
+       i.mu.Lock()
+       defer i.mu.Unlock()
+       if i.active == nil {
+               i.active = make(map[string]*activeValueType)
+       }
+       v, ok := i.active[key]
+       if !ok {
+               v = &activeValueType{
+                       ch: make(chan struct{}, i.SlotsPerKey),
+               }
+               i.active[key] = v
+       }
+       v.refs++
+       return ActiveValueRef{
+               v: v,
+               k: key,
+               i: i,
+       }
+}
index 8db47b0eb1563a8da90bc9b0354eba416be7d243..c18f21f858434d36b04e5bc0f05a5360074b34d6 100644 (file)
@@ -112,7 +112,7 @@ func (me *trackerScraper) announce(ctx context.Context, event tracker.AnnounceEv
        ret.Interval = time.Minute
 
        // Limit concurrent use of the same tracker URL by the Client.
-       ref := me.t.cl.getAnnounceRef(me.u.String())
+       ref := me.t.cl.activeAnnounceLimiter.GetRef(me.u.String())
        defer ref.Drop()
        select {
        case <-ctx.Done():