From: Matt Joiner Date: Fri, 22 Aug 2025 04:51:51 +0000 (+1000) Subject: Compare Client and per-Torrent active webseed maps and reuse large allocations X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=daf1bae5ed80962f40ffd70a797257fc0e0de2fc;p=btrtrc.git Compare Client and per-Torrent active webseed maps and reuse large allocations --- diff --git a/client.go b/client.go index 99494b2c..11cdedc0 100644 --- a/client.go +++ b/client.go @@ -113,11 +113,17 @@ type Client struct { upnpMappings []*upnpMapping + clientWebseedState + + activePieceHashers int +} + +type clientWebseedState struct { webseedRequestTimer *time.Timer webseedUpdateReason updateRequestReason activeWebseedRequests map[webseedUniqueRequestKey]*webseedRequest - - activePieceHashers int + aprioriMap map[webseedUniqueRequestKey]aprioriMapValue + heapSlice []webseedRequestHeapElem } type ipStr string diff --git a/webseed-requesting.go b/webseed-requesting.go index 81166b5e..4a17b79d 100644 --- a/webseed-requesting.go +++ b/webseed-requesting.go @@ -10,6 +10,7 @@ import ( "maps" "os" "runtime/pprof" + "slices" "strings" "sync" "time" @@ -42,6 +43,13 @@ func (me webseedUrlKey) String() string { return me.Value() } +type webseedRequestHeapElem struct { + webseedUniqueRequestKey + webseedRequestOrderValue + // Not sure this is even worth it now. + mightHavePartialFiles bool +} + /* - Go through all the requestable pieces in order of priority, availability, whether there are peer requests, partial, infohash. - For each piece calculate files involved. Record each file not seen before and the piece index. @@ -49,8 +57,16 @@ func (me webseedUrlKey) String() string { - Initiate missing requests that fit into the available limits. */ func (cl *Client) updateWebseedRequests() { - aprioriMap := make(map[webseedUniqueRequestKey]aprioriMapValue) + existingRequests := maps.Collect(cl.iterCurrentWebseedRequestsFromClient()) + panicif.False(maps.Equal(existingRequests, maps.Collect(cl.iterCurrentWebseedRequests()))) + + g.MakeMapIfNil(&cl.aprioriMap) + aprioriMap := cl.aprioriMap + clear(aprioriMap) for uniqueKey, value := range cl.iterPossibleWebseedRequests() { + if g.MapContains(existingRequests, uniqueKey) { + continue + } cur, ok := aprioriMap[uniqueKey] if ok { // Shared in the lookup above. @@ -72,34 +88,23 @@ func (cl *Client) updateWebseedRequests() { } aprioriMap[uniqueKey] = value } - // This includes startRequest in the key. This means multiple webseed requests can exist in the - // same webseed request slice. - existingRequests := make(map[webseedUniqueRequestKey]webseedRequestOrderValue) - existingRequestCount := 0 - // TODO: Maintain a cache of active requests in the Client, and use them to filter proposed - // requests (same result but less allocations). - for key, value := range cl.iterCurrentWebseedRequests() { - existingRequests[key] = value - existingRequestCount++ - } - // Check "active" current webseed request cardinality matches expectation. - panicif.NotEq(len(existingRequests), existingRequestCount) - // We don't need the value but maybe cloning is just faster anyway? - unusedExistingRequests := maps.Clone(existingRequests) - type heapElem struct { - webseedUniqueRequestKey - webseedRequestOrderValue - // Not sure this is even worth it now. - mightHavePartialFiles bool - } + // Build the request heap, merging existing requests if they match. - heapSlice := make([]heapElem, 0, len(aprioriMap)+len(existingRequests)) + + heapSlice := cl.heapSlice[:0] + requiredCap := len(aprioriMap) + len(existingRequests) + if cap(heapSlice) < requiredCap { + heapSlice = slices.Grow(heapSlice, requiredCap-cap(heapSlice)) + } + defer func() { + // Will this let GC collect values? If not do we need to clear? :( + cl.heapSlice = heapSlice[:0] + }() + for key, value := range aprioriMap { - if g.MapContains(existingRequests, key) { - // Prefer the existing request always - continue - } - heapSlice = append(heapSlice, heapElem{ + // Should be filtered earlier. + panicif.True(g.MapContains(existingRequests, key)) + heapSlice = append(heapSlice, webseedRequestHeapElem{ key, webseedRequestOrderValue{ aprioriMapValue: value, @@ -111,13 +116,13 @@ func (cl *Client) updateWebseedRequests() { }) } // Add remaining existing requests. - for key, value := range unusedExistingRequests { + for key, value := range existingRequests { // Don't reconsider existing requests that aren't wanted anymore. if key.t.dataDownloadDisallowed.IsSet() { continue } wr := value.existingWebseedRequest - heapSlice = append(heapSlice, heapElem{ + heapSlice = append(heapSlice, webseedRequestHeapElem{ key, existingRequests[key], key.t.filesInRequestRangeMightBePartial(wr.next, wr.end), @@ -125,7 +130,7 @@ func (cl *Client) updateWebseedRequests() { } aprioriHeap := heap.InterfaceForSlice( &heapSlice, - func(l heapElem, r heapElem) bool { + func(l webseedRequestHeapElem, r webseedRequestHeapElem) bool { // Not stable ordering but being sticky to existing webseeds should be enough. ret := cmp.Or( // Prefer highest priority @@ -418,41 +423,68 @@ func (cl *Client) updateWebseedRequestsWithReason(reason updateRequestReason) { cl.scheduleImmediateWebseedRequestUpdate(reason) } +// This has awful naming, I'm not quite sure what to call this. +func (cl *Client) yieldKeyAndValue( + yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool, + key webseedUniqueRequestKey, + ar *webseedRequest, +) bool { + t := key.t + url := key.url + hostKey := t.webSeeds[url].hostKey + if ar.next >= ar.end { + // This request is done, so don't yield it. + return true + } + // Don't spawn requests before old requests are cancelled. + if false { + if ar.cancelled.Load() { + cl.slogger.Debug("iter current webseed requests: skipped cancelled webseed request") + // This should prevent overlapping webseed requests that are just filling + // slots waiting to cancel from conflicting. + return true + } + } + p := t.piece(t.pieceIndexOfRequestIndex(ar.next)) + return yield( + webseedUniqueRequestKey{ + t: t, + sliceIndex: t.requestIndexToWebseedSliceIndex(ar.next), + url: url, + }, + webseedRequestOrderValue{ + aprioriMapValue{ + priority: p.effectivePriority(), + costKey: hostKey, + startRequest: ar.next, + }, + ar, + }, + ) +} + +func (cl *Client) iterCurrentWebseedRequestsFromClient() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] { + return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) { + for key, ar := range cl.activeWebseedRequests { + if !cl.yieldKeyAndValue(yield, key, ar) { + return + } + } + } +} + +// This exists to compare old behaviour with Client active requests. func (cl *Client) iterCurrentWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] { return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) { - // TODO: This entire thing can be a single map on Client ("active webseed requests"). for t := range cl.torrents { for url, ws := range t.webSeeds { for ar := range ws.activeRequests { - if ar.next >= ar.end { - // This request is done, so don't yield it. - continue - } - // Don't spawn requests before old requests are cancelled. - if false { - if ar.cancelled.Load() { - cl.slogger.Debug("iter current webseed requests: skipped cancelled webseed request") - // This should prevent overlapping webseed requests that are just filling - // slots waiting to cancel from conflicting. - continue - } + key := webseedUniqueRequestKey{ + t: t, + sliceIndex: t.requestIndexToWebseedSliceIndex(ar.next), + url: url, } - p := t.piece(t.pieceIndexOfRequestIndex(ar.next)) - if !yield( - webseedUniqueRequestKey{ - t: t, - sliceIndex: t.requestIndexToWebseedSliceIndex(ar.next), - url: url, - }, - webseedRequestOrderValue{ - aprioriMapValue{ - priority: p.effectivePriority(), - costKey: ws.hostKey, - startRequest: ar.next, - }, - ar, - }, - ) { + if !cl.yieldKeyAndValue(yield, key, ar) { return } }