From 02819af1085c0cf2bffd97826c22cbb5791fb4f6 Mon Sep 17 00:00:00 2001
From: Matt Joiner
Date: Wed, 13 Aug 2025 11:42:12 +1000
Subject: [PATCH] Rework existing webseed requests for unique slice indexes

---
 torrent.go            |   8 ++
 webseed-requesting.go | 184 +++++++++++++++++++++---------------------
 2 files changed, 102 insertions(+), 90 deletions(-)

diff --git a/torrent.go b/torrent.go
index 9c8d534f..b8a6c78c 100644
--- a/torrent.go
+++ b/torrent.go
@@ -3619,6 +3619,14 @@ func (t *Torrent) filesInPieceRangeMightBePartial(begin, end pieceIndex) bool {
 	return t.piecesMightBePartial(begin, end)
 }
 
+// Whether files overlapping the webseed request slice starting at firstRequest might be partially complete. Note we only check for dirty chunks and either all or no pieces being complete.
+func (t *Torrent) filesInForWebseedRequestSliceMightBePartial(firstRequest RequestIndex) bool {
+	beginPiece := t.pieceIndexOfRequestIndex(firstRequest)
+	endRequest := t.endRequestForAlignedWebseedResponse(firstRequest)
+	endPiece := pieceIndex(intCeilDiv(endRequest, t.chunksPerRegularPiece()))
+	return t.filesInPieceRangeMightBePartial(beginPiece, endPiece)
+}
+
 // Pieces in the range [begin, end) are dirty, or in a mixed completion state.
 func (t *Torrent) piecesMightBePartial(beginPieceIndex, endPieceIndex int) bool {
 	// Check for dirtied chunks.
diff --git a/webseed-requesting.go b/webseed-requesting.go
index 18a6089b..4e5e527b 100644
--- a/webseed-requesting.go
+++ b/webseed-requesting.go
@@ -48,16 +48,13 @@ func (me webseedUrlKey) String() string {
 - Initiate missing requests that fit into the available limits.
 */
 func (cl *Client) updateWebseedRequests() {
-	type aprioriMapValue struct {
-		startIndex RequestIndex
-		webseedRequestOrderValue
-	}
 	aprioriMap := make(map[aprioriWebseedRequestKey]aprioriMapValue)
 	for uniqueKey, value := range cl.iterPossibleWebseedRequests() {
 		cur, ok := aprioriMap[uniqueKey.aprioriWebseedRequestKey]
 		if ok {
 			// Shared in the lookup above.
 			t := uniqueKey.t
+			// TODO: Change to "slice has requests"
 			hasPeerConnRequest := func(reqIndex RequestIndex) bool {
 				return t.requestingPeer(reqIndex) != nil
 			}
@@ -66,55 +63,61 @@ func (cl *Client) updateWebseedRequests() {
 			// webseed requests in favour of peer conns unless there's nothing else to do.
 			if cmp.Or(
 				cmp.Compare(value.priority, cur.priority),
-				compareBool(hasPeerConnRequest(cur.startIndex), hasPeerConnRequest(uniqueKey.startRequest)),
-				cmp.Compare(cur.startIndex, uniqueKey.startRequest),
+				compareBool(hasPeerConnRequest(cur.startRequest), hasPeerConnRequest(value.startRequest)),
+				cmp.Compare(cur.startRequest, value.startRequest),
 			) <= 0 {
 				continue
 			}
 		}
-		aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startRequest, value}
+		aprioriMap[uniqueKey.aprioriWebseedRequestKey] = value
+	}
+	// The key no longer includes startRequest, so multiple active requests in the same webseed
+	// request slice would collide here. The cardinality check below asserts that doesn't happen.
+	existingRequests := make(map[webseedUniqueRequestKey]webseedRequestOrderValue)
+	existingRequestCount := 0
+	// TODO: Maintain a cache of active requests in the Client, and use them to filter proposed
+	// requests (same result but fewer allocations).
+	for key, value := range cl.iterCurrentWebseedRequests() {
+		existingRequests[key] = value
+		existingRequestCount++
 	}
-	// TODO: This should not be keyed on startRequest but only on sliceIndex.
-	existingRequests := maps.Collect(cl.iterCurrentWebseedRequests())
+	// Check "active" current webseed request cardinality matches expectation.
+	panicif.NotEq(len(existingRequests), existingRequestCount)
 	// We don't need the value but maybe cloning is just faster anyway?
 	unusedExistingRequests := maps.Clone(existingRequests)
 	type heapElem struct {
 		webseedUniqueRequestKey
 		webseedRequestOrderValue
+		// Not sure this is even worth it now.
 		mightHavePartialFiles bool
 	}
 	// Build the request heap, merging existing requests if they match.
 	heapSlice := make([]heapElem, 0, len(aprioriMap)+len(existingRequests))
 	for key, value := range aprioriMap {
-		fullKey := webseedUniqueRequestKey{key, value.startIndex}
-		heapValue := value.webseedRequestOrderValue
-		// If there's a matching existing request, make sure to include a reference to it in the
-		// heap value and deduplicate it.
-		existingValue, ok := existingRequests[fullKey]
-		if ok {
-			// Priorities should have been generated the same.
-			panicif.NotEq(value.priority, existingValue.priority)
-			// A-priori map should not have existing request associated with it. TODO: a-priori map
-			// value shouldn't need some fields.
-			panicif.NotZero(value.existingWebseedRequest)
-			heapValue.existingWebseedRequest = existingValue.existingWebseedRequest
-			// Now the values should match exactly.
-			panicif.NotEq(heapValue, existingValue)
-			g.MustDelete(unusedExistingRequests, fullKey)
+		fullKey := webseedUniqueRequestKey{key}
+		if g.MapContains(existingRequests, fullKey) {
+			// Prefer the existing request always
+			continue
 		}
 		heapSlice = append(heapSlice, heapElem{
 			fullKey,
-			heapValue,
-			fullKey.mightHavePartialFiles(),
+			webseedRequestOrderValue{
+				aprioriMapValue: value,
+			},
+			fullKey.t.filesInForWebseedRequestSliceMightBePartial(value.startRequest),
 		})
 	}
 	// Add remaining existing requests.
-	for key := range unusedExistingRequests {
+	for key, value := range unusedExistingRequests {
 		// Don't reconsider existing requests that aren't wanted anymore.
 		if key.t.dataDownloadDisallowed.IsSet() {
 			continue
 		}
-		heapSlice = append(heapSlice, heapElem{key, existingRequests[key], key.mightHavePartialFiles()})
+		heapSlice = append(heapSlice, heapElem{
+			key,
+			existingRequests[key],
+			key.t.filesInForWebseedRequestSliceMightBePartial(value.startRequest),
+		})
 	}
 	aprioriHeap := heap.InterfaceForSlice(
 		&heapSlice,
@@ -140,7 +143,8 @@ func (cl *Client) updateWebseedRequests() {
 			// Doing earlier chunks first means more compact files for partial file hashing.
 			cmp.Compare(l.sliceIndex, r.sliceIndex),
 		)
-		if ret == 0 {
+		// Requests should be unique unless they're for different URLs.
+		if ret == 0 && l.url == r.url {
 			cfg := spew.NewDefaultConfig()
 			cfg.Dump(l)
 			cfg.Dump(r)
@@ -172,7 +176,11 @@ func (cl *Client) updateWebseedRequests() {
 		}
 		g.MakeMapIfNil(&plan.byCost)
 		requestKey := elem.webseedUniqueRequestKey
-		plan.byCost[costKey] = append(plan.byCost[costKey], requestKey)
+		plan.byCost[costKey] = append(plan.byCost[costKey], plannedWebseedRequest{
+			url:        elem.url,
+			t:          elem.t,
+			startIndex: elem.startRequest,
+		})
 		delete(unwantedExistingRequests, requestKey)
 	}
 
@@ -190,18 +198,24 @@ func (cl *Client) updateWebseedRequests() {
 
 	// TODO: Do we deduplicate requests across different webseeds?
 
-	for costKey, requestKeys := range plan.byCost {
-		for _, requestKey := range requestKeys {
+	for costKey, plannedRequests := range plan.byCost {
+		for _, request := range plannedRequests {
 			// This could happen if a request is cancelled but hasn't removed itself from the active
 			// list yet. This helps with backpressure as the requests can sleep to rate limit.
 			if !cl.underWebSeedHttpRequestLimit(costKey) {
 				break
 			}
-			if g.MapContains(existingRequests, requestKey) {
+			existingRequestKey := request.toChunkedWebseedRequestKey()
+			if g.MapContains(existingRequests, existingRequestKey) {
+				// A request to this webseed slice already exists. Note this doesn't check that
+				// the request indexes match.
+
+				// Check we didn't just cancel the same request.
+				panicif.True(g.MapContains(unwantedExistingRequests, existingRequestKey))
 				continue
 			}
-			t := requestKey.t
-			peer := t.webSeeds[requestKey.url]
+			t := request.t
+			peer := t.webSeeds[request.url]
 			panicif.NotEq(peer.hostKey, costKey)
 
 			printPlan()
@@ -209,10 +223,10 @@ func (cl *Client) updateWebseedRequests() {
 				Level:     slog.LevelDebug,
 				AddSource: true,
 			})).With(
-				"webseedUrl", requestKey.url,
-				"webseedChunkIndex", requestKey.sliceIndex)
+				"webseedUrl", request.url,
+				"webseedChunkIndex", request.sliceIndex())
 
-			begin := requestKey.startRequest
+			begin := request.startIndex
 			end := t.getWebseedRequestEnd(begin, debugLogger)
 			panicif.LessThanOrEqual(end, begin)
 
@@ -275,7 +289,28 @@ func (cl *Client) dumpCurrentWebseedRequests() {
 }
 
 type webseedRequestPlan struct {
-	byCost map[webseedHostKeyHandle][]webseedUniqueRequestKey
+	byCost map[webseedHostKeyHandle][]plannedWebseedRequest
+}
+
+// Needed components to generate a webseed request.
+type plannedWebseedRequest struct {
+	url        webseedUrlKey
+	t          *Torrent
+	startIndex RequestIndex
+}
+
+func (me *plannedWebseedRequest) sliceIndex() RequestIndex {
+	return me.startIndex / me.t.chunksPerAlignedWebseedResponse()
+}
+
+func (me *plannedWebseedRequest) toChunkedWebseedRequestKey() webseedUniqueRequestKey {
+	return webseedUniqueRequestKey{
+		aprioriWebseedRequestKey{
+			url:        me.url,
+			t:          me.t,
+			sliceIndex: me.sliceIndex(),
+		},
+	}
 }
 
 func (me webseedRequestPlan) String() string {
@@ -296,6 +331,12 @@ type aprioriWebseedRequestKey struct {
 	sliceIndex RequestIndex
 }
 
+type aprioriMapValue struct {
+	costKey      webseedHostKeyHandle
+	priority     PiecePriority
+	startRequest RequestIndex
+}
+
 func (me *aprioriWebseedRequestKey) String() string {
 	return fmt.Sprintf("slice %v from %v", me.sliceIndex, me.url)
 }
@@ -303,54 +344,14 @@ func (me *aprioriWebseedRequestKey) String() string {
 // Distinct webseed request when different offsets to the same object are allowed.
 type webseedUniqueRequestKey struct {
 	aprioriWebseedRequestKey
-	startRequest RequestIndex
-}
-
-func (me webseedUniqueRequestKey) endPieceIndex() pieceIndex {
-	return pieceIndex(intCeilDiv(
-		me.t.endRequestForAlignedWebseedResponse(me.startRequest),
-		me.t.chunksPerRegularPiece()))
-}
-
-func (me webseedUniqueRequestKey) mightHavePartialFiles() bool {
-	return me.t.filesInPieceRangeMightBePartial(
-		me.t.pieceIndexOfRequestIndex(me.startRequest),
-		me.endPieceIndex())
-}
-
-func (me webseedUniqueRequestKey) longestFile() (ret g.Option[int64]) {
-	t := me.t
-	firstPiece := t.pieceIndexOfRequestIndex(me.startRequest)
-	firstFileIndex := t.piece(firstPiece).beginFile
-	endFileIndex := t.piece(me.endPieceIndex() - 1).endFile
-	for fileIndex := firstFileIndex; fileIndex < endFileIndex; fileIndex++ {
-		fileLength := t.getFile(fileIndex).length
-		if ret.Ok {
-			ret.Value = max(ret.Value, fileLength)
-		} else {
-			ret.Set(fileLength)
-		}
-	}
-	return
-}
-
-func (me webseedUniqueRequestKey) String() string {
-	return fmt.Sprintf(
-		"%v at %v:%v",
-		me.aprioriWebseedRequestKey,
-		me.sliceIndex,
-		me.startRequest%me.t.chunksPerAlignedWebseedResponse(),
-	)
 }
 
 // Non-distinct proposed webseed request data.
 type webseedRequestOrderValue struct {
-	// The associated webseed request per host limit.
-	costKey webseedHostKeyHandle
+	aprioriMapValue
 	// Used for cancellation if this is deprioritized. Also, a faster way to sort for existing
 	// requests.
 	existingWebseedRequest *webseedRequest
-	priority PiecePriority
 }
 
 func (me webseedRequestOrderValue) String() string {
@@ -358,8 +359,8 @@ func (me webseedRequestOrderValue) String() string {
 }
 
 // Yields possible webseed requests by piece. Caller should filter and prioritize these.
-func (cl *Client) iterPossibleWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
-	return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) {
+func (cl *Client) iterPossibleWebseedRequests() iter.Seq2[webseedUniqueRequestKey, aprioriMapValue] {
+	return func(yield func(webseedUniqueRequestKey, aprioriMapValue) bool) {
 		for key, value := range cl.pieceRequestOrder {
 			input := key.getRequestStrategyInput(cl)
 			requestStrategy.GetRequestablePieces(
@@ -391,11 +392,11 @@ func (cl *Client) iterPossibleWebseedRequests() iter.Seq2[webseedUniqueRequestKe
 								sliceIndex: webseedSliceIndex,
 								url:        url,
 							},
-							firstRequest,
 						},
-						webseedRequestOrderValue{
-							priority: priority,
-							costKey:  ws.hostKey,
+						aprioriMapValue{
+							priority:     priority,
+							costKey:      ws.hostKey,
+							startRequest: firstRequest,
 						},
 					) {
 						return false
@@ -416,6 +417,7 @@ func (cl *Client) updateWebseedRequestsWithReason(reason updateRequestReason) {
 
 func (cl *Client) iterCurrentWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
 	return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) {
+		// TODO: This entire thing can be a single map on Client ("active webseed requests").
 		for t := range cl.torrents {
 			for url, ws := range t.webSeeds {
 				for ar := range ws.activeRequests {
@@ -437,12 +439,14 @@ func (cl *Client) iterCurrentWebseedRequests() iter.Seq2[webseedUniqueRequestKey
 							sliceIndex: ar.next / t.chunksPerAlignedWebseedResponse(),
 							url:        url,
 						},
-						ar.next,
 					},
 					webseedRequestOrderValue{
-						priority:               p.effectivePriority(),
-						existingWebseedRequest: ar,
-						costKey:                ws.hostKey,
+						aprioriMapValue{
+							priority:     p.effectivePriority(),
+							costKey:      ws.hostKey,
+							startRequest: ar.next,
+						},
+						ar,
 					},
 				) {
 					return
-- 
2.51.0
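
Illustrative sketch of the keying idea applied by this patch (not part of the patch above; simplified, hypothetical types rather than the library's actual API): proposed webseed requests are deduplicated per webseed slice, and a slice that already has an active request is left alone rather than getting a second request at a different start offset.

package main

import "fmt"

// sliceKey identifies a webseed request slice: one aligned response region of one webseed URL.
type sliceKey struct {
	url        string // webseed URL
	sliceIndex int    // aligned webseed response slice
}

// proposedRequest is a candidate request generated from the piece request order.
type proposedRequest struct {
	startRequest int
	priority     int
}

// dedupePerSlice keeps at most one proposed request per slice, and drops any slice that already
// has an active request.
func dedupePerSlice(
	candidates map[sliceKey][]proposedRequest,
	active map[sliceKey]bool,
) map[sliceKey]proposedRequest {
	out := make(map[sliceKey]proposedRequest)
	for key, cands := range candidates {
		if len(cands) == 0 {
			continue
		}
		if active[key] {
			// An active request already covers this slice; keep it rather than starting another.
			continue
		}
		best := cands[0]
		for _, c := range cands[1:] {
			// Prefer higher priority, then the earliest start request within the slice.
			if c.priority > best.priority ||
				(c.priority == best.priority && c.startRequest < best.startRequest) {
				best = c
			}
		}
		out[key] = best
	}
	return out
}

func main() {
	key := sliceKey{url: "http://example.invalid/ws", sliceIndex: 3}
	candidates := map[sliceKey][]proposedRequest{
		key: {{startRequest: 100, priority: 1}, {startRequest: 96, priority: 1}},
	}
	// Picks startRequest 96 for slice 3; an entry in active for the same key would suppress it.
	fmt.Println(dedupePerSlice(candidates, nil))
}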