]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Rework existing webseed requests for unique slice indexes
authorMatt Joiner <anacrolix@gmail.com>
Wed, 13 Aug 2025 01:42:12 +0000 (11:42 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 13 Aug 2025 01:42:12 +0000 (11:42 +1000)
torrent.go
webseed-requesting.go

index 9c8d534f64961994bc35f21b415dafec92bab76f..b8a6c78c6fd91271c6874a69e3499f0a2e9becf2 100644 (file)
@@ -3619,6 +3619,14 @@ func (t *Torrent) filesInPieceRangeMightBePartial(begin, end pieceIndex) bool {
        return t.piecesMightBePartial(begin, end)
 }
 
+// Pieces in the range [begin, end) may have partially complete files. Note we only check for dirty chunks and either all or no pieces being complete.
+func (t *Torrent) filesInForWebseedRequestSliceMightBePartial(firstRequest RequestIndex) bool {
+       beginPiece := t.pieceIndexOfRequestIndex(firstRequest)
+       endRequest := t.endRequestForAlignedWebseedResponse(firstRequest)
+       endPiece := pieceIndex(intCeilDiv(endRequest, t.chunksPerRegularPiece()))
+       return t.filesInPieceRangeMightBePartial(beginPiece, endPiece)
+}
+
 // Pieces in the range [begin, end) are dirty, or in a mixed completion state.
 func (t *Torrent) piecesMightBePartial(beginPieceIndex, endPieceIndex int) bool {
        // Check for dirtied chunks.
index 18a6089b573dcdc487a2d7372e703ac624e8a175..4e5e527b3852de3333d50b805fc2f21ff109d7cb 100644 (file)
@@ -48,16 +48,13 @@ func (me webseedUrlKey) String() string {
 - Initiate missing requests that fit into the available limits.
 */
 func (cl *Client) updateWebseedRequests() {
-       type aprioriMapValue struct {
-               startIndex RequestIndex
-               webseedRequestOrderValue
-       }
        aprioriMap := make(map[aprioriWebseedRequestKey]aprioriMapValue)
        for uniqueKey, value := range cl.iterPossibleWebseedRequests() {
                cur, ok := aprioriMap[uniqueKey.aprioriWebseedRequestKey]
                if ok {
                        // Shared in the lookup above.
                        t := uniqueKey.t
+                       // TODO: Change to "slice has requests"
                        hasPeerConnRequest := func(reqIndex RequestIndex) bool {
                                return t.requestingPeer(reqIndex) != nil
                        }
@@ -66,55 +63,61 @@ func (cl *Client) updateWebseedRequests() {
                        // webseed requests in favour of peer conns unless there's nothing else to do.
                        if cmp.Or(
                                cmp.Compare(value.priority, cur.priority),
-                               compareBool(hasPeerConnRequest(cur.startIndex), hasPeerConnRequest(uniqueKey.startRequest)),
-                               cmp.Compare(cur.startIndex, uniqueKey.startRequest),
+                               compareBool(hasPeerConnRequest(cur.startRequest), hasPeerConnRequest(value.startRequest)),
+                               cmp.Compare(cur.startRequest, value.startRequest),
                        ) <= 0 {
                                continue
                        }
                }
-               aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startRequest, value}
+               aprioriMap[uniqueKey.aprioriWebseedRequestKey] = value
+       }
+       // This includes startRequest in the key. This means multiple webseed requests can exist in the
+       // same webseed request slice.
+       existingRequests := make(map[webseedUniqueRequestKey]webseedRequestOrderValue)
+       existingRequestCount := 0
+       // TODO: Maintain a cache of active requests in the Client, and use them to filter proposed
+       // requests (same result but less allocations).
+       for key, value := range cl.iterCurrentWebseedRequests() {
+               existingRequests[key] = value
+               existingRequestCount++
        }
-       // TODO: This should not be keyed on startRequest but only on sliceIndex.
-       existingRequests := maps.Collect(cl.iterCurrentWebseedRequests())
+       // Check "active" current webseed request cardinality matches expectation.
+       panicif.NotEq(len(existingRequests), existingRequestCount)
        // We don't need the value but maybe cloning is just faster anyway?
        unusedExistingRequests := maps.Clone(existingRequests)
        type heapElem struct {
                webseedUniqueRequestKey
                webseedRequestOrderValue
+               // Not sure this is even worth it now.
                mightHavePartialFiles bool
        }
        // Build the request heap, merging existing requests if they match.
        heapSlice := make([]heapElem, 0, len(aprioriMap)+len(existingRequests))
        for key, value := range aprioriMap {
-               fullKey := webseedUniqueRequestKey{key, value.startIndex}
-               heapValue := value.webseedRequestOrderValue
-               // If there's a matching existing request, make sure to include a reference to it in the
-               // heap value and deduplicate it.
-               existingValue, ok := existingRequests[fullKey]
-               if ok {
-                       // Priorities should have been generated the same.
-                       panicif.NotEq(value.priority, existingValue.priority)
-                       // A-priori map should not have existing request associated with it. TODO: a-priori map
-                       // value shouldn't need some fields.
-                       panicif.NotZero(value.existingWebseedRequest)
-                       heapValue.existingWebseedRequest = existingValue.existingWebseedRequest
-                       // Now the values should match exactly.
-                       panicif.NotEq(heapValue, existingValue)
-                       g.MustDelete(unusedExistingRequests, fullKey)
+               fullKey := webseedUniqueRequestKey{key}
+               if g.MapContains(existingRequests, fullKey) {
+                       // Prefer the existing request always
+                       continue
                }
                heapSlice = append(heapSlice, heapElem{
                        fullKey,
-                       heapValue,
-                       fullKey.mightHavePartialFiles(),
+                       webseedRequestOrderValue{
+                               aprioriMapValue: value,
+                       },
+                       fullKey.t.filesInForWebseedRequestSliceMightBePartial(value.startRequest),
                })
        }
        // Add remaining existing requests.
-       for key := range unusedExistingRequests {
+       for key, value := range unusedExistingRequests {
                // Don't reconsider existing requests that aren't wanted anymore.
                if key.t.dataDownloadDisallowed.IsSet() {
                        continue
                }
-               heapSlice = append(heapSlice, heapElem{key, existingRequests[key], key.mightHavePartialFiles()})
+               heapSlice = append(heapSlice, heapElem{
+                       key,
+                       existingRequests[key],
+                       key.t.filesInForWebseedRequestSliceMightBePartial(value.startRequest),
+               })
        }
        aprioriHeap := heap.InterfaceForSlice(
                &heapSlice,
@@ -140,7 +143,8 @@ func (cl *Client) updateWebseedRequests() {
                                // Doing earlier chunks first means more compact files for partial file hashing.
                                cmp.Compare(l.sliceIndex, r.sliceIndex),
                        )
-                       if ret == 0 {
+                       // Requests should be unique unless they're for different URLs.
+                       if ret == 0 && l.url == r.url {
                                cfg := spew.NewDefaultConfig()
                                cfg.Dump(l)
                                cfg.Dump(r)
@@ -172,7 +176,11 @@ func (cl *Client) updateWebseedRequests() {
                }
                g.MakeMapIfNil(&plan.byCost)
                requestKey := elem.webseedUniqueRequestKey
-               plan.byCost[costKey] = append(plan.byCost[costKey], requestKey)
+               plan.byCost[costKey] = append(plan.byCost[costKey], plannedWebseedRequest{
+                       url:        elem.url,
+                       t:          elem.t,
+                       startIndex: elem.startRequest,
+               })
                delete(unwantedExistingRequests, requestKey)
        }
 
@@ -190,18 +198,24 @@ func (cl *Client) updateWebseedRequests() {
 
        // TODO: Do we deduplicate requests across different webseeds?
 
-       for costKey, requestKeys := range plan.byCost {
-               for _, requestKey := range requestKeys {
+       for costKey, plannedRequests := range plan.byCost {
+               for _, request := range plannedRequests {
                        // This could happen if a request is cancelled but hasn't removed itself from the active
                        // list yet. This helps with backpressure as the requests can sleep to rate limit.
                        if !cl.underWebSeedHttpRequestLimit(costKey) {
                                break
                        }
-                       if g.MapContains(existingRequests, requestKey) {
+                       existingRequestKey := request.toChunkedWebseedRequestKey()
+                       if g.MapContains(existingRequests, existingRequestKey) {
+                               // A request exists to the webseed slice already. This doesn't check the request
+                               // indexes match.
+
+                               // Check we didn't just cancel the same request.
+                               panicif.True(g.MapContains(unwantedExistingRequests, existingRequestKey))
                                continue
                        }
-                       t := requestKey.t
-                       peer := t.webSeeds[requestKey.url]
+                       t := request.t
+                       peer := t.webSeeds[request.url]
                        panicif.NotEq(peer.hostKey, costKey)
                        printPlan()
 
@@ -209,10 +223,10 @@ func (cl *Client) updateWebseedRequests() {
                                Level:     slog.LevelDebug,
                                AddSource: true,
                        })).With(
-                               "webseedUrl", requestKey.url,
-                               "webseedChunkIndex", requestKey.sliceIndex)
+                               "webseedUrl", request.url,
+                               "webseedChunkIndex", request.sliceIndex)
 
-                       begin := requestKey.startRequest
+                       begin := request.startIndex
                        end := t.getWebseedRequestEnd(begin, debugLogger)
                        panicif.LessThanOrEqual(end, begin)
 
@@ -275,7 +289,28 @@ func (cl *Client) dumpCurrentWebseedRequests() {
 }
 
 type webseedRequestPlan struct {
-       byCost map[webseedHostKeyHandle][]webseedUniqueRequestKey
+       byCost map[webseedHostKeyHandle][]plannedWebseedRequest
+}
+
+// Needed components to generate a webseed request.
+type plannedWebseedRequest struct {
+       url        webseedUrlKey
+       t          *Torrent
+       startIndex RequestIndex
+}
+
+func (me *plannedWebseedRequest) sliceIndex() RequestIndex {
+       return me.startIndex / me.t.chunksPerAlignedWebseedResponse()
+}
+
+func (me *plannedWebseedRequest) toChunkedWebseedRequestKey() webseedUniqueRequestKey {
+       return webseedUniqueRequestKey{
+               aprioriWebseedRequestKey{
+                       url:        me.url,
+                       t:          me.t,
+                       sliceIndex: me.sliceIndex(),
+               },
+       }
 }
 
 func (me webseedRequestPlan) String() string {
@@ -296,6 +331,12 @@ type aprioriWebseedRequestKey struct {
        sliceIndex RequestIndex
 }
 
+type aprioriMapValue struct {
+       costKey      webseedHostKeyHandle
+       priority     PiecePriority
+       startRequest RequestIndex
+}
+
 func (me *aprioriWebseedRequestKey) String() string {
        return fmt.Sprintf("slice %v from %v", me.sliceIndex, me.url)
 }
@@ -303,54 +344,14 @@ func (me *aprioriWebseedRequestKey) String() string {
 // Distinct webseed request when different offsets to the same object are allowed.
 type webseedUniqueRequestKey struct {
        aprioriWebseedRequestKey
-       startRequest RequestIndex
-}
-
-func (me webseedUniqueRequestKey) endPieceIndex() pieceIndex {
-       return pieceIndex(intCeilDiv(
-               me.t.endRequestForAlignedWebseedResponse(me.startRequest),
-               me.t.chunksPerRegularPiece()))
-}
-
-func (me webseedUniqueRequestKey) mightHavePartialFiles() bool {
-       return me.t.filesInPieceRangeMightBePartial(
-               me.t.pieceIndexOfRequestIndex(me.startRequest),
-               me.endPieceIndex())
-}
-
-func (me webseedUniqueRequestKey) longestFile() (ret g.Option[int64]) {
-       t := me.t
-       firstPiece := t.pieceIndexOfRequestIndex(me.startRequest)
-       firstFileIndex := t.piece(firstPiece).beginFile
-       endFileIndex := t.piece(me.endPieceIndex() - 1).endFile
-       for fileIndex := firstFileIndex; fileIndex < endFileIndex; fileIndex++ {
-               fileLength := t.getFile(fileIndex).length
-               if ret.Ok {
-                       ret.Value = max(ret.Value, fileLength)
-               } else {
-                       ret.Set(fileLength)
-               }
-       }
-       return
-}
-
-func (me webseedUniqueRequestKey) String() string {
-       return fmt.Sprintf(
-               "%v at %v:%v",
-               me.aprioriWebseedRequestKey,
-               me.sliceIndex,
-               me.startRequest%me.t.chunksPerAlignedWebseedResponse(),
-       )
 }
 
 // Non-distinct proposed webseed request data.
 type webseedRequestOrderValue struct {
-       // The associated webseed request per host limit.
-       costKey webseedHostKeyHandle
+       aprioriMapValue
        // Used for cancellation if this is deprioritized. Also, a faster way to sort for existing
        // requests.
        existingWebseedRequest *webseedRequest
-       priority               PiecePriority
 }
 
 func (me webseedRequestOrderValue) String() string {
@@ -358,8 +359,8 @@ func (me webseedRequestOrderValue) String() string {
 }
 
 // Yields possible webseed requests by piece. Caller should filter and prioritize these.
-func (cl *Client) iterPossibleWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
-       return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) {
+func (cl *Client) iterPossibleWebseedRequests() iter.Seq2[webseedUniqueRequestKey, aprioriMapValue] {
+       return func(yield func(webseedUniqueRequestKey, aprioriMapValue) bool) {
                for key, value := range cl.pieceRequestOrder {
                        input := key.getRequestStrategyInput(cl)
                        requestStrategy.GetRequestablePieces(
@@ -391,11 +392,11 @@ func (cl *Client) iterPossibleWebseedRequests() iter.Seq2[webseedUniqueRequestKe
                                                                        sliceIndex: webseedSliceIndex,
                                                                        url:        url,
                                                                },
-                                                               firstRequest,
                                                        },
-                                                       webseedRequestOrderValue{
-                                                               priority: priority,
-                                                               costKey:  ws.hostKey,
+                                                       aprioriMapValue{
+                                                               priority:     priority,
+                                                               costKey:      ws.hostKey,
+                                                               startRequest: firstRequest,
                                                        },
                                                ) {
                                                        return false
@@ -416,6 +417,7 @@ func (cl *Client) updateWebseedRequestsWithReason(reason updateRequestReason) {
 
 func (cl *Client) iterCurrentWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
        return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) {
+               // TODO: This entire thing can be a single map on Client ("active webseed requests").
                for t := range cl.torrents {
                        for url, ws := range t.webSeeds {
                                for ar := range ws.activeRequests {
@@ -437,12 +439,14 @@ func (cl *Client) iterCurrentWebseedRequests() iter.Seq2[webseedUniqueRequestKey
                                                                sliceIndex: ar.next / t.chunksPerAlignedWebseedResponse(),
                                                                url:        url,
                                                        },
-                                                       ar.next,
                                                },
                                                webseedRequestOrderValue{
-                                                       priority:               p.effectivePriority(),
-                                                       existingWebseedRequest: ar,
-                                                       costKey:                ws.hostKey,
+                                                       aprioriMapValue{
+                                                               priority:     p.effectivePriority(),
+                                                               costKey:      ws.hostKey,
+                                                               startRequest: ar.next,
+                                                       },
+                                                       ar,
                                                },
                                        ) {
                                                return