]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Rework webseed peers to use a pool of requesters
authorMatt Joiner <anacrolix@gmail.com>
Fri, 29 Jan 2021 05:01:35 +0000 (16:01 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 29 Jan 2021 05:01:35 +0000 (16:01 +1100)
This fixes the limitation that the max outstanding requests for a webseed peer must match the request concurrency. It should mean less recalculation, and more pipelining.

torrent.go
webseed-peer.go

index 51921c12052f2e0f268e5df833fd2c657971f54b..26f49b82a6b289a08e9526d015b5d974db90f902 100644 (file)
@@ -26,6 +26,7 @@ import (
        "github.com/anacrolix/dht/v2"
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo"
+       "github.com/anacrolix/missinggo/iter"
        "github.com/anacrolix/missinggo/perf"
        "github.com/anacrolix/missinggo/pubsub"
        "github.com/anacrolix/missinggo/slices"
@@ -2118,17 +2119,20 @@ func (t *Torrent) addWebSeed(url string) {
                        reconciledHandshakeStats: true,
                        peerSentHaveAll:          true,
                        // TODO: Raise this limit, and instead limit concurrent fetches.
-                       PeerMaxRequests: maxRequests,
+                       PeerMaxRequests: 32,
                        RemoteAddr:      remoteAddrFromUrl(url),
                        callbacks:       t.callbacks(),
                },
                client: webseed.Client{
-                       // TODO: Investigate a MaxConnsPerHost in the transport for this, possibly in a global
-                       // Client.
+                       // Consider a MaxConnsPerHost in the transport for this, possibly in a global Client.
                        HttpClient: http.DefaultClient,
                        Url:        url,
                },
-               requests: make(map[Request]webseed.Request, maxRequests),
+               activeRequests: make(map[Request]webseed.Request, maxRequests),
+       }
+       ws.requesterCond.L = t.cl.locker()
+       for range iter.N(maxRequests) {
+               go ws.requester()
        }
        for _, f := range t.callbacks().NewPeer {
                f(&ws.peer)
index 0e4676f57a2072d26e5e575cbaba73a8e14a07a4..72c9173d44f7448a56303500cfc4950270e6b655 100644 (file)
@@ -3,6 +3,7 @@ package torrent
 import (
        "fmt"
        "strings"
+       "sync"
 
        "github.com/anacrolix/torrent/common"
        "github.com/anacrolix/torrent/metainfo"
@@ -13,10 +14,10 @@ import (
 )
 
 type webseedPeer struct {
-       client webseed.Client
-       // TODO: Remove finished entries from this.
-       requests map[Request]webseed.Request
-       peer     Peer
+       client         webseed.Client
+       activeRequests map[Request]webseed.Request
+       requesterCond  sync.Cond
+       peer           Peer
 }
 
 var _ peerImpl = (*webseedPeer)(nil)
@@ -43,7 +44,11 @@ func (ws *webseedPeer) writeInterested(interested bool) bool {
 }
 
 func (ws *webseedPeer) cancel(r Request) bool {
-       ws.requests[r].Cancel()
+       active, ok := ws.activeRequests[r]
+       if !ok {
+               return false
+       }
+       active.Cancel()
        return true
 }
 
@@ -52,12 +57,35 @@ func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
 }
 
 func (ws *webseedPeer) request(r Request) bool {
-       webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
-       ws.requests[r] = webseedRequest
-       go ws.requestResultHandler(r, webseedRequest)
+       ws.requesterCond.Signal()
        return true
 }
 
+func (ws *webseedPeer) doRequest(r Request) {
+       webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
+       ws.activeRequests[r] = webseedRequest
+       ws.requesterCond.L.Unlock()
+       ws.requestResultHandler(r, webseedRequest)
+       ws.requesterCond.L.Lock()
+       delete(ws.activeRequests, r)
+}
+
+func (ws *webseedPeer) requester() {
+       ws.requesterCond.L.Lock()
+       defer ws.requesterCond.L.Unlock()
+start:
+       for !ws.peer.closed.IsSet() {
+               for r := range ws.peer.requests {
+                       if _, ok := ws.activeRequests[r]; ok {
+                               continue
+                       }
+                       ws.doRequest(r)
+                       goto start
+               }
+               ws.requesterCond.Wait()
+       }
+}
+
 func (ws *webseedPeer) connectionFlags() string {
        return "WS"
 }
@@ -70,7 +98,9 @@ func (ws *webseedPeer) updateRequests() {
        ws.peer.doRequestState()
 }
 
-func (ws *webseedPeer) onClose() {}
+func (ws *webseedPeer) onClose() {
+       ws.requesterCond.Broadcast()
+}
 
 func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) {
        result := <-webseedRequest.Result