- Initiate missing requests that fit into the available limits.
*/
func (cl *Client) updateWebseedRequests() {
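+ // The approach: collect the best candidate request per webseed request slice, add the
+ // already-running requests, order everything on a single heap, then admit requests per
+ // host until the limits are hit.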
- type aprioriMapValue struct {
- startIndex RequestIndex
- webseedRequestOrderValue
- }
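+ // Tracks the best candidate request per (torrent, url, slice) key.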
aprioriMap := make(map[aprioriWebseedRequestKey]aprioriMapValue)
for uniqueKey, value := range cl.iterPossibleWebseedRequests() {
cur, ok := aprioriMap[uniqueKey.aprioriWebseedRequestKey]
if ok {
- // Shared in the lookup above.
+ // The torrent is the same for both requests: it's part of the shared lookup key above.
t := uniqueKey.t
+ // TODO: Change this to check whether the whole slice has requests.
hasPeerConnRequest := func(reqIndex RequestIndex) bool {
return t.requestingPeer(reqIndex) != nil
}
- // webseed requests in favour of peer conns unless there's nothing else to do.
+ // Avoid webseed requests in favour of peer conns unless there's nothing else to do.
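+ // A candidate only displaces the incumbent if it compares strictly greater: higher
+ // priority first, then not having an overlapping peer conn request, then the earlier
+ // start request.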
if cmp.Or(
cmp.Compare(value.priority, cur.priority),
- compareBool(hasPeerConnRequest(cur.startIndex), hasPeerConnRequest(uniqueKey.startRequest)),
- cmp.Compare(cur.startIndex, uniqueKey.startRequest),
+ compareBool(hasPeerConnRequest(cur.startRequest), hasPeerConnRequest(value.startRequest)),
+ cmp.Compare(cur.startRequest, value.startRequest),
) <= 0 {
continue
}
}
- aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startRequest, value}
+ aprioriMap[uniqueKey.aprioriWebseedRequestKey] = value
+ }
+ // The key no longer includes startRequest, so at most one active webseed request is
+ // expected per webseed request slice.
+ existingRequests := make(map[webseedUniqueRequestKey]webseedRequestOrderValue)
+ existingRequestCount := 0
+ // TODO: Maintain a cache of active requests in the Client, and use it to filter proposed
+ // requests (same result but fewer allocations).
+ for key, value := range cl.iterCurrentWebseedRequests() {
+ existingRequests[key] = value
+ existingRequestCount++
}
- // TODO: This should not be keyed on startRequest but only on sliceIndex.
- existingRequests := maps.Collect(cl.iterCurrentWebseedRequests())
+ // Check "active" current webseed request cardinality matches expectation.
+ panicif.NotEq(len(existingRequests), existingRequestCount)
- // We don't need the value but maybe cloning is just faster anyway?
+ // Clone so we can track which existing requests go unused below.
unusedExistingRequests := maps.Clone(existingRequests)
type heapElem struct {
webseedUniqueRequestKey
webseedRequestOrderValue
+ // Not sure caching this field is even worth it now.
mightHavePartialFiles bool
}
- // Build the request heap, merging existing requests if they match.
+ // Build the request heap. A-priori proposals that collide with an existing request are
+ // dropped in favour of the existing one.
heapSlice := make([]heapElem, 0, len(aprioriMap)+len(existingRequests))
for key, value := range aprioriMap {
- fullKey := webseedUniqueRequestKey{key, value.startIndex}
- heapValue := value.webseedRequestOrderValue
- // If there's a matching existing request, make sure to include a reference to it in the
- // heap value and deduplicate it.
- existingValue, ok := existingRequests[fullKey]
- if ok {
- // Priorities should have been generated the same.
- panicif.NotEq(value.priority, existingValue.priority)
- // A-priori map should not have existing request associated with it. TODO: a-priori map
- // value shouldn't need some fields.
- panicif.NotZero(value.existingWebseedRequest)
- heapValue.existingWebseedRequest = existingValue.existingWebseedRequest
- // Now the values should match exactly.
- panicif.NotEq(heapValue, existingValue)
- g.MustDelete(unusedExistingRequests, fullKey)
+ fullKey := webseedUniqueRequestKey{key}
+ if g.MapContains(existingRequests, fullKey) {
+ // Always prefer the existing request.
+ continue
}
heapSlice = append(heapSlice, heapElem{
fullKey,
- heapValue,
- fullKey.mightHavePartialFiles(),
+ webseedRequestOrderValue{
+ aprioriMapValue: value,
+ },
+ fullKey.t.filesInForWebseedRequestSliceMightBePartial(value.startRequest),
})
}
// Add remaining existing requests.
- for key := range unusedExistingRequests {
+ for key, value := range unusedExistingRequests {
// Don't reconsider existing requests that aren't wanted anymore.
if key.t.dataDownloadDisallowed.IsSet() {
continue
}
- heapSlice = append(heapSlice, heapElem{key, existingRequests[key], key.mightHavePartialFiles()})
+ heapSlice = append(heapSlice, heapElem{
+ key,
+ existingRequests[key],
+ key.t.filesInForWebseedRequestSliceMightBePartial(value.startRequest),
+ })
}
aprioriHeap := heap.InterfaceForSlice(
&heapSlice,
// Doing earlier chunks first means more compact files for partial file hashing.
cmp.Compare(l.sliceIndex, r.sliceIndex),
)
- if ret == 0 {
+ // Requests should be unique unless they're for different URLs.
+ if ret == 0 && l.url == r.url {
cfg := spew.NewDefaultConfig()
cfg.Dump(l)
cfg.Dump(r)
}
g.MakeMapIfNil(&plan.byCost)
requestKey := elem.webseedUniqueRequestKey
- plan.byCost[costKey] = append(plan.byCost[costKey], requestKey)
+ plan.byCost[costKey] = append(plan.byCost[costKey], plannedWebseedRequest{
+ url: elem.url,
+ t: elem.t,
+ startIndex: elem.startRequest,
+ })
delete(unwantedExistingRequests, requestKey)
}
// TODO: Do we deduplicate requests across different webseeds?
- for costKey, requestKeys := range plan.byCost {
- for _, requestKey := range requestKeys {
+ for costKey, plannedRequests := range plan.byCost {
+ for _, request := range plannedRequests {
- // This could happen if a request is cancelled but hasn't removed itself from the active
- // list yet. This helps with backpressure as the requests can sleep to rate limit.
+ // We can be over the limit if a request was cancelled but hasn't removed itself from the
+ // active list yet. That provides backpressure, as requests can sleep to rate limit. Skip
+ // the remaining planned requests for this host.
if !cl.underWebSeedHttpRequestLimit(costKey) {
break
}
- if g.MapContains(existingRequests, requestKey) {
+ existingRequestKey := request.toChunkedWebseedRequestKey()
+ if g.MapContains(existingRequests, existingRequestKey) {
+ // A request to this webseed slice already exists. Note this doesn't check that the
+ // request indexes match.
+
+ // Check we didn't just cancel the same request.
+ panicif.True(g.MapContains(unwantedExistingRequests, existingRequestKey))
continue
}
- t := requestKey.t
- peer := t.webSeeds[requestKey.url]
+ t := request.t
+ peer := t.webSeeds[request.url]
panicif.NotEq(peer.hostKey, costKey)
printPlan()
Level: slog.LevelDebug,
AddSource: true,
})).With(
- "webseedUrl", requestKey.url,
- "webseedChunkIndex", requestKey.sliceIndex)
+ "webseedUrl", request.url,
+ "webseedChunkIndex", request.sliceIndex)
- begin := requestKey.startRequest
+ begin := request.startIndex
end := t.getWebseedRequestEnd(begin, debugLogger)
panicif.LessThanOrEqual(end, begin)
}
type webseedRequestPlan struct {
- byCost map[webseedHostKeyHandle][]webseedUniqueRequestKey
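+ // Planned requests grouped by webseed host (cost) key, in the order they should be
+ // initiated.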
+ byCost map[webseedHostKeyHandle][]plannedWebseedRequest
+}
+
+// plannedWebseedRequest holds the components needed to generate a webseed request.
+type plannedWebseedRequest struct {
+ url webseedUrlKey
+ t *Torrent
+ startIndex RequestIndex
+}
+
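+// The webseed request slice this request starts in. For example, if
+// chunksPerAlignedWebseedResponse() returned 16, a startIndex of 35 would fall in slice 2
+// (request indexes 32 through 47).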
+func (me *plannedWebseedRequest) sliceIndex() RequestIndex {
+ return me.startIndex / me.t.chunksPerAlignedWebseedResponse()
+}
+
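+// The deduplication key for this planned request. startIndex is deliberately omitted:
+// any two requests into the same (torrent, url, slice) compare equal.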
+func (me *plannedWebseedRequest) toChunkedWebseedRequestKey() webseedUniqueRequestKey {
+ return webseedUniqueRequestKey{
+ aprioriWebseedRequestKey{
+ url: me.url,
+ t: me.t,
+ sliceIndex: me.sliceIndex(),
+ },
+ }
}
func (me webseedRequestPlan) String() string {
sliceIndex RequestIndex
}
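+// The candidate request data tracked for each aprioriWebseedRequestKey.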
+type aprioriMapValue struct {
+ costKey webseedHostKeyHandle
+ priority PiecePriority
+ startRequest RequestIndex
+}
+
func (me *aprioriWebseedRequestKey) String() string {
return fmt.Sprintf("slice %v from %v", me.sliceIndex, me.url)
}
-// Distinct webseed request when different offsets to the same object are allowed.
+// Identifies a webseed request slice. startRequest is no longer part of the key, so
+// requests at different offsets into the same slice compare equal.
type webseedUniqueRequestKey struct {
aprioriWebseedRequestKey
- startRequest RequestIndex
-}
-
-func (me webseedUniqueRequestKey) endPieceIndex() pieceIndex {
- return pieceIndex(intCeilDiv(
- me.t.endRequestForAlignedWebseedResponse(me.startRequest),
- me.t.chunksPerRegularPiece()))
-}
-
-func (me webseedUniqueRequestKey) mightHavePartialFiles() bool {
- return me.t.filesInPieceRangeMightBePartial(
- me.t.pieceIndexOfRequestIndex(me.startRequest),
- me.endPieceIndex())
-}
-
-func (me webseedUniqueRequestKey) longestFile() (ret g.Option[int64]) {
- t := me.t
- firstPiece := t.pieceIndexOfRequestIndex(me.startRequest)
- firstFileIndex := t.piece(firstPiece).beginFile
- endFileIndex := t.piece(me.endPieceIndex() - 1).endFile
- for fileIndex := firstFileIndex; fileIndex < endFileIndex; fileIndex++ {
- fileLength := t.getFile(fileIndex).length
- if ret.Ok {
- ret.Value = max(ret.Value, fileLength)
- } else {
- ret.Set(fileLength)
- }
- }
- return
-}
-
-func (me webseedUniqueRequestKey) String() string {
- return fmt.Sprintf(
- "%v at %v:%v",
- me.aprioriWebseedRequestKey,
- me.sliceIndex,
- me.startRequest%me.t.chunksPerAlignedWebseedResponse(),
- )
}
-// Non-distinct proposed webseed request data.
+// Proposed webseed request data that isn't part of the unique request key.
type webseedRequestOrderValue struct {
- // The associated webseed request per host limit.
- costKey webseedHostKeyHandle
+ aprioriMapValue
// Used for cancellation if this is deprioritized. Also, a faster way to sort for existing
// requests.
existingWebseedRequest *webseedRequest
- priority PiecePriority
}
func (me webseedRequestOrderValue) String() string {
}
// Yields possible webseed requests by piece. Caller should filter and prioritize these.
-func (cl *Client) iterPossibleWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
- return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) {
+func (cl *Client) iterPossibleWebseedRequests() iter.Seq2[webseedUniqueRequestKey, aprioriMapValue] {
+ return func(yield func(webseedUniqueRequestKey, aprioriMapValue) bool) {
for key, value := range cl.pieceRequestOrder {
input := key.getRequestStrategyInput(cl)
requestStrategy.GetRequestablePieces(
sliceIndex: webseedSliceIndex,
url: url,
},
- firstRequest,
},
- webseedRequestOrderValue{
- priority: priority,
- costKey: ws.hostKey,
+ aprioriMapValue{
+ priority: priority,
+ costKey: ws.hostKey,
+ startRequest: firstRequest,
},
) {
return false
func (cl *Client) iterCurrentWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) {
+ // TODO: This entire thing can be a single map on Client ("active webseed requests").
for t := range cl.torrents {
for url, ws := range t.webSeeds {
for ar := range ws.activeRequests {
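+ // ar.next determines both the slice key and the effective start of the existing
+ // request.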
sliceIndex: ar.next / t.chunksPerAlignedWebseedResponse(),
url: url,
},
- ar.next,
},
webseedRequestOrderValue{
- priority: p.effectivePriority(),
- existingWebseedRequest: ar,
- costKey: ws.hostKey,
+ aprioriMapValue{
+ priority: p.effectivePriority(),
+ costKey: ws.hostKey,
+ startRequest: ar.next,
+ },
+ ar,
},
) {
return