From 60e992ec97e64bb08ce5fb80f4067722c8c915ad Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 29 Jan 2021 16:01:35 +1100 Subject: [PATCH] Rework webseed peers to use a pool of requesters This fixes the limitation that the max outstanding requests for a webseed peer must match the request concurrency. It should mean less recalculation, and more pipelining. --- torrent.go | 12 ++++++++---- webseed-peer.go | 48 +++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/torrent.go b/torrent.go index 51921c12..26f49b82 100644 --- a/torrent.go +++ b/torrent.go @@ -26,6 +26,7 @@ import ( "github.com/anacrolix/dht/v2" "github.com/anacrolix/log" "github.com/anacrolix/missinggo" + "github.com/anacrolix/missinggo/iter" "github.com/anacrolix/missinggo/perf" "github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/slices" @@ -2118,17 +2119,20 @@ func (t *Torrent) addWebSeed(url string) { reconciledHandshakeStats: true, peerSentHaveAll: true, // TODO: Raise this limit, and instead limit concurrent fetches. - PeerMaxRequests: maxRequests, + PeerMaxRequests: 32, RemoteAddr: remoteAddrFromUrl(url), callbacks: t.callbacks(), }, client: webseed.Client{ - // TODO: Investigate a MaxConnsPerHost in the transport for this, possibly in a global - // Client. + // Consider a MaxConnsPerHost in the transport for this, possibly in a global Client. HttpClient: http.DefaultClient, Url: url, }, - requests: make(map[Request]webseed.Request, maxRequests), + activeRequests: make(map[Request]webseed.Request, maxRequests), + } + ws.requesterCond.L = t.cl.locker() + for range iter.N(maxRequests) { + go ws.requester() } for _, f := range t.callbacks().NewPeer { f(&ws.peer) diff --git a/webseed-peer.go b/webseed-peer.go index 0e4676f5..72c9173d 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -3,6 +3,7 @@ package torrent import ( "fmt" "strings" + "sync" "github.com/anacrolix/torrent/common" "github.com/anacrolix/torrent/metainfo" @@ -13,10 +14,10 @@ import ( ) type webseedPeer struct { - client webseed.Client - // TODO: Remove finished entries from this. - requests map[Request]webseed.Request - peer Peer + client webseed.Client + activeRequests map[Request]webseed.Request + requesterCond sync.Cond + peer Peer } var _ peerImpl = (*webseedPeer)(nil) @@ -43,7 +44,11 @@ func (ws *webseedPeer) writeInterested(interested bool) bool { } func (ws *webseedPeer) cancel(r Request) bool { - ws.requests[r].Cancel() + active, ok := ws.activeRequests[r] + if !ok { + return false + } + active.Cancel() return true } @@ -52,12 +57,35 @@ func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec { } func (ws *webseedPeer) request(r Request) bool { - webseedRequest := ws.client.NewRequest(ws.intoSpec(r)) - ws.requests[r] = webseedRequest - go ws.requestResultHandler(r, webseedRequest) + ws.requesterCond.Signal() return true } +func (ws *webseedPeer) doRequest(r Request) { + webseedRequest := ws.client.NewRequest(ws.intoSpec(r)) + ws.activeRequests[r] = webseedRequest + ws.requesterCond.L.Unlock() + ws.requestResultHandler(r, webseedRequest) + ws.requesterCond.L.Lock() + delete(ws.activeRequests, r) +} + +func (ws *webseedPeer) requester() { + ws.requesterCond.L.Lock() + defer ws.requesterCond.L.Unlock() +start: + for !ws.peer.closed.IsSet() { + for r := range ws.peer.requests { + if _, ok := ws.activeRequests[r]; ok { + continue + } + ws.doRequest(r) + goto start + } + ws.requesterCond.Wait() + } +} + func (ws *webseedPeer) connectionFlags() string { return "WS" } @@ -70,7 +98,9 @@ func (ws *webseedPeer) updateRequests() { ws.peer.doRequestState() } -func (ws *webseedPeer) onClose() {} +func (ws *webseedPeer) onClose() { + ws.requesterCond.Broadcast() +} func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) { result := <-webseedRequest.Result -- 2.44.0