"github.com/anacrolix/dht/v2"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo"
+ "github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/perf"
"github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/missinggo/slices"
reconciledHandshakeStats: true,
peerSentHaveAll: true,
// TODO: Raise this limit, and instead limit concurrent fetches.
- PeerMaxRequests: maxRequests,
+ PeerMaxRequests: 32,
RemoteAddr: remoteAddrFromUrl(url),
callbacks: t.callbacks(),
},
client: webseed.Client{
- // TODO: Investigate a MaxConnsPerHost in the transport for this, possibly in a global
- // Client.
+ // Consider a MaxConnsPerHost in the transport for this, possibly in a global Client.
HttpClient: http.DefaultClient,
Url: url,
},
- requests: make(map[Request]webseed.Request, maxRequests),
+ activeRequests: make(map[Request]webseed.Request, maxRequests),
+ }
+ ws.requesterCond.L = t.cl.locker()
+ for range iter.N(maxRequests) {
+ go ws.requester()
}
for _, f := range t.callbacks().NewPeer {
f(&ws.peer)
import (
"fmt"
"strings"
+ "sync"
"github.com/anacrolix/torrent/common"
"github.com/anacrolix/torrent/metainfo"
)
type webseedPeer struct {
- client webseed.Client
- // TODO: Remove finished entries from this.
- requests map[Request]webseed.Request
- peer Peer
+ client webseed.Client
+ activeRequests map[Request]webseed.Request
+ requesterCond sync.Cond
+ peer Peer
}
var _ peerImpl = (*webseedPeer)(nil)
}
func (ws *webseedPeer) cancel(r Request) bool {
- ws.requests[r].Cancel()
+ active, ok := ws.activeRequests[r]
+ if !ok {
+ return false
+ }
+ active.Cancel()
return true
}
}
func (ws *webseedPeer) request(r Request) bool {
- webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
- ws.requests[r] = webseedRequest
- go ws.requestResultHandler(r, webseedRequest)
+ ws.requesterCond.Signal()
return true
}
+func (ws *webseedPeer) doRequest(r Request) {
+ webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
+ ws.activeRequests[r] = webseedRequest
+ ws.requesterCond.L.Unlock()
+ ws.requestResultHandler(r, webseedRequest)
+ ws.requesterCond.L.Lock()
+ delete(ws.activeRequests, r)
+}
+
+func (ws *webseedPeer) requester() {
+ ws.requesterCond.L.Lock()
+ defer ws.requesterCond.L.Unlock()
+start:
+ for !ws.peer.closed.IsSet() {
+ for r := range ws.peer.requests {
+ if _, ok := ws.activeRequests[r]; ok {
+ continue
+ }
+ ws.doRequest(r)
+ goto start
+ }
+ ws.requesterCond.Wait()
+ }
+}
+
func (ws *webseedPeer) connectionFlags() string {
return "WS"
}
ws.peer.doRequestState()
}
-func (ws *webseedPeer) onClose() {}
+func (ws *webseedPeer) onClose() {
+ ws.requesterCond.Broadcast()
+}
func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) {
result := <-webseedRequest.Result