]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Compare Client and per-Torrent active webseed maps and reuse large allocations
authorMatt Joiner <anacrolix@gmail.com>
Fri, 22 Aug 2025 04:51:51 +0000 (14:51 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 22 Aug 2025 04:51:51 +0000 (14:51 +1000)
client.go
webseed-requesting.go

index 99494b2ce5a4fb969c0de5443d0dc64c89d604e2..11cdedc036933dbbf57c09608c4fdd65022a7019 100644 (file)
--- a/client.go
+++ b/client.go
@@ -113,11 +113,17 @@ type Client struct {
 
        upnpMappings []*upnpMapping
 
+       clientWebseedState
+
+       activePieceHashers int
+}
+
+type clientWebseedState struct {
        webseedRequestTimer   *time.Timer
        webseedUpdateReason   updateRequestReason
        activeWebseedRequests map[webseedUniqueRequestKey]*webseedRequest
-
-       activePieceHashers int
+       aprioriMap            map[webseedUniqueRequestKey]aprioriMapValue
+       heapSlice             []webseedRequestHeapElem
 }
 
 type ipStr string
index 81166b5ee9867b955c4d5a6d4577437f43cb7c42..4a17b79deb389b18f34b287d76dab0c17abe04f0 100644 (file)
@@ -10,6 +10,7 @@ import (
        "maps"
        "os"
        "runtime/pprof"
+       "slices"
        "strings"
        "sync"
        "time"
@@ -42,6 +43,13 @@ func (me webseedUrlKey) String() string {
        return me.Value()
 }
 
+type webseedRequestHeapElem struct {
+       webseedUniqueRequestKey
+       webseedRequestOrderValue
+       // Not sure this is even worth it now.
+       mightHavePartialFiles bool
+}
+
 /*
 - Go through all the requestable pieces in order of priority, availability, whether there are peer requests, partial, infohash.
 - For each piece calculate files involved. Record each file not seen before and the piece index.
@@ -49,8 +57,16 @@ func (me webseedUrlKey) String() string {
 - Initiate missing requests that fit into the available limits.
 */
 func (cl *Client) updateWebseedRequests() {
-       aprioriMap := make(map[webseedUniqueRequestKey]aprioriMapValue)
+       existingRequests := maps.Collect(cl.iterCurrentWebseedRequestsFromClient())
+       panicif.False(maps.Equal(existingRequests, maps.Collect(cl.iterCurrentWebseedRequests())))
+
+       g.MakeMapIfNil(&cl.aprioriMap)
+       aprioriMap := cl.aprioriMap
+       clear(aprioriMap)
        for uniqueKey, value := range cl.iterPossibleWebseedRequests() {
+               if g.MapContains(existingRequests, uniqueKey) {
+                       continue
+               }
                cur, ok := aprioriMap[uniqueKey]
                if ok {
                        // Shared in the lookup above.
@@ -72,34 +88,23 @@ func (cl *Client) updateWebseedRequests() {
                }
                aprioriMap[uniqueKey] = value
        }
-       // This includes startRequest in the key. This means multiple webseed requests can exist in the
-       // same webseed request slice.
-       existingRequests := make(map[webseedUniqueRequestKey]webseedRequestOrderValue)
-       existingRequestCount := 0
-       // TODO: Maintain a cache of active requests in the Client, and use them to filter proposed
-       // requests (same result but less allocations).
-       for key, value := range cl.iterCurrentWebseedRequests() {
-               existingRequests[key] = value
-               existingRequestCount++
-       }
-       // Check "active" current webseed request cardinality matches expectation.
-       panicif.NotEq(len(existingRequests), existingRequestCount)
-       // We don't need the value but maybe cloning is just faster anyway?
-       unusedExistingRequests := maps.Clone(existingRequests)
-       type heapElem struct {
-               webseedUniqueRequestKey
-               webseedRequestOrderValue
-               // Not sure this is even worth it now.
-               mightHavePartialFiles bool
-       }
+
        // Build the request heap, merging existing requests if they match.
-       heapSlice := make([]heapElem, 0, len(aprioriMap)+len(existingRequests))
+
+       heapSlice := cl.heapSlice[:0]
+       requiredCap := len(aprioriMap) + len(existingRequests)
+       if cap(heapSlice) < requiredCap {
+               heapSlice = slices.Grow(heapSlice, requiredCap-cap(heapSlice))
+       }
+       defer func() {
+               // Will this let GC collect values? If not do we need to clear? :(
+               cl.heapSlice = heapSlice[:0]
+       }()
+
        for key, value := range aprioriMap {
-               if g.MapContains(existingRequests, key) {
-                       // Prefer the existing request always
-                       continue
-               }
-               heapSlice = append(heapSlice, heapElem{
+               // Should be filtered earlier.
+               panicif.True(g.MapContains(existingRequests, key))
+               heapSlice = append(heapSlice, webseedRequestHeapElem{
                        key,
                        webseedRequestOrderValue{
                                aprioriMapValue: value,
@@ -111,13 +116,13 @@ func (cl *Client) updateWebseedRequests() {
                })
        }
        // Add remaining existing requests.
-       for key, value := range unusedExistingRequests {
+       for key, value := range existingRequests {
                // Don't reconsider existing requests that aren't wanted anymore.
                if key.t.dataDownloadDisallowed.IsSet() {
                        continue
                }
                wr := value.existingWebseedRequest
-               heapSlice = append(heapSlice, heapElem{
+               heapSlice = append(heapSlice, webseedRequestHeapElem{
                        key,
                        existingRequests[key],
                        key.t.filesInRequestRangeMightBePartial(wr.next, wr.end),
@@ -125,7 +130,7 @@ func (cl *Client) updateWebseedRequests() {
        }
        aprioriHeap := heap.InterfaceForSlice(
                &heapSlice,
-               func(l heapElem, r heapElem) bool {
+               func(l webseedRequestHeapElem, r webseedRequestHeapElem) bool {
                        // Not stable ordering but being sticky to existing webseeds should be enough.
                        ret := cmp.Or(
                                // Prefer highest priority
@@ -418,41 +423,68 @@ func (cl *Client) updateWebseedRequestsWithReason(reason updateRequestReason) {
        cl.scheduleImmediateWebseedRequestUpdate(reason)
 }
 
+// This has awful naming, I'm not quite sure what to call this.
+func (cl *Client) yieldKeyAndValue(
+       yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool,
+       key webseedUniqueRequestKey,
+       ar *webseedRequest,
+) bool {
+       t := key.t
+       url := key.url
+       hostKey := t.webSeeds[url].hostKey
+       if ar.next >= ar.end {
+               // This request is done, so don't yield it.
+               return true
+       }
+       // Don't spawn requests before old requests are cancelled.
+       if false {
+               if ar.cancelled.Load() {
+                       cl.slogger.Debug("iter current webseed requests: skipped cancelled webseed request")
+                       // This should prevent overlapping webseed requests that are just filling
+                       // slots waiting to cancel from conflicting.
+                       return true
+               }
+       }
+       p := t.piece(t.pieceIndexOfRequestIndex(ar.next))
+       return yield(
+               webseedUniqueRequestKey{
+                       t:          t,
+                       sliceIndex: t.requestIndexToWebseedSliceIndex(ar.next),
+                       url:        url,
+               },
+               webseedRequestOrderValue{
+                       aprioriMapValue{
+                               priority:     p.effectivePriority(),
+                               costKey:      hostKey,
+                               startRequest: ar.next,
+                       },
+                       ar,
+               },
+       )
+}
+
+func (cl *Client) iterCurrentWebseedRequestsFromClient() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
+       return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) {
+               for key, ar := range cl.activeWebseedRequests {
+                       if !cl.yieldKeyAndValue(yield, key, ar) {
+                               return
+                       }
+               }
+       }
+}
+
+// This exists to compare old behaviour with Client active requests.
 func (cl *Client) iterCurrentWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
        return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) {
-               // TODO: This entire thing can be a single map on Client ("active webseed requests").
                for t := range cl.torrents {
                        for url, ws := range t.webSeeds {
                                for ar := range ws.activeRequests {
-                                       if ar.next >= ar.end {
-                                               // This request is done, so don't yield it.
-                                               continue
-                                       }
-                                       // Don't spawn requests before old requests are cancelled.
-                                       if false {
-                                               if ar.cancelled.Load() {
-                                                       cl.slogger.Debug("iter current webseed requests: skipped cancelled webseed request")
-                                                       // This should prevent overlapping webseed requests that are just filling
-                                                       // slots waiting to cancel from conflicting.
-                                                       continue
-                                               }
+                                       key := webseedUniqueRequestKey{
+                                               t:          t,
+                                               sliceIndex: t.requestIndexToWebseedSliceIndex(ar.next),
+                                               url:        url,
                                        }
-                                       p := t.piece(t.pieceIndexOfRequestIndex(ar.next))
-                                       if !yield(
-                                               webseedUniqueRequestKey{
-                                                       t:          t,
-                                                       sliceIndex: t.requestIndexToWebseedSliceIndex(ar.next),
-                                                       url:        url,
-                                               },
-                                               webseedRequestOrderValue{
-                                                       aprioriMapValue{
-                                                               priority:     p.effectivePriority(),
-                                                               costKey:      ws.hostKey,
-                                                               startRequest: ar.next,
-                                                       },
-                                                       ar,
-                                               },
-                                       ) {
+                                       if !cl.yieldKeyAndValue(yield, key, ar) {
                                                return
                                        }
                                }