Sergey Matveev's repositories - btrtrc.git/commitdiff
Tons of webseed optimizations
authorMatt Joiner <anacrolix@gmail.com>
Fri, 1 Aug 2025 13:51:28 +0000 (23:51 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 1 Aug 2025 13:51:46 +0000 (23:51 +1000)
Avoid extra allocation in firstDirtyChunk
Allow multiple requests per file (now called slices or webseed chunks)
Reduce many allocations and extra work in webseed heap ordering
Add webseed request update pprof labels
Use a larger buffer for copying webseed response parts

client.go
math.go
piece.go
torrent.go
webseed-peer.go
webseed-requesting.go
webseed/client.go

index e4daa4609cfd589662c2f215f5737c178e959ab0..4c3c6dc276f2a5bb152709f57408a4fb7a19f7b2 100644 (file)
--- a/client.go
+++ b/client.go
@@ -21,6 +21,7 @@ import (
        "strconv"
        "time"
 
+       "github.com/RoaringBitmap/roaring"
        "github.com/anacrolix/chansync"
        "github.com/anacrolix/chansync/events"
        "github.com/anacrolix/dht/v2"
@@ -52,7 +53,7 @@ import (
        "github.com/anacrolix/torrent/webtorrent"
 )
 
-const webseedRequestUpdateTimerInterval = time.Second
+const webseedRequestUpdateTimerInterval = 5 * time.Second
 
 // Clients contain zero or more Torrents. A Client manages a blocklist, the
 // TCP/UDP protocol ports, and DHT as desired.
@@ -62,9 +63,11 @@ type Client struct {
        connStats AllConnStats
        counters  TorrentStatCounters
 
-       _mu    lockWithDeferreds
-       event  sync.Cond
-       closed chansync.SetOnce
+       _mu lockWithDeferreds
+       // Used in constrained situations when the lock is held.
+       roaringIntIterator roaring.IntIterator
+       event              sync.Cond
+       closed             chansync.SetOnce
 
        config  *ClientConfig
        logger  log.Logger
@@ -111,6 +114,7 @@ type Client struct {
        upnpMappings []*upnpMapping
 
        webseedRequestTimer *time.Timer
+       webseedUpdateReason updateRequestReason
 
        activePieceHashers int
 }
diff --git a/math.go b/math.go
index 0aefb4f7b64c682bfd86df2e152a2d149db2549a..0594fd29bae897c299b169721bfbacdf882e35d5 100644 (file)
--- a/math.go
+++ b/math.go
@@ -4,8 +4,13 @@ import (
        "golang.org/x/exp/constraints"
 )
 
+// a/b rounding up
 func intCeilDiv[T constraints.Unsigned](a, b T) T {
        // This still sux for negative numbers due to truncating division. But I don't know that we need
        // or that ceil division makes sense for negative numbers.
        return (a + b - 1) / b
 }
+
+func nextMultiple[T constraints.Integer](x, multiple T) T {
+       return x + multiple - x%multiple
+}
index c95c2fe4a877f0206c2b317a15ef31ba25ceeacf..9030d6a5650b136f654e7b32bf1714d6f0d8315a 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -108,7 +108,8 @@ func (p *Piece) numDirtyChunks() chunkIndexType {
        return chunkIndexType(roaringBitmapRangeCardinality[RequestIndex](
                &p.t.dirtyChunks,
                p.requestIndexBegin(),
-               p.requestIndexMaxEnd()))
+               p.t.pieceRequestIndexBegin(p.index+1),
+       ))
 }
 
 func (p *Piece) unpendChunkIndex(i chunkIndexType) {
@@ -156,9 +157,8 @@ func (p *Piece) chunkIndexDirty(chunk chunkIndexType) bool {
        return p.t.dirtyChunks.Contains(p.requestIndexBegin() + chunk)
 }
 
-func (p *Piece) iterCleanChunks() iter.Seq[chunkIndexType] {
+func (p *Piece) iterCleanChunks(it *roaring.IntIterator) iter.Seq[chunkIndexType] {
        return func(yield func(chunkIndexType) bool) {
-               var it roaring.IntIterator
                it.Initialize(&p.t.dirtyChunks.Bitmap)
                begin := uint32(p.requestIndexBegin())
                end := uint32(p.requestIndexMaxEnd())
@@ -175,7 +175,7 @@ func (p *Piece) iterCleanChunks() iter.Seq[chunkIndexType] {
 }
 
 func (p *Piece) firstCleanChunk() (_ g.Option[chunkIndexType]) {
-       for some := range p.iterCleanChunks() {
+       for some := range p.iterCleanChunks(&p.t.cl.roaringIntIterator) {
                return g.Some(some)
        }
        return
@@ -342,7 +342,12 @@ func (p *Piece) requestIndexBegin() RequestIndex {
 // The maximum end request index for the piece. Some of the requests might not be valid, it's for
 // cleaning up arrays and bitmaps in broad strokes.
 func (p *Piece) requestIndexMaxEnd() RequestIndex {
-       return p.t.pieceRequestIndexBegin(p.index + 1)
+       new := min(p.t.pieceRequestIndexBegin(p.index+1), p.t.maxEndRequest())
+       if false {
+               old := p.t.pieceRequestIndexBegin(p.index + 1)
+               panicif.NotEq(new, old)
+       }
+       return new
 }
 
 func (p *Piece) availability() int {
index 0af3a32f50561a90c3de2b5c9531a193afdd4e17..edd86654d48eec1685a5c46e2e3eb59fa5e67dbf 100644 (file)
@@ -866,6 +866,8 @@ func (t *Torrent) writeStatus(w io.Writer) {
                }
                fmt.Fprintln(w)
        }
+       // Note this might be shared with other torrents.
+       fmt.Fprintf(w, "Piece request order length: %v\n", t.getPieceRequestOrder().Len())
        fmt.Fprintf(w, "Piece length: %s\n",
                func() string {
                        if t.haveInfo() {
@@ -1161,7 +1163,7 @@ func (t *Torrent) bitfield() (bf []bool) {
 }
 
 func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
-       return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
+       return chunkIndexType(intCeilDiv(t.pieceLength(piece), t.chunkSize))
 }
 
 func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
@@ -3552,8 +3554,27 @@ func (t *Torrent) getFile(fileIndex int) *File {
 
 func (t *Torrent) fileMightBePartial(fileIndex int) bool {
        f := t.getFile(fileIndex)
-       beginPieceIndex := f.BeginPieceIndex()
-       endPieceIndex := f.EndPieceIndex()
+       return t.piecesMightBePartial(f.BeginPieceIndex(), f.EndPieceIndex())
+}
+
+func (t *Torrent) expandPieceRangeToFullFiles(beginPieceIndex, endPieceIndex pieceIndex) (expandedBegin, expandedEnd pieceIndex) {
+       // Expand the piece range to include all pieces of the files in the original range.
+       firstFile := t.getFile(t.piece(beginPieceIndex).beginFile)
+       lastFile := t.getFile(t.piece(endPieceIndex-1).endFile - 1)
+       expandedBegin = firstFile.BeginPieceIndex()
+       expandedEnd = lastFile.EndPieceIndex()
+       return
+}
+
+// Pieces in the range [begin, end) may have partially complete files. Note we only check for dirty chunks and either all or no pieces being complete.
+func (t *Torrent) filesInPieceRangeMightBePartial(begin, end pieceIndex) bool {
+       begin, end = t.expandPieceRangeToFullFiles(begin, end)
+       return t.piecesMightBePartial(begin, end)
+}
+
+// Pieces in the range [begin, end) are dirty, or in a mixed completion state.
+func (t *Torrent) piecesMightBePartial(beginPieceIndex, endPieceIndex int) bool {
+       // Check for dirtied chunks.
        if t.dirtyChunks.IntersectsWithInterval(
                uint64(t.pieceRequestIndexBegin(beginPieceIndex)),
                uint64(t.pieceRequestIndexBegin(endPieceIndex)),
@@ -3562,6 +3583,7 @@ func (t *Torrent) fileMightBePartial(fileIndex int) bool {
                // been started.
                return true
        }
+       // Check for mixed completion.
        var r roaring.Bitmap
        r.AddRange(uint64(beginPieceIndex), uint64(endPieceIndex))
        switch t._completedPieces.AndCardinality(&r) {
@@ -3619,3 +3641,9 @@ func (t *Torrent) incrementPiecesDirtiedStats(p *Piece, inc func(stats *ConnStat
        panicif.GreaterThan(len(distinctUpstreamConnStats), 6)
        maps.Keys(distinctUpstreamConnStats)(inc)
 }
+
+// Maximum end request index for the torrent (one past the last). There might be other requests that
+// don't make sense if padding files and v2 are in use.
+func (t *Torrent) maxEndRequest() RequestIndex {
+       return RequestIndex(intCeilDiv(uint64(t.length()), t.chunkSize.Uint64()))
+}
index 6026d9c7c3c92ac9a56061250f5bc88b7c6be043..3a407d5fdb07c6ea6098099007c4d0a1d59d3d6c 100644 (file)
@@ -53,8 +53,8 @@ func (me *webseedPeer) isLowOnRequests() bool {
 }
 
 // Webseed requests are issued globally so per-connection reasons or handling make no sense.
-func (me *webseedPeer) onNeedUpdateRequests(updateRequestReason) {
-       me.peer.cl.scheduleImmediateWebseedRequestUpdate()
+func (me *webseedPeer) onNeedUpdateRequests(reason updateRequestReason) {
+       me.peer.cl.scheduleImmediateWebseedRequestUpdate(reason)
 }
 
 func (me *webseedPeer) expectingChunks() bool {
@@ -214,10 +214,10 @@ func (ws *webseedPeer) runRequest(webseedRequest *webseedRequest) {
        locker.Lock()
        // Delete this entry after waiting above on an error, to prevent more requests.
        ws.deleteActiveRequest(webseedRequest)
-       if err != nil {
-               ws.peer.onNeedUpdateRequests("webseedPeer request errored")
+       cl := ws.peer.cl
+       if err == nil && cl.numWebSeedRequests[ws.hostKey] <= webseedHostRequestConcurrency/2 {
+               cl.updateWebseedRequestsWithReason("webseedPeer request completed")
        }
-       ws.peer.t.cl.updateWebseedRequestsWithReason("webseedPeer request completed")
        locker.Unlock()
 }
 
index c980e29681a87e68a8f853cd0959c8b631a844d9..b746850ba0a73de90e70c24068ed9b1b99fe7836 100644 (file)
@@ -2,14 +2,17 @@ package torrent
 
 import (
        "cmp"
+       "context"
        "fmt"
        "iter"
        "log/slog"
        "maps"
        "os"
+       "runtime/pprof"
        "strconv"
        "strings"
        "sync"
+       "time"
        "unique"
 
        g "github.com/anacrolix/generics"
@@ -18,8 +21,6 @@ import (
 
        "github.com/anacrolix/torrent/internal/request-strategy"
        "github.com/anacrolix/torrent/metainfo"
-       pp "github.com/anacrolix/torrent/peer_protocol"
-
        "github.com/anacrolix/torrent/webseed"
 )
 
@@ -44,9 +45,15 @@ type (
 - Initiate missing requests that fit into the available limits.
 */
 func (cl *Client) updateWebseedRequests() {
+       if webseed.PrintDebug {
+               started := time.Now()
+               defer func() {
+                       now := time.Now()
+                       fmt.Printf("%v: updateWebseedRequests took %v\n", time.Now(), now.Sub(started))
+               }()
+       }
        type aprioriMapValue struct {
-               // Change to request index?
-               startOffset int64
+               startIndex RequestIndex
                webseedRequestOrderValue
        }
        aprioriMap := make(map[aprioriWebseedRequestKey]aprioriMapValue)
@@ -55,8 +62,7 @@ func (cl *Client) updateWebseedRequests() {
                if ok {
                        // Shared in the lookup above.
                        t := uniqueKey.t
-                       hasPeerConnRequest := func(offset int64) bool {
-                               reqIndex := t.getRequestIndexContainingOffset(offset)
+                       hasPeerConnRequest := func(reqIndex RequestIndex) bool {
                                return t.requestingPeer(reqIndex) != nil
                        }
                        // Skip the webseed request unless it has a higher priority, is less requested by peer
@@ -64,13 +70,13 @@ func (cl *Client) updateWebseedRequests() {
                        // webseed requests in favour of peer conns unless there's nothing else to do.
                        if cmp.Or(
                                cmp.Compare(value.priority, cur.priority),
-                               compareBool(hasPeerConnRequest(cur.startOffset), hasPeerConnRequest(uniqueKey.startOffset)),
-                               cmp.Compare(cur.startOffset, uniqueKey.startOffset),
+                               compareBool(hasPeerConnRequest(cur.startIndex), hasPeerConnRequest(uniqueKey.startRequest)),
+                               cmp.Compare(cur.startIndex, uniqueKey.startRequest),
                        ) <= 0 {
                                continue
                        }
                }
-               aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startOffset, value}
+               aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startRequest, value}
        }
        existingRequests := maps.Collect(cl.iterCurrentWebseedRequests())
        // We don't need the value but maybe cloning is just faster anyway?
@@ -78,11 +84,12 @@ func (cl *Client) updateWebseedRequests() {
        type heapElem struct {
                webseedUniqueRequestKey
                webseedRequestOrderValue
+               mightHavePartialFiles bool
        }
        // Build the request heap, merging existing requests if they match.
        heapSlice := make([]heapElem, 0, len(aprioriMap)+len(existingRequests))
        for key, value := range aprioriMap {
-               fullKey := webseedUniqueRequestKey{key, value.startOffset}
+               fullKey := webseedUniqueRequestKey{key, value.startIndex}
                heapValue := value.webseedRequestOrderValue
                // If there's a matching existing request, make sure to include a reference to it in the
                // heap value and deduplicate it.
@@ -101,6 +108,7 @@ func (cl *Client) updateWebseedRequests() {
                heapSlice = append(heapSlice, heapElem{
                        fullKey,
                        heapValue,
+                       fullKey.mightHavePartialFiles(),
                })
        }
        // Add remaining existing requests.
@@ -109,11 +117,12 @@ func (cl *Client) updateWebseedRequests() {
                if key.t.dataDownloadDisallowed.IsSet() {
                        continue
                }
-               heapSlice = append(heapSlice, heapElem{key, existingRequests[key]})
+               heapSlice = append(heapSlice, heapElem{key, existingRequests[key], key.mightHavePartialFiles()})
        }
        aprioriHeap := heap.InterfaceForSlice(
                &heapSlice,
                func(l heapElem, r heapElem) bool {
+                       // Not stable ordering but being sticky to existing webseeds should be enough.
                        return cmp.Or(
                                // Prefer highest priority
                                -cmp.Compare(l.priority, r.priority),
@@ -121,13 +130,11 @@ func (cl *Client) updateWebseedRequests() {
                                compareBool(l.existingWebseedRequest == nil, r.existingWebseedRequest == nil),
                                // Prefer not competing with active peer connections.
                                compareBool(len(l.t.conns) > 0, len(r.t.conns) > 0),
-                               // Try to complete partial files first.
-                               -compareBool(l.t.fileMightBePartial(l.fileIndex), r.t.fileMightBePartial(r.fileIndex)),
-                               // Note this isn't correct if the starting piece is split across multiple files. But
-                               // I plan to refactor to key on starting piece to handle this case.
-                               -cmp.Compare(
-                                       l.t.Files()[l.fileIndex].length,
-                                       r.t.Files()[r.fileIndex].length),
+                               // Try to complete partial slices first.
+                               -compareBool(l.mightHavePartialFiles, r.mightHavePartialFiles),
+                               // No need to prefer longer files anymore now that we're using slices?
+                               //// Longer files first.
+                               //-cmp.Compare(l.longestFile().Unwrap(), r.longestFile().Unwrap()),
                        ) < 0
                },
        )
@@ -170,6 +177,8 @@ func (cl *Client) updateWebseedRequests() {
                }
        })
 
+       // TODO: Do we deduplicate requests across different webseeds?
+
        for costKey, requestKeys := range plan.byCost {
                for _, requestKey := range requestKeys {
                        // This could happen if a request is cancelled but hasn't removed itself from the active
@@ -184,14 +193,14 @@ func (cl *Client) updateWebseedRequests() {
                        peer := t.webSeeds[requestKey.url]
                        panicif.NotEq(peer.hostKey, costKey)
                        printPlan()
-                       begin := t.getRequestIndexContainingOffset(requestKey.startOffset)
-                       fileEnd := t.endRequestIndexForFileIndex(requestKey.fileIndex)
+                       begin := requestKey.startRequest
+                       chunkEnd := t.endRequestForAlignedWebseedResponse(requestKey.startRequest)
                        last := begin
                        for {
                                if !t.wantReceiveChunk(last) {
                                        break
                                }
-                               if last >= fileEnd-1 {
+                               if last >= chunkEnd-1 {
                                        break
                                }
                                last++
@@ -201,40 +210,45 @@ func (cl *Client) updateWebseedRequests() {
                                AddSource: true,
                        })).With(
                                "webseedUrl", requestKey.url,
-                               "fileIndex", requestKey.fileIndex)
+                               "webseedChunkIndex", requestKey.sliceIndex)
                        // Request shouldn't exist if this occurs.
                        panicif.LessThan(last, begin)
-                       // Hello C++ my old friend.
+                       // Hello darkness (C++) my old friend...
                        end := last + 1
-                       truncateEndToCacheBoundary(begin, &end, t.chunkSize)
-                       if webseed.PrintDebug && end != fileEnd {
+                       end = min(end, t.endRequestForAlignedWebseedResponse(begin))
+                       panicif.LessThanOrEqual(end, begin)
+                       if webseed.PrintDebug && end != chunkEnd {
                                debugLogger.Debug(
                                        "shortened webseed request",
-                                       "first-file", requestKey.filePath(),
-                                       "from", endExclusiveString(begin, fileEnd),
+                                       "request key", requestKey,
+                                       "from", endExclusiveString(begin, chunkEnd),
                                        "to", endExclusiveString(begin, end))
                        }
-                       panicif.GreaterThan(end, fileEnd)
+                       panicif.GreaterThan(end, chunkEnd)
                        peer.spawnRequest(begin, end, debugLogger)
                }
        }
 }
 
-// Limit a webseed request end request index so that the required response body size fits within
-// cache limits for a WebSeed provider.
-func truncateEndToCacheBoundary(start RequestIndex, end *RequestIndex, chunkSize pp.Integer) {
-       // Cloudflare caches up to 512 MB responses by default.
-       const cacheResponseBodyLimit = 256 << 20
-       chunksPerAlignedResponse := RequestIndex(cacheResponseBodyLimit / chunkSize)
-       startIndex := start / chunksPerAlignedResponse
-       *end = min(*end, (startIndex+1)*chunksPerAlignedResponse)
+// Cloudflare caches up to 512 MB responses by default. This is also an alignment.
+var webseedRequestChunkSize uint64 = 256 << 20
+
+func (t *Torrent) endRequestForAlignedWebseedResponse(start RequestIndex) RequestIndex {
+       end := min(t.maxEndRequest(), nextMultiple(start, t.chunksPerAlignedWebseedResponse()))
+       panicif.LessThanOrEqual(end, start)
+       return end
+}
+
+func (t *Torrent) chunksPerAlignedWebseedResponse() RequestIndex {
+       // This is the same as webseedRequestChunkSize, but in terms of RequestIndex.
+       return RequestIndex(webseedRequestChunkSize / t.chunkSize.Uint64())
 }
 
 func (cl *Client) dumpCurrentWebseedRequests() {
        if webseed.PrintDebug {
                fmt.Println("current webseed requests:")
                for key, value := range cl.iterCurrentWebseedRequests() {
-                       fmt.Printf("\t%v: %v, priority %v\n", key.filePath(), value.existingWebseedRequest, value.priority)
+                       fmt.Printf("\t%v: %v, priority %v\n", key, value.existingWebseedRequest, value.priority)
                }
        }
 }
@@ -256,27 +270,56 @@ func (me webseedRequestPlan) String() string {
 
 // Distinct webseed request data when different offsets are not allowed.
 type aprioriWebseedRequestKey struct {
-       t         *Torrent
-       fileIndex int
-       url       webseedUrlKey
-}
-
-func (me *aprioriWebseedRequestKey) filePath() string {
-       return me.t.Files()[me.fileIndex].Path()
+       t          *Torrent
+       sliceIndex RequestIndex
+       url        webseedUrlKey
 }
 
 func (me *aprioriWebseedRequestKey) String() string {
-       return fmt.Sprintf("%v from %v", me.filePath(), me.url)
+       return fmt.Sprintf("slice %v from %v", me.sliceIndex, me.url)
 }
 
 // Distinct webseed request when different offsets to the same object are allowed.
 type webseedUniqueRequestKey struct {
        aprioriWebseedRequestKey
-       startOffset int64
+       startRequest RequestIndex
+}
+
+func (me webseedUniqueRequestKey) endPieceIndex() pieceIndex {
+       return pieceIndex(intCeilDiv(
+               me.t.endRequestForAlignedWebseedResponse(me.startRequest),
+               me.t.chunksPerRegularPiece()))
+}
+
+func (me webseedUniqueRequestKey) mightHavePartialFiles() bool {
+       return me.t.filesInPieceRangeMightBePartial(
+               me.t.pieceIndexOfRequestIndex(me.startRequest),
+               me.endPieceIndex())
+}
+
+func (me webseedUniqueRequestKey) longestFile() (ret g.Option[int64]) {
+       t := me.t
+       firstPiece := t.pieceIndexOfRequestIndex(me.startRequest)
+       firstFileIndex := t.piece(firstPiece).beginFile
+       endFileIndex := t.piece(me.endPieceIndex() - 1).endFile
+       for fileIndex := firstFileIndex; fileIndex < endFileIndex; fileIndex++ {
+               fileLength := t.getFile(fileIndex).length
+               if ret.Ok {
+                       ret.Value = max(ret.Value, fileLength)
+               } else {
+                       ret.Set(fileLength)
+               }
+       }
+       return
 }
 
 func (me webseedUniqueRequestKey) String() string {
-       return me.aprioriWebseedRequestKey.String() + " at " + fmt.Sprintf("0x%x", me.startOffset)
+       return fmt.Sprintf(
+               "%v at %v:%v",
+               me.aprioriWebseedRequestKey,
+               me.sliceIndex,
+               me.startRequest%me.t.chunksPerAlignedWebseedResponse(),
+       )
 }
 
 // Non-distinct proposed webseed request data.
@@ -303,6 +346,9 @@ func (cl *Client) iterPossibleWebseedRequests() iter.Seq2[webseedUniqueRequestKe
                                value.pieces,
                                func(ih metainfo.Hash, pieceIndex int, orderState requestStrategy.PieceRequestOrderState) bool {
                                        t := cl.torrentsByShortHash[ih]
+                                       if len(t.webSeeds) == 0 {
+                                               return false
+                                       }
                                        p := t.piece(pieceIndex)
                                        cleanOpt := p.firstCleanChunk()
                                        if !cleanOpt.Ok {
@@ -313,25 +359,25 @@ func (cl *Client) iterPossibleWebseedRequests() iter.Seq2[webseedUniqueRequestKe
                                        // client piece request order and ignores other states like hashing, marking
                                        // etc. Order state priority would be faster otherwise.
                                        priority := p.effectivePriority()
-                                       for i, e := range p.fileExtents(int64(cleanOpt.Value) * int64(t.chunkSize)) {
-                                               for url, ws := range t.webSeeds {
-                                                       // Return value from this function (RequestPieceFunc) doesn't terminate
-                                                       // iteration, so propagate that to not handling the yield return value.
-                                                       yield(
-                                                               webseedUniqueRequestKey{
-                                                                       aprioriWebseedRequestKey{
-                                                                               t:         t,
-                                                                               fileIndex: i,
-                                                                               url:       url,
-                                                                       },
-                                                                       e.Start,
+                                       firstRequest := p.requestIndexBegin() + cleanOpt.Value
+                                       webseedSliceIndex := firstRequest / t.chunksPerAlignedWebseedResponse()
+                                       for url, ws := range t.webSeeds {
+                                               // Return value from this function (RequestPieceFunc) doesn't terminate
+                                               // iteration, so propagate that to not handling the yield return value.
+                                               yield(
+                                                       webseedUniqueRequestKey{
+                                                               aprioriWebseedRequestKey{
+                                                                       t:          t,
+                                                                       sliceIndex: webseedSliceIndex,
+                                                                       url:        url,
                                                                },
-                                                               webseedRequestOrderValue{
-                                                                       priority: priority,
-                                                                       costKey:  ws.hostKey,
-                                                               },
-                                                       )
-                                               }
+                                                               firstRequest,
+                                                       },
+                                                       webseedRequestOrderValue{
+                                                               priority: priority,
+                                                               costKey:  ws.hostKey,
+                                                       },
+                                               )
                                        }
                                        // Pieces iterated here are only to select webseed requests. There's no guarantee they're chosen.
                                        return false
@@ -344,7 +390,7 @@ func (cl *Client) iterPossibleWebseedRequests() iter.Seq2[webseedUniqueRequestKe
 
 func (cl *Client) updateWebseedRequestsWithReason(reason updateRequestReason) {
        // Should we wrap this with pprof labels?
-       cl.scheduleImmediateWebseedRequestUpdate()
+       cl.scheduleImmediateWebseedRequestUpdate(reason)
 }
 
 func (cl *Client) iterCurrentWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
@@ -356,20 +402,15 @@ func (cl *Client) iterCurrentWebseedRequests() iter.Seq2[webseedUniqueRequestKey
                                                // This request is done, so don't yield it.
                                                continue
                                        }
-                                       off := t.requestIndexBegin(ar.next)
-                                       opt := t.fileSegmentsIndex.Unwrap().LocateOffset(off)
-                                       if !opt.Ok {
-                                               continue
-                                       }
-                                       p := t.pieceForOffset(off)
+                                       p := t.piece(t.pieceIndexOfRequestIndex(ar.next))
                                        if !yield(
                                                webseedUniqueRequestKey{
                                                        aprioriWebseedRequestKey{
-                                                               t:         t,
-                                                               fileIndex: opt.Value.Index,
-                                                               url:       url,
+                                                               t:          t,
+                                                               sliceIndex: ar.next / t.chunksPerAlignedWebseedResponse(),
+                                                               url:        url,
                                                        },
-                                                       opt.Value.Offset,
+                                                       ar.next,
                                                },
                                                webseedRequestOrderValue{
                                                        priority:               p.effectivePriority(),
@@ -385,7 +426,7 @@ func (cl *Client) iterCurrentWebseedRequests() iter.Seq2[webseedUniqueRequestKey
        }
 }
 
-func (cl *Client) scheduleImmediateWebseedRequestUpdate() {
+func (cl *Client) scheduleImmediateWebseedRequestUpdate(reason updateRequestReason) {
        if !cl.webseedRequestTimer.Stop() {
                // Timer function already running, let it do its thing.
                return
@@ -394,6 +435,7 @@ func (cl *Client) scheduleImmediateWebseedRequestUpdate() {
        // update on every call to this method). Since we're holding the Client lock, and we cancelled
        // the timer, and it wasn't active, nobody else should have reset it before us. Do we need to
        // introduce a "reason" field here, (albeit Client-level?).
+       cl.webseedUpdateReason = cmp.Or(cl.webseedUpdateReason, reason)
        panicif.True(cl.webseedRequestTimer.Reset(0))
 }
 
@@ -401,14 +443,22 @@ func (cl *Client) updateWebseedRequestsTimerFunc() {
        if cl.closed.IsSet() {
                return
        }
+       // This won't get set elsewhere if the timer has fired, which it has for us to be here.
+       cl.webseedUpdateReason = cmp.Or(cl.webseedUpdateReason, "timer")
        cl.lock()
        defer cl.unlock()
        cl.updateWebseedRequestsAndResetTimer()
 }
 
 func (cl *Client) updateWebseedRequestsAndResetTimer() {
-       cl.updateWebseedRequests()
-       // Timer should always be stopped before the last call.
+       pprof.Do(context.Background(), pprof.Labels(
+               "reason", string(cl.webseedUpdateReason),
+       ), func(_ context.Context) {
+               cl.updateWebseedRequests()
+               cl.webseedUpdateReason = ""
+       })
+       // Timer should always be stopped before the last call. TODO: Don't reset timer if there's
+       // nothing to do (no possible requests in update).
        panicif.True(cl.webseedRequestTimer.Reset(webseedRequestUpdateTimerInterval))
 
 }
index cd49934e4b2d278864d102c8d6c7a6d83ccbe08f..46488d3d3fec1a4cc4593d3b3e050e004bf1c8bd 100644 (file)
@@ -10,8 +10,10 @@ import (
        "net/http"
        "os"
        "strings"
+       "sync"
 
        "github.com/RoaringBitmap/roaring"
+       g "github.com/anacrolix/generics"
        "github.com/anacrolix/missinggo/v2/panicif"
        "github.com/dustin/go-humanize"
        "golang.org/x/time/rate"
@@ -175,6 +177,10 @@ func (me *Client) checkContentLength(resp *http.Response, part requestPart, expe
        }
 }
 
+var bufPool = &sync.Pool{New: func() any {
+       return g.PtrTo(make([]byte, 128<<10)) // 128 KiB. 4x the default.
+}}
+
 // Reads the part in full. All expected bytes must be returned or there will an error returned.
 func (me *Client) recvPartResult(ctx context.Context, w io.Writer, part requestPart, resp *http.Response) error {
        defer resp.Body.Close()
@@ -191,7 +197,9 @@ func (me *Client) recvPartResult(ctx context.Context, w io.Writer, part requestP
        case http.StatusPartialContent:
                // The response should be just as long as we requested.
                me.checkContentLength(resp, part, part.e.Length)
-               copied, err := io.Copy(w, body)
+               buf := bufPool.Get().(*[]byte)
+               defer bufPool.Put(buf)
+               copied, err := io.CopyBuffer(w, body, *buf)
                if err != nil {
                        return err
                }