From ddd03362f8285cf7859d60178eb07ab56aa1d329 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 17 Dec 2020 12:22:02 +1100 Subject: [PATCH] Factor out internal/string_limiter --- client.go | 52 ++----------------- internal/string-limiter/string-limiter.go | 62 +++++++++++++++++++++++ tracker_scraper.go | 2 +- 3 files changed, 66 insertions(+), 50 deletions(-) create mode 100644 internal/string-limiter/string-limiter.go diff --git a/client.go b/client.go index 93ab3857..1379b692 100644 --- a/client.go +++ b/client.go @@ -24,6 +24,7 @@ import ( "github.com/anacrolix/missinggo/slices" "github.com/anacrolix/missinggo/v2/pproffd" "github.com/anacrolix/sync" + "github.com/anacrolix/torrent/internal/string-limiter" "github.com/anacrolix/torrent/tracker" "github.com/anacrolix/torrent/webtorrent" "github.com/davecgh/go-spew/spew" @@ -79,55 +80,7 @@ type Client struct { websocketTrackers websocketTrackers - activeAnnouncesMu sync.Mutex - // Limits concurrent use of a trackers by URL. Push into the channel to use a slot, and receive - // to free up a slot. - activeAnnounces map[string]*activeAnnouncesValueType -} - -type activeAnnouncesValueType struct { - ch chan struct{} - refs int -} - -type activeAnnouncesValueRef struct { - r *activeAnnouncesValueType - url string - cl *Client -} - -func (me activeAnnouncesValueRef) C() chan struct{} { - return me.r.ch -} - -func (me activeAnnouncesValueRef) Drop() { - me.cl.activeAnnouncesMu.Lock() - defer me.cl.activeAnnouncesMu.Unlock() - me.r.refs-- - if me.r.refs == 0 { - delete(me.cl.activeAnnounces, me.url) - } -} - -func (cl *Client) getAnnounceRef(url string) activeAnnouncesValueRef { - cl.activeAnnouncesMu.Lock() - defer cl.activeAnnouncesMu.Unlock() - if cl.activeAnnounces == nil { - cl.activeAnnounces = make(map[string]*activeAnnouncesValueType) - } - v, ok := cl.activeAnnounces[url] - if !ok { - v = &activeAnnouncesValueType{ - ch: make(chan struct{}, 2), - } - cl.activeAnnounces[url] = v - } - v.refs++ - return activeAnnouncesValueRef{ - r: v, - url: url, - cl: cl, - } + activeAnnounceLimiter string_limiter.Instance } type ipStr string @@ -234,6 +187,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { torrents: make(map[metainfo.Hash]*Torrent), dialRateLimiter: rate.NewLimiter(10, 10), } + cl.activeAnnounceLimiter.SlotsPerKey = 2 go cl.acceptLimitClearer() cl.initLogger() defer func() { diff --git a/internal/string-limiter/string-limiter.go b/internal/string-limiter/string-limiter.go new file mode 100644 index 00000000..8a8f91dc --- /dev/null +++ b/internal/string-limiter/string-limiter.go @@ -0,0 +1,62 @@ +package string_limiter + +import "sync" + +// Manages resources with a limited number of concurrent slots for use keyed by a string. +type Instance struct { + SlotsPerKey int + + mu sync.Mutex + // Limits concurrent use of a resource. Push into the channel to use a slot, and receive to free + // up a slot. + active map[string]*activeValueType +} + +type activeValueType struct { + ch chan struct{} + refs int +} + +type ActiveValueRef struct { + v *activeValueType + k string + i *Instance +} + +// Returns the limiting channel. Send to it to obtain a slot, and receive to release the slot. +func (me ActiveValueRef) C() chan struct{} { + return me.v.ch +} + +// Drop the reference to a key, this allows keys to be reclaimed when they're no longer in use. +func (me ActiveValueRef) Drop() { + me.i.mu.Lock() + defer me.i.mu.Unlock() + me.v.refs-- + if me.v.refs == 0 { + delete(me.i.active, me.k) + } +} + +// Get a reference to the values for a key. You should make sure to call Drop exactly once on the +// returned value when done. +func (i *Instance) GetRef(key string) ActiveValueRef { + i.mu.Lock() + defer i.mu.Unlock() + if i.active == nil { + i.active = make(map[string]*activeValueType) + } + v, ok := i.active[key] + if !ok { + v = &activeValueType{ + ch: make(chan struct{}, i.SlotsPerKey), + } + i.active[key] = v + } + v.refs++ + return ActiveValueRef{ + v: v, + k: key, + i: i, + } +} diff --git a/tracker_scraper.go b/tracker_scraper.go index 8db47b0e..c18f21f8 100644 --- a/tracker_scraper.go +++ b/tracker_scraper.go @@ -112,7 +112,7 @@ func (me *trackerScraper) announce(ctx context.Context, event tracker.AnnounceEv ret.Interval = time.Minute // Limit concurrent use of the same tracker URL by the Client. - ref := me.t.cl.getAnnounceRef(me.u.String()) + ref := me.t.cl.activeAnnounceLimiter.GetRef(me.u.String()) defer ref.Drop() select { case <-ctx.Done(): -- 2.44.0