metadataChanged: sync.Cond{
L: cl.locker(),
},
- webSeeds: make(map[string]*Peer),
gotMetainfoC: make(chan struct{}),
}
+ g.MakeMap(&t.webSeeds)
t.closedCtx, t.closedCtxCancel = context.WithCancel(context.Background())
var salt [8]byte
rand.Read(salt[:])
defer cl.rUnlock()
return cl.statsLocked()
}
+
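+// underWebSeedHttpRequestLimit reports whether the total number of active
+// webseed HTTP requests across all torrents is below the global cap
+// (hard-coded to 10 for now).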
+func (cl *Client) underWebSeedHttpRequestLimit() bool {
+ num := 0
+ for t := range cl.torrents {
+ for _, p := range t.webSeeds {
+ num += p.numRequests()
+ }
+ }
+ return num < 10
+}
}
// Transmit/action the request state to the peer. This includes work-stealing from other peers and
-// some piece order randomization within the preferred state calculated earlier in next.
+// some piece order randomization within the preferred state calculated earlier in next. Cancels are
+// not done here; those are handled synchronously. We only track pending cancel acknowledgements.
func (p *Peer) applyRequestState(next desiredRequestState) {
current := &p.requestState
// Make interest sticky
defer t.cl.rUnlock()
ret := make([]*Peer, 0, len(t.conns))
for _, c := range t.webSeeds {
- ret = append(ret, c)
+ ret = append(ret, &c.peer)
}
return ret
}
_chunksPerRegularPiece chunkIndexType
- webSeeds map[string]*Peer
+ webSeeds map[string]*webseedPeer
// Active peer connections, running message stream loops. TODO: Make this
// open (not-closed) connections only.
conns map[*PeerConn]struct{}
dumpStats(w, t.statsLocked())
fmt.Fprintf(w, "webseeds:\n")
- t.writePeerStatuses(w, maps.Values(t.webSeeds))
+
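+	// t.webSeeds now maps to *webseedPeer, so hand-roll the iter.Seq[*Peer] that
+	// writePeerStatuses expects by yielding each embedded Peer.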
+ t.writePeerStatuses(w, func(yield func(*Peer) bool) {
+ for _, ws := range t.webSeeds {
+ if !yield(&ws.peer) {
+ return
+ }
+ }
+ })
// Peers without priorities first, then those with. I'm undecided about how to order peers
// without priorities.
f(&pc.Peer)
}
for _, ws := range t.webSeeds {
- f(ws)
+ f(&ws.peer)
}
}
return false
}
// I don't think Go http supports pipelining requests. However, we can have more ready to go
- // right away. This value should be some multiple of the number of connections to a host. I
- // would expect that double maxRequests plus a bit would be appropriate. This value is based on
- // downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
- // "https://webtorrent.io/torrents/".
- const defaultMaxRequests = 16
+	// right away. This value should be some multiple of the number of connections to a host. It is
+	// chosen to keep at least one connection actively downloading while another request is being
+	// fired off, and to keep enough concurrency in play that the race detector gets exercised.
+	// Downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
+	// "https://webtorrent.io/torrents/" is a good test.
+ const defaultMaxRequests = 2
ws := webseedPeer{
peer: Peer{
t: t,
opt(&ws.client)
}
g.MakeMapWithCap(&ws.activeRequests, ws.client.MaxRequests)
- // This should affect how often we have to recompute requests for this peer. Note that
- // because we can request more than 1 thing at a time over HTTP, we will hit the low
- // requests mark more often, so recomputation is probably sooner than with regular peer
- // conns. ~4x maxRequests would be about right.
- ws.peer.PeerMaxRequests = 4 * ws.client.MaxRequests
+ // TODO: Implement an algorithm that assigns this based on sharing chunks across peers. For now
+ // we just allow 2 MiB worth of requests. See newHotPeerImpl.nominalMaxRequests.
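+	// With the default 16 KiB chunk size this works out to 2 MiB / 16 KiB = 128
+	// outstanding requests.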
+ ws.peer.PeerMaxRequests = intCeilDiv(2<<20, ws.peer.t.chunkSize.Int())
ws.peer.initUpdateRequestsTimer()
ws.locker = t.cl.locker()
for _, f := range t.callbacks().NewPeer {
if t.haveInfo() {
ws.onGotInfo(t.info)
}
- t.webSeeds[url] = &ws.peer
+ t.webSeeds[url] = &ws
ws.peer.onNeedUpdateRequests("Torrent.addWebSeed")
return true
}
"time"
"github.com/RoaringBitmap/roaring"
- g "github.com/anacrolix/generics"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
func (me *webseedPeer) nominalMaxRequests() maxRequests {
// TODO: Implement an algorithm that assigns this based on sharing chunks across peers. For now
// we just allow 2 MiB worth of requests.
- return intCeilDiv(2<<20, me.peer.t.chunkSize.Int())
+ return me.peer.PeerMaxRequests
}
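+// Webseed requests are plain HTTP: cancels take effect synchronously (see
+// applyRequestState), so there are never pending cancel acknowledgements to
+// track.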
func (me *webseedPeer) acksCancels() bool {
return false
}
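+// numRequests returns how many requests this webseed peer has active; this is
+// what counts against the client-wide webseed HTTP request limit.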
+func (me *webseedPeer) numRequests() int {
+ // What about unassigned requests? TODO: Don't allow those.
+ return len(me.activeRequests)
+}
+
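+// shouldUpdateRequests gates request recomputation on the client-wide webseed
+// HTTP request limit: once the client is saturated, recomputing this peer's
+// request state would be wasted work.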
+func (me *webseedPeer) shouldUpdateRequests() bool {
+ return me.peer.t.cl.underWebSeedHttpRequestLimit()
+}
+
func (me *webseedPeer) updateRequests() {
+ if !me.shouldUpdateRequests() {
+ return
+ }
p := &me.peer
next := p.getDesiredRequestState()
p.applyRequestState(next)
if err != nil {
ws.peer.onNeedUpdateRequests("webseedPeer request errored")
}
- ws.spawnRequests()
+ ws.peer.t.cl.updateWebSeedRequests()
locker.Unlock()
}
func (ws *webseedPeer) spawnRequests() {
next, stop := iter.Pull(ws.inactiveRequests())
defer stop()
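+	// Keep spawning requests until this peer hits its own cap or the client-wide
+	// webseed HTTP request limit is reached.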
- for len(ws.activeRequests) <= ws.client.MaxRequests {
+ for len(ws.activeRequests) < ws.client.MaxRequests && ws.peer.t.cl.underWebSeedHttpRequestLimit() {
req, ok := next()
if !ok {
break
}
func (ws *webseedPeer) handleOnNeedUpdateRequests() {
- // Because this is synchronous, webseed peers seem to get first dibs on newly prioritized
- // pieces.
- go func() {
- ws.peer.t.cl.lock()
- defer ws.peer.t.cl.unlock()
- ws.peer.maybeUpdateActualRequestState()
- }()
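+	// This now runs synchronously; the client lock is assumed to already be held
+	// by the caller, so the goroutine and re-locking are no longer needed.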
+ ws.peer.maybeUpdateActualRequestState()
}
func (ws *webseedPeer) onClose() {
- For each piece calculate files involved. Record each file not seen before and the piece index.
- Cancel any outstanding requests that don't match a final file/piece-index pair.
- Initiate missing requests that fit into the available limits.
+
+This was a globally aware webseed requestor algorithm; it is probably going to be abandoned.
*/
-func (cl *Client) updateWebSeedRequests() {
+func (cl *Client) abandonedUpdateWebSeedRequests() {
for key, value := range cl.pieceRequestOrder {
input := key.getRequestStrategyInput(cl)
requestStrategy.GetRequestablePieces(
)
}
}
+
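+// updateWebSeedRequests recomputes request state for every webseed peer of
+// every torrent; each peer independently checks the client-wide HTTP request
+// limit via shouldUpdateRequests.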
+func (cl *Client) updateWebSeedRequests() {
+ for t := range cl.torrents {
+ for _, p := range t.webSeeds {
+ p.updateRequests()
+ }
+ }
+}