"maps"
"os"
"runtime/pprof"
+ "slices"
"strings"
"sync"
"time"
return me.Value()
}
+type webseedRequestHeapElem struct {
+ webseedUniqueRequestKey
+ webseedRequestOrderValue
+ // Not sure this is even worth it now.
+ mightHavePartialFiles bool
+}
+
/*
- Go through all the requestable pieces in order of priority, availability, whether there are peer requests, partial, infohash.
- For each piece calculate files involved. Record each file not seen before and the piece index.
- Initiate missing requests that fit into the available limits.
*/
func (cl *Client) updateWebseedRequests() {
- aprioriMap := make(map[webseedUniqueRequestKey]aprioriMapValue)
+ existingRequests := maps.Collect(cl.iterCurrentWebseedRequestsFromClient())
+ panicif.False(maps.Equal(existingRequests, maps.Collect(cl.iterCurrentWebseedRequests())))
+
+ g.MakeMapIfNil(&cl.aprioriMap)
+ aprioriMap := cl.aprioriMap
+ clear(aprioriMap)
for uniqueKey, value := range cl.iterPossibleWebseedRequests() {
+ if g.MapContains(existingRequests, uniqueKey) {
+ continue
+ }
cur, ok := aprioriMap[uniqueKey]
if ok {
// Shared in the lookup above.
}
aprioriMap[uniqueKey] = value
}
- // This includes startRequest in the key. This means multiple webseed requests can exist in the
- // same webseed request slice.
- existingRequests := make(map[webseedUniqueRequestKey]webseedRequestOrderValue)
- existingRequestCount := 0
- // TODO: Maintain a cache of active requests in the Client, and use them to filter proposed
- // requests (same result but less allocations).
- for key, value := range cl.iterCurrentWebseedRequests() {
- existingRequests[key] = value
- existingRequestCount++
- }
- // Check "active" current webseed request cardinality matches expectation.
- panicif.NotEq(len(existingRequests), existingRequestCount)
- // We don't need the value but maybe cloning is just faster anyway?
- unusedExistingRequests := maps.Clone(existingRequests)
- type heapElem struct {
- webseedUniqueRequestKey
- webseedRequestOrderValue
- // Not sure this is even worth it now.
- mightHavePartialFiles bool
- }
+
// Build the request heap, merging existing requests if they match.
- heapSlice := make([]heapElem, 0, len(aprioriMap)+len(existingRequests))
+
+ heapSlice := cl.heapSlice[:0]
+ requiredCap := len(aprioriMap) + len(existingRequests)
+ if cap(heapSlice) < requiredCap {
+ heapSlice = slices.Grow(heapSlice, requiredCap-cap(heapSlice))
+ }
+ defer func() {
+ // Will this let GC collect values? If not do we need to clear? :(
+ cl.heapSlice = heapSlice[:0]
+ }()
+
for key, value := range aprioriMap {
- if g.MapContains(existingRequests, key) {
- // Prefer the existing request always
- continue
- }
- heapSlice = append(heapSlice, heapElem{
+ // Should be filtered earlier.
+ panicif.True(g.MapContains(existingRequests, key))
+ heapSlice = append(heapSlice, webseedRequestHeapElem{
key,
webseedRequestOrderValue{
aprioriMapValue: value,
})
}
// Add remaining existing requests.
- for key, value := range unusedExistingRequests {
+ for key, value := range existingRequests {
// Don't reconsider existing requests that aren't wanted anymore.
if key.t.dataDownloadDisallowed.IsSet() {
continue
}
wr := value.existingWebseedRequest
- heapSlice = append(heapSlice, heapElem{
+ heapSlice = append(heapSlice, webseedRequestHeapElem{
key,
existingRequests[key],
key.t.filesInRequestRangeMightBePartial(wr.next, wr.end),
}
aprioriHeap := heap.InterfaceForSlice(
&heapSlice,
- func(l heapElem, r heapElem) bool {
+ func(l webseedRequestHeapElem, r webseedRequestHeapElem) bool {
// Not stable ordering but being sticky to existing webseeds should be enough.
ret := cmp.Or(
// Prefer highest priority
cl.scheduleImmediateWebseedRequestUpdate(reason)
}
+// This has awful naming, I'm not quite sure what to call this.
+func (cl *Client) yieldKeyAndValue(
+ yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool,
+ key webseedUniqueRequestKey,
+ ar *webseedRequest,
+) bool {
+ t := key.t
+ url := key.url
+ hostKey := t.webSeeds[url].hostKey
+ if ar.next >= ar.end {
+ // This request is done, so don't yield it.
+ return true
+ }
+ // Don't spawn requests before old requests are cancelled.
+ if false {
+ if ar.cancelled.Load() {
+ cl.slogger.Debug("iter current webseed requests: skipped cancelled webseed request")
+ // This should prevent overlapping webseed requests that are just filling
+ // slots waiting to cancel from conflicting.
+ return true
+ }
+ }
+ p := t.piece(t.pieceIndexOfRequestIndex(ar.next))
+ return yield(
+ webseedUniqueRequestKey{
+ t: t,
+ sliceIndex: t.requestIndexToWebseedSliceIndex(ar.next),
+ url: url,
+ },
+ webseedRequestOrderValue{
+ aprioriMapValue{
+ priority: p.effectivePriority(),
+ costKey: hostKey,
+ startRequest: ar.next,
+ },
+ ar,
+ },
+ )
+}
+
+func (cl *Client) iterCurrentWebseedRequestsFromClient() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
+ return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) {
+ for key, ar := range cl.activeWebseedRequests {
+ if !cl.yieldKeyAndValue(yield, key, ar) {
+ return
+ }
+ }
+ }
+}
+
+// This exists to compare old behaviour with Client active requests.
func (cl *Client) iterCurrentWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) {
- // TODO: This entire thing can be a single map on Client ("active webseed requests").
for t := range cl.torrents {
for url, ws := range t.webSeeds {
for ar := range ws.activeRequests {
- if ar.next >= ar.end {
- // This request is done, so don't yield it.
- continue
- }
- // Don't spawn requests before old requests are cancelled.
- if false {
- if ar.cancelled.Load() {
- cl.slogger.Debug("iter current webseed requests: skipped cancelled webseed request")
- // This should prevent overlapping webseed requests that are just filling
- // slots waiting to cancel from conflicting.
- continue
- }
+ key := webseedUniqueRequestKey{
+ t: t,
+ sliceIndex: t.requestIndexToWebseedSliceIndex(ar.next),
+ url: url,
}
- p := t.piece(t.pieceIndexOfRequestIndex(ar.next))
- if !yield(
- webseedUniqueRequestKey{
- t: t,
- sliceIndex: t.requestIndexToWebseedSliceIndex(ar.next),
- url: url,
- },
- webseedRequestOrderValue{
- aprioriMapValue{
- priority: p.effectivePriority(),
- costKey: ws.hostKey,
- startRequest: ar.next,
- },
- ar,
- },
- ) {
+ if !cl.yieldKeyAndValue(yield, key, ar) {
return
}
}