From: Matt Joiner
Date: Fri, 1 Aug 2025 13:51:28 +0000 (+1000)
Subject: Tons of webseed optimizations
X-Git-Tag: v1.59.0~2^2~71
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=59b411afcd4b50a6f999128f228d3c31c49db112;p=btrtrc.git

Tons of webseed optimizations

Avoid extra allocation in firstDirtyChunk
Allow multiple requests per file (now called slices or webseed chunks)
Reduce many allocations and extra work in webseed heap ordering
Add webseed request update pprof labels
Use a larger buffer for copying webseed response parts
---
diff --git a/client.go b/client.go
index e4daa460..4c3c6dc2 100644
--- a/client.go
+++ b/client.go
@@ -21,6 +21,7 @@ import (
     "strconv"
     "time"
 
+    "github.com/RoaringBitmap/roaring"
     "github.com/anacrolix/chansync"
     "github.com/anacrolix/chansync/events"
     "github.com/anacrolix/dht/v2"
@@ -52,7 +53,7 @@ import (
     "github.com/anacrolix/torrent/webtorrent"
 )
 
-const webseedRequestUpdateTimerInterval = time.Second
+const webseedRequestUpdateTimerInterval = 5 * time.Second
 
 // Clients contain zero or more Torrents. A Client manages a blocklist, the
 // TCP/UDP protocol ports, and DHT as desired.
@@ -62,9 +63,11 @@ type Client struct {
     connStats AllConnStats
     counters  TorrentStatCounters
 
-    _mu    lockWithDeferreds
-    event  sync.Cond
-    closed chansync.SetOnce
+    _mu lockWithDeferreds
+    // Used in constrained situations when the lock is held.
+    roaringIntIterator roaring.IntIterator
+    event              sync.Cond
+    closed             chansync.SetOnce
 
     config *ClientConfig
     logger log.Logger
@@ -111,6 +114,7 @@ type Client struct {
     upnpMappings []*upnpMapping
 
     webseedRequestTimer *time.Timer
+    webseedUpdateReason updateRequestReason
 
     activePieceHashers int
 }
diff --git a/math.go b/math.go
index 0aefb4f7..0594fd29 100644
--- a/math.go
+++ b/math.go
@@ -4,8 +4,13 @@ import (
     "golang.org/x/exp/constraints"
 )
 
+// a/b rounding up
 func intCeilDiv[T constraints.Unsigned](a, b T) T {
     // This still sux for negative numbers due to truncating division. But I don't know that we need
     // or that ceil division makes sense for negative numbers.
     return (a + b - 1) / b
 }
+
+func nextMultiple[T constraints.Integer](x, multiple T) T {
+    return x + multiple - x%multiple
+}
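For illustration, a non-generic sketch of how the two math.go helpers behave (uint64 stand-ins for the generic functions above; not part of the patch):

package main

import "fmt"

// Standalone mirrors of the two helpers, for illustration only.
func intCeilDiv(a, b uint64) uint64 { return (a + b - 1) / b }

// Note this returns x + multiple when x is already a multiple, which is what
// the webseed slice-end computation wants: a start exactly on a boundary still
// gets a whole slice rather than an empty one.
func nextMultiple(x, multiple uint64) uint64 { return x + multiple - x%multiple }

func main() {
	fmt.Println(intCeilDiv(10, 4))   // 3: ten units need three chunks of four
	fmt.Println(nextMultiple(10, 4)) // 12
	fmt.Println(nextMultiple(12, 4)) // 16, not 12
}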
diff --git a/piece.go b/piece.go
index c95c2fe4..9030d6a5 100644
--- a/piece.go
+++ b/piece.go
@@ -108,7 +108,8 @@ func (p *Piece) numDirtyChunks() chunkIndexType {
     return chunkIndexType(roaringBitmapRangeCardinality[RequestIndex](
         &p.t.dirtyChunks,
         p.requestIndexBegin(),
-        p.requestIndexMaxEnd()))
+        p.t.pieceRequestIndexBegin(p.index+1),
+    ))
 }
 
 func (p *Piece) unpendChunkIndex(i chunkIndexType) {
@@ -156,9 +157,8 @@ func (p *Piece) chunkIndexDirty(chunk chunkIndexType) bool {
     return p.t.dirtyChunks.Contains(p.requestIndexBegin() + chunk)
 }
 
-func (p *Piece) iterCleanChunks() iter.Seq[chunkIndexType] {
+func (p *Piece) iterCleanChunks(it *roaring.IntIterator) iter.Seq[chunkIndexType] {
     return func(yield func(chunkIndexType) bool) {
-        var it roaring.IntIterator
         it.Initialize(&p.t.dirtyChunks.Bitmap)
         begin := uint32(p.requestIndexBegin())
         end := uint32(p.requestIndexMaxEnd())
@@ -175,7 +175,7 @@
 }
 
 func (p *Piece) firstCleanChunk() (_ g.Option[chunkIndexType]) {
-    for some := range p.iterCleanChunks() {
+    for some := range p.iterCleanChunks(&p.t.cl.roaringIntIterator) {
         return g.Some(some)
     }
     return
@@ -342,7 +342,12 @@ func (p *Piece) requestIndexBegin() RequestIndex {
 // The maximum end request index for the piece. Some of the requests might not be valid, it's for
 // cleaning up arrays and bitmaps in broad strokes.
 func (p *Piece) requestIndexMaxEnd() RequestIndex {
-    return p.t.pieceRequestIndexBegin(p.index + 1)
+    new := min(p.t.pieceRequestIndexBegin(p.index+1), p.t.maxEndRequest())
+    if false {
+        old := p.t.pieceRequestIndexBegin(p.index + 1)
+        panicif.NotEq(new, old)
+    }
+    return new
 }
 
 func (p *Piece) availability() int {
diff --git a/torrent.go b/torrent.go
index 0af3a32f..edd86654 100644
--- a/torrent.go
+++ b/torrent.go
@@ -866,6 +866,8 @@ func (t *Torrent) writeStatus(w io.Writer) {
         }
         fmt.Fprintln(w)
     }
+    // Note this might be shared with other torrents.
+    fmt.Fprintf(w, "Piece request order length: %v\n", t.getPieceRequestOrder().Len())
 
     fmt.Fprintf(w, "Piece length: %s\n", func() string {
         if t.haveInfo() {
@@ -1161,7 +1163,7 @@ func (t *Torrent) bitfield() (bf []bool) {
 }
 
 func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
-    return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
+    return chunkIndexType(intCeilDiv(t.pieceLength(piece), t.chunkSize))
 }
 
 func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
@@ -3552,8 +3554,27 @@ func (t *Torrent) getFile(fileIndex int) *File {
 
 func (t *Torrent) fileMightBePartial(fileIndex int) bool {
     f := t.getFile(fileIndex)
-    beginPieceIndex := f.BeginPieceIndex()
-    endPieceIndex := f.EndPieceIndex()
+    return t.piecesMightBePartial(f.BeginPieceIndex(), f.EndPieceIndex())
+}
+
+func (t *Torrent) expandPieceRangeToFullFiles(beginPieceIndex, endPieceIndex pieceIndex) (expandedBegin, expandedEnd pieceIndex) {
+    // Expand the piece range to include all pieces of the files in the original range.
+    firstFile := t.getFile(t.piece(beginPieceIndex).beginFile)
+    lastFile := t.getFile(t.piece(endPieceIndex-1).endFile - 1)
+    expandedBegin = firstFile.BeginPieceIndex()
+    expandedEnd = lastFile.EndPieceIndex()
+    return
+}
+
+// Pieces in the range [begin, end) may have partially complete files. Note we only check for dirty chunks and either all or no pieces being complete.
+func (t *Torrent) filesInPieceRangeMightBePartial(begin, end pieceIndex) bool {
+    begin, end = t.expandPieceRangeToFullFiles(begin, end)
+    return t.piecesMightBePartial(begin, end)
+}
+
+// Pieces in the range [begin, end) are dirty, or in a mixed completion state.
+func (t *Torrent) piecesMightBePartial(beginPieceIndex, endPieceIndex int) bool {
+    // Check for dirtied chunks.
     if t.dirtyChunks.IntersectsWithInterval(
         uint64(t.pieceRequestIndexBegin(beginPieceIndex)),
         uint64(t.pieceRequestIndexBegin(endPieceIndex)),
@@ -3562,6 +3583,7 @@ func (t *Torrent) fileMightBePartial(fileIndex int) bool {
         // been started.
         return true
     }
+    // Check for mixed completion.
     var r roaring.Bitmap
     r.AddRange(uint64(beginPieceIndex), uint64(endPieceIndex))
     switch t._completedPieces.AndCardinality(&r) {
@@ -3619,3 +3641,9 @@ func (t *Torrent) incrementPiecesDirtiedStats(p *Piece, inc func(stats *ConnStat
     panicif.GreaterThan(len(distinctUpstreamConnStats), 6)
     maps.Keys(distinctUpstreamConnStats)(inc)
 }
+
+// Maximum end request index for the torrent (one past the last). There might be other requests that
+// don't make sense if padding files and v2 are in use.
+func (t *Torrent) maxEndRequest() RequestIndex {
+    return RequestIndex(intCeilDiv(uint64(t.length()), t.chunkSize.Uint64()))
+}
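A standalone sketch of the mixed-completion test behind piecesMightBePartial, using the same roaring calls; the helper name and values here are illustrative only, and the earlier dirty-chunk check is omitted:

package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring"
)

// A half-open piece range is "possibly partial" unless all of its pieces are
// complete or none are.
func mixedCompletion(completed *roaring.Bitmap, begin, end uint64) bool {
	var r roaring.Bitmap
	r.AddRange(begin, end) // pieces [begin, end)
	switch completed.AndCardinality(&r) {
	case 0, end - begin:
		return false // none or all complete: not mixed
	default:
		return true
	}
}

func main() {
	completed := roaring.New()
	completed.AddRange(0, 3)                      // pieces 0, 1, 2 complete
	fmt.Println(mixedCompletion(completed, 0, 3)) // false: all complete
	fmt.Println(mixedCompletion(completed, 2, 6)) // true: mixed
	fmt.Println(mixedCompletion(completed, 4, 6)) // false: none complete
}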
diff --git a/webseed-peer.go b/webseed-peer.go
index 6026d9c7..3a407d5f 100644
--- a/webseed-peer.go
+++ b/webseed-peer.go
@@ -53,8 +53,8 @@ func (me *webseedPeer) isLowOnRequests() bool {
 }
 
 // Webseed requests are issued globally so per-connection reasons or handling make no sense.
-func (me *webseedPeer) onNeedUpdateRequests(updateRequestReason) {
-    me.peer.cl.scheduleImmediateWebseedRequestUpdate()
+func (me *webseedPeer) onNeedUpdateRequests(reason updateRequestReason) {
+    me.peer.cl.scheduleImmediateWebseedRequestUpdate(reason)
 }
 
 func (me *webseedPeer) expectingChunks() bool {
@@ -214,10 +214,10 @@ func (ws *webseedPeer) runRequest(webseedRequest *webseedRequest) {
     locker.Lock()
     // Delete this entry after waiting above on an error, to prevent more requests.
     ws.deleteActiveRequest(webseedRequest)
-    if err != nil {
-        ws.peer.onNeedUpdateRequests("webseedPeer request errored")
+    cl := ws.peer.cl
+    if err == nil && cl.numWebSeedRequests[ws.hostKey] <= webseedHostRequestConcurrency/2 {
+        cl.updateWebseedRequestsWithReason("webseedPeer request completed")
     }
-    ws.peer.t.cl.updateWebseedRequestsWithReason("webseedPeer request completed")
     locker.Unlock()
 }
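A minimal sketch of the new completion trigger in runRequest; webseedHostRequestConcurrency is given an assumed value here purely for illustration:

package main

import "fmt"

// Assumed value, for illustration only; the real constant lives elsewhere in the package.
const webseedHostRequestConcurrency = 10

// Mirrors the new condition: schedule a global update only on clean completion,
// and only once the host has dropped to half (or less) of its allowed concurrency,
// rather than on every error.
func shouldScheduleUpdate(err error, activeForHost int) bool {
	return err == nil && activeForHost <= webseedHostRequestConcurrency/2
}

func main() {
	fmt.Println(shouldScheduleUpdate(nil, 4)) // true
	fmt.Println(shouldScheduleUpdate(nil, 9)) // false: still plenty in flight
}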
diff --git a/webseed-requesting.go b/webseed-requesting.go
index c980e296..b746850b 100644
--- a/webseed-requesting.go
+++ b/webseed-requesting.go
@@ -2,14 +2,17 @@ package torrent
 
 import (
     "cmp"
+    "context"
     "fmt"
     "iter"
     "log/slog"
     "maps"
     "os"
+    "runtime/pprof"
     "strconv"
     "strings"
     "sync"
+    "time"
     "unique"
 
     g "github.com/anacrolix/generics"
@@ -18,8 +21,6 @@ import (
     "github.com/anacrolix/torrent/internal/request-strategy"
     "github.com/anacrolix/torrent/metainfo"
-    pp "github.com/anacrolix/torrent/peer_protocol"
-
     "github.com/anacrolix/torrent/webseed"
 )
@@ -44,9 +45,15 @@ type (
 - Initiate missing requests that fit into the available limits.
 */
 func (cl *Client) updateWebseedRequests() {
+    if webseed.PrintDebug {
+        started := time.Now()
+        defer func() {
+            now := time.Now()
+            fmt.Printf("%v: updateWebseedRequests took %v\n", time.Now(), now.Sub(started))
+        }()
+    }
     type aprioriMapValue struct {
-        // Change to request index?
-        startOffset int64
+        startIndex RequestIndex
         webseedRequestOrderValue
     }
     aprioriMap := make(map[aprioriWebseedRequestKey]aprioriMapValue)
@@ -55,8 +62,7 @@ func (cl *Client) updateWebseedRequests() {
         if ok {
             // Shared in the lookup above.
             t := uniqueKey.t
-            hasPeerConnRequest := func(offset int64) bool {
-                reqIndex := t.getRequestIndexContainingOffset(offset)
+            hasPeerConnRequest := func(reqIndex RequestIndex) bool {
                 return t.requestingPeer(reqIndex) != nil
             }
             // Skip the webseed request unless it has a higher priority, is less requested by
             // peer conns, or starts earlier. This deprioritizes webseed requests in favour of
             // peer conns unless there's nothing else to do.
             if cmp.Or(
                 cmp.Compare(value.priority, cur.priority),
-                compareBool(hasPeerConnRequest(cur.startOffset), hasPeerConnRequest(uniqueKey.startOffset)),
-                cmp.Compare(cur.startOffset, uniqueKey.startOffset),
+                compareBool(hasPeerConnRequest(cur.startIndex), hasPeerConnRequest(uniqueKey.startRequest)),
+                cmp.Compare(cur.startIndex, uniqueKey.startRequest),
             ) <= 0 {
                 continue
             }
         }
-        aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startOffset, value}
+        aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startRequest, value}
     }
     existingRequests := maps.Collect(cl.iterCurrentWebseedRequests())
     // We don't need the value but maybe cloning is just faster anyway?
@@ -78,11 +84,12 @@
     type heapElem struct {
         webseedUniqueRequestKey
        webseedRequestOrderValue
+        mightHavePartialFiles bool
     }
     // Build the request heap, merging existing requests if they match.
     heapSlice := make([]heapElem, 0, len(aprioriMap)+len(existingRequests))
     for key, value := range aprioriMap {
-        fullKey := webseedUniqueRequestKey{key, value.startOffset}
+        fullKey := webseedUniqueRequestKey{key, value.startIndex}
         heapValue := value.webseedRequestOrderValue
         // If there's a matching existing request, make sure to include a reference to it in the
         // heap value and deduplicate it.
@@ -101,6 +108,7 @@
         heapSlice = append(heapSlice, heapElem{
             fullKey,
             heapValue,
+            fullKey.mightHavePartialFiles(),
         })
     }
     // Add remaining existing requests.
@@ -109,11 +117,12 @@
         if key.t.dataDownloadDisallowed.IsSet() {
             continue
         }
-        heapSlice = append(heapSlice, heapElem{key, existingRequests[key]})
+        heapSlice = append(heapSlice, heapElem{key, existingRequests[key], key.mightHavePartialFiles()})
     }
     aprioriHeap := heap.InterfaceForSlice(
         &heapSlice,
         func(l heapElem, r heapElem) bool {
+            // Not stable ordering but being sticky to existing webseeds should be enough.
             return cmp.Or(
                 // Prefer highest priority
                 -cmp.Compare(l.priority, r.priority),
@@ -121,13 +130,11 @@
                 compareBool(l.existingWebseedRequest == nil, r.existingWebseedRequest == nil),
                 // Prefer not competing with active peer connections.
                 compareBool(len(l.t.conns) > 0, len(r.t.conns) > 0),
-                // Try to complete partial files first.
-                -compareBool(l.t.fileMightBePartial(l.fileIndex), r.t.fileMightBePartial(r.fileIndex)),
-                // Note this isn't correct if the starting piece is split across multiple files. But
-                // I plan to refactor to key on starting piece to handle this case.
-                -cmp.Compare(
-                    l.t.Files()[l.fileIndex].length,
-                    r.t.Files()[r.fileIndex].length),
+                // Try to complete partial slices first.
+                -compareBool(l.mightHavePartialFiles, r.mightHavePartialFiles),
+                // No need to prefer longer files anymore now that we're using slices?
+                //// Longer files first.
+                //-cmp.Compare(l.longestFile().Unwrap(), r.longestFile().Unwrap()),
             ) < 0
         },
     )
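The heap ordering above leans on cmp.Or from the standard library: the first non-zero comparison wins, so later terms only break ties. A small standalone illustration (compareBool is a local stand-in for the package helper of the same name):

package main

import (
	"cmp"
	"fmt"
)

// compareBool: false sorts before true, returning -1, 0, or 1 like cmp.Compare.
func compareBool(l, r bool) int {
	switch {
	case l == r:
		return 0
	case r:
		return -1
	default:
		return 1
	}
}

func main() {
	// Earlier terms dominate, mirroring the ordering above: priority first,
	// then stickiness to an existing request, then peer-conn competition.
	less := cmp.Or(
		-cmp.Compare(2, 2),       // equal priority: falls through
		compareBool(false, true), // left already has a request: left wins
		compareBool(true, false), // never consulted once a tie-breaker fires
	) < 0
	fmt.Println(less) // true
}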
@@ -170,6 +177,8 @@
     }
     })
 
+    // TODO: Do we deduplicate requests across different webseeds?
+
     for costKey, requestKeys := range plan.byCost {
         for _, requestKey := range requestKeys {
             // This could happen if a request is cancelled but hasn't removed itself from the active
@@ -184,14 +193,14 @@
             peer := t.webSeeds[requestKey.url]
             panicif.NotEq(peer.hostKey, costKey)
             printPlan()
-            begin := t.getRequestIndexContainingOffset(requestKey.startOffset)
-            fileEnd := t.endRequestIndexForFileIndex(requestKey.fileIndex)
+            begin := requestKey.startRequest
+            chunkEnd := t.endRequestForAlignedWebseedResponse(requestKey.startRequest)
             last := begin
             for {
                 if !t.wantReceiveChunk(last) {
                     break
                 }
-                if last >= fileEnd-1 {
+                if last >= chunkEnd-1 {
                     break
                 }
                 last++
@@ -201,40 +210,45 @@
                 AddSource: true,
             })).With(
                 "webseedUrl", requestKey.url,
-                "fileIndex", requestKey.fileIndex)
+                "webseedChunkIndex", requestKey.sliceIndex)
             // Request shouldn't exist if this occurs.
             panicif.LessThan(last, begin)
-            // Hello C++ my old friend.
+            // Hello darkness (C++) my old friend...
             end := last + 1
-            truncateEndToCacheBoundary(begin, &end, t.chunkSize)
+            end = min(end, t.endRequestForAlignedWebseedResponse(begin))
+            panicif.LessThanOrEqual(end, begin)
             if webseed.PrintDebug && end != chunkEnd {
                 debugLogger.Debug(
                     "shortened webseed request",
-                    "first-file", requestKey.filePath(),
-                    "from", endExclusiveString(begin, fileEnd),
+                    "request key", requestKey,
+                    "from", endExclusiveString(begin, chunkEnd),
                     "to", endExclusiveString(begin, end))
             }
-            panicif.GreaterThan(end, fileEnd)
+            panicif.GreaterThan(end, chunkEnd)
             peer.spawnRequest(begin, end, debugLogger)
         }
     }
 }
 
-// Limit a webseed request end request index so that the required response body size fits within
-// cache limits for a WebSeed provider.
-func truncateEndToCacheBoundary(start RequestIndex, end *RequestIndex, chunkSize pp.Integer) {
-    // Cloudflare caches up to 512 MB responses by default.
-    const cacheResponseBodyLimit = 256 << 20
-    chunksPerAlignedResponse := RequestIndex(cacheResponseBodyLimit / chunkSize)
-    startIndex := start / chunksPerAlignedResponse
-    *end = min(*end, (startIndex+1)*chunksPerAlignedResponse)
+// Cloudflare caches up to 512 MB responses by default. This is also an alignment.
+var webseedRequestChunkSize uint64 = 256 << 20
+
+func (t *Torrent) endRequestForAlignedWebseedResponse(start RequestIndex) RequestIndex {
+    end := min(t.maxEndRequest(), nextMultiple(start, t.chunksPerAlignedWebseedResponse()))
+    panicif.LessThanOrEqual(end, start)
+    return end
+}
+
+func (t *Torrent) chunksPerAlignedWebseedResponse() RequestIndex {
+    // This is the same as webseedRequestChunkSize, but in terms of RequestIndex.
+    return RequestIndex(webseedRequestChunkSize / t.chunkSize.Uint64())
 }
 
 func (cl *Client) dumpCurrentWebseedRequests() {
     if webseed.PrintDebug {
         fmt.Println("current webseed requests:")
         for key, value := range cl.iterCurrentWebseedRequests() {
-            fmt.Printf("\t%v: %v, priority %v\n", key.filePath(), value.existingWebseedRequest, value.priority)
+            fmt.Printf("\t%v: %v, priority %v\n", key, value.existingWebseedRequest, value.priority)
         }
     }
 }
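A worked example of the aligned-slice end computation, assuming the common 16 KiB chunk size; the names mirror the patch but the values are illustrative:

package main

import "fmt"

const (
	chunkSize      = 16 << 10              // assumed 16 KiB chunks
	sliceBytes     = 256 << 20             // webseedRequestChunkSize
	chunksPerSlice = sliceBytes / chunkSize // 16384 request indexes per slice
)

// Mirrors endRequestForAlignedWebseedResponse: the next slice boundary after
// start, clamped to the torrent's final request index.
func endForAlignedResponse(start, maxEnd uint64) uint64 {
	end := start + chunksPerSlice - start%chunksPerSlice // nextMultiple
	return min(end, maxEnd)
}

func main() {
	fmt.Println(endForAlignedResponse(20000, 1<<20)) // 32768: next 16384 boundary
	fmt.Println(endForAlignedResponse(16384, 1<<20)) // 32768: a whole slice, never empty
	fmt.Println(endForAlignedResponse(20000, 25000)) // 25000: clamped to the torrent end
}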
@@ -256,27 +270,56 @@ func (me webseedRequestPlan) String() string {
 
 // Distinct webseed request data when different offsets are not allowed.
 type aprioriWebseedRequestKey struct {
-    t         *Torrent
-    fileIndex int
-    url       webseedUrlKey
-}
-
-func (me *aprioriWebseedRequestKey) filePath() string {
-    return me.t.Files()[me.fileIndex].Path()
+    t          *Torrent
+    sliceIndex RequestIndex
+    url        webseedUrlKey
 }
 
 func (me *aprioriWebseedRequestKey) String() string {
-    return fmt.Sprintf("%v from %v", me.filePath(), me.url)
+    return fmt.Sprintf("slice %v from %v", me.sliceIndex, me.url)
 }
 
 // Distinct webseed request when different offsets to the same object are allowed.
 type webseedUniqueRequestKey struct {
     aprioriWebseedRequestKey
-    startOffset int64
+    startRequest RequestIndex
+}
+
+func (me webseedUniqueRequestKey) endPieceIndex() pieceIndex {
+    return pieceIndex(intCeilDiv(
+        me.t.endRequestForAlignedWebseedResponse(me.startRequest),
+        me.t.chunksPerRegularPiece()))
+}
+
+func (me webseedUniqueRequestKey) mightHavePartialFiles() bool {
+    return me.t.filesInPieceRangeMightBePartial(
+        me.t.pieceIndexOfRequestIndex(me.startRequest),
+        me.endPieceIndex())
+}
+
+func (me webseedUniqueRequestKey) longestFile() (ret g.Option[int64]) {
+    t := me.t
+    firstPiece := t.pieceIndexOfRequestIndex(me.startRequest)
+    firstFileIndex := t.piece(firstPiece).beginFile
+    endFileIndex := t.piece(me.endPieceIndex() - 1).endFile
+    for fileIndex := firstFileIndex; fileIndex < endFileIndex; fileIndex++ {
+        fileLength := t.getFile(fileIndex).length
+        if ret.Ok {
+            ret.Value = max(ret.Value, fileLength)
+        } else {
+            ret.Set(fileLength)
+        }
+    }
+    return
 }
 
 func (me webseedUniqueRequestKey) String() string {
-    return me.aprioriWebseedRequestKey.String() + " at " + fmt.Sprintf("0x%x", me.startOffset)
+    return fmt.Sprintf(
+        "%v at %v:%v",
+        me.aprioriWebseedRequestKey,
+        me.sliceIndex,
+        me.startRequest%me.t.chunksPerAlignedWebseedResponse(),
+    )
 }
 
 // Non-distinct proposed webseed request data.
@@ -303,6 +346,9 @@ func (cl *Client) iterPossibleWebseedRequests() iter.Seq2[webseedUniqueRequestKe
             value.pieces,
             func(ih metainfo.Hash, pieceIndex int, orderState requestStrategy.PieceRequestOrderState) bool {
                 t := cl.torrentsByShortHash[ih]
+                if len(t.webSeeds) == 0 {
+                    return false
+                }
                 p := t.piece(pieceIndex)
                 cleanOpt := p.firstCleanChunk()
                 if !cleanOpt.Ok {
@@ -313,25 +359,25 @@
                 // client piece request order and ignores other states like hashing, marking
                 // etc. Order state priority would be faster otherwise.
                 priority := p.effectivePriority()
-                for i, e := range p.fileExtents(int64(cleanOpt.Value) * int64(t.chunkSize)) {
-                    for url, ws := range t.webSeeds {
-                        // Return value from this function (RequestPieceFunc) doesn't terminate
-                        // iteration, so propagate that to not handling the yield return value.
-                        yield(
-                            webseedUniqueRequestKey{
-                                aprioriWebseedRequestKey{
-                                    t:         t,
-                                    fileIndex: i,
-                                    url:       url,
-                                },
-                                e.Start,
+                firstRequest := p.requestIndexBegin() + cleanOpt.Value
+                webseedSliceIndex := firstRequest / t.chunksPerAlignedWebseedResponse()
+                for url, ws := range t.webSeeds {
+                    // Return value from this function (RequestPieceFunc) doesn't terminate
+                    // iteration, so propagate that to not handling the yield return value.
+                    yield(
+                        webseedUniqueRequestKey{
+                            aprioriWebseedRequestKey{
+                                t:          t,
+                                sliceIndex: webseedSliceIndex,
+                                url:        url,
                             },
-                            webseedRequestOrderValue{
-                                priority: priority,
-                                costKey:  ws.hostKey,
-                            },
-                        )
-                    }
+                            firstRequest,
+                        },
+                        webseedRequestOrderValue{
+                            priority: priority,
+                            costKey:  ws.hostKey,
+                        },
+                    )
                 }
                 // Pieces iterated here are only to select webseed requests. There's no guarantee they're chosen.
                 return false
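Roughly how a candidate is keyed by slice, as a standalone sketch (chunksPerSlice stands in for t.chunksPerAlignedWebseedResponse(); values are illustrative):

package main

import "fmt"

// The first clean (not yet dirty) chunk of a piece gives the starting request
// index; the slice index is which aligned window that index falls into.
func sliceKey(pieceFirstRequest, firstCleanChunk, chunksPerSlice uint32) (sliceIndex, startRequest uint32) {
	startRequest = pieceFirstRequest + firstCleanChunk
	sliceIndex = startRequest / chunksPerSlice
	return
}

func main() {
	fmt.Println(sliceKey(32768, 5, 16384)) // 2 32773
}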
@@ -344,7 +390,7 @@
 
 func (cl *Client) updateWebseedRequestsWithReason(reason updateRequestReason) {
     // Should we wrap this with pprof labels?
-    cl.scheduleImmediateWebseedRequestUpdate()
+    cl.scheduleImmediateWebseedRequestUpdate(reason)
 }
 
 func (cl *Client) iterCurrentWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
@@ -356,20 +402,15 @@
                 // This request is done, so don't yield it.
                 continue
             }
-            off := t.requestIndexBegin(ar.next)
-            opt := t.fileSegmentsIndex.Unwrap().LocateOffset(off)
-            if !opt.Ok {
-                continue
-            }
-            p := t.pieceForOffset(off)
+            p := t.piece(t.pieceIndexOfRequestIndex(ar.next))
             if !yield(
                 webseedUniqueRequestKey{
                     aprioriWebseedRequestKey{
-                        t:         t,
-                        fileIndex: opt.Value.Index,
-                        url:       url,
+                        t:          t,
+                        sliceIndex: ar.next / t.chunksPerAlignedWebseedResponse(),
+                        url:        url,
                     },
-                    opt.Value.Offset,
+                    ar.next,
                 },
                 webseedRequestOrderValue{
                     priority: p.effectivePriority(),
@@ -385,7 +426,7 @@
 }
 }
 
-func (cl *Client) scheduleImmediateWebseedRequestUpdate() {
+func (cl *Client) scheduleImmediateWebseedRequestUpdate(reason updateRequestReason) {
     if !cl.webseedRequestTimer.Stop() {
         // Timer function already running, let it do its thing.
         return
@@ -394,6 +435,7 @@
     // update on every call to this method). Since we're holding the Client lock, and we cancelled
     // the timer, and it wasn't active, nobody else should have reset it before us. Do we need to
     // introduce a "reason" field here, (albeit Client-level?).
+    cl.webseedUpdateReason = cmp.Or(cl.webseedUpdateReason, reason)
     panicif.True(cl.webseedRequestTimer.Reset(0))
 }
 
@@ -401,14 +443,22 @@ func (cl *Client) updateWebseedRequestsTimerFunc() {
     if cl.closed.IsSet() {
         return
     }
+    // This won't get set elsewhere if the timer has fired, which it has for us to be here.
+    cl.webseedUpdateReason = cmp.Or(cl.webseedUpdateReason, "timer")
     cl.lock()
     defer cl.unlock()
     cl.updateWebseedRequestsAndResetTimer()
 }
 
 func (cl *Client) updateWebseedRequestsAndResetTimer() {
-    cl.updateWebseedRequests()
-    // Timer should always be stopped before the last call.
+    pprof.Do(context.Background(), pprof.Labels(
+        "reason", string(cl.webseedUpdateReason),
+    ), func(_ context.Context) {
+        cl.updateWebseedRequests()
+        cl.webseedUpdateReason = ""
+    })
+    // Timer should always be stopped before the last call. TODO: Don't reset timer if there's
+    // nothing to do (no possible requests in update).
     panicif.True(cl.webseedRequestTimer.Reset(webseedRequestUpdateTimerInterval))
 }
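A sketch of the pprof labelling used above: work run inside pprof.Do is attributed to the "reason" label in CPU profiles, so the cost of webseed request updates can be grouped by what triggered them (e.g. "timer"). Names here are illustrative:

package main

import (
	"context"
	"runtime/pprof"
)

func updateWithReason(reason string, update func()) {
	// Everything executed inside the closure carries the "reason" label.
	pprof.Do(context.Background(), pprof.Labels("reason", reason), func(_ context.Context) {
		update()
	})
}

func main() {
	updateWithReason("timer", func() {
		// cl.updateWebseedRequests() in the real code.
	})
}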
diff --git a/webseed/client.go b/webseed/client.go
index cd49934e..46488d3d 100644
--- a/webseed/client.go
+++ b/webseed/client.go
@@ -10,8 +10,10 @@ import (
     "net/http"
     "os"
     "strings"
+    "sync"
 
     "github.com/RoaringBitmap/roaring"
+    g "github.com/anacrolix/generics"
     "github.com/anacrolix/missinggo/v2/panicif"
     "github.com/dustin/go-humanize"
     "golang.org/x/time/rate"
@@ -175,6 +177,10 @@ func (me *Client) checkContentLength(resp *http.Response, part requestPart, expe
     }
 }
 
+var bufPool = &sync.Pool{New: func() any {
+    return g.PtrTo(make([]byte, 128<<10)) // 128 KiB. 4x the default.
+}}
+
 // Reads the part in full. All expected bytes must be returned or an error will be returned.
 func (me *Client) recvPartResult(ctx context.Context, w io.Writer, part requestPart, resp *http.Response) error {
     defer resp.Body.Close()
@@ -191,7 +197,9 @@ func (me *Client) recvPartResult(ctx context.Context, w io.Writer, part requestP
     case http.StatusPartialContent:
         // The response should be just as long as we requested.
         me.checkContentLength(resp, part, part.e.Length)
-        copied, err := io.Copy(w, body)
+        buf := bufPool.Get().(*[]byte)
+        defer bufPool.Put(buf)
+        copied, err := io.CopyBuffer(w, body, *buf)
         if err != nil {
             return err
         }
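A standalone sketch of the pooled-buffer copy pattern used by recvPartResult (names here are illustrative): storing *[]byte in the pool keeps Get/Put allocation-free, and io.CopyBuffer reuses the 128 KiB buffer instead of io.Copy's default 32 KiB one:

package main

import (
	"bytes"
	"io"
	"strings"
	"sync"
)

var bufPool = sync.Pool{New: func() any {
	b := make([]byte, 128<<10)
	return &b
}}

func copyWithPooledBuffer(dst io.Writer, src io.Reader) (int64, error) {
	buf := bufPool.Get().(*[]byte)
	defer bufPool.Put(buf)
	return io.CopyBuffer(dst, src, *buf)
}

func main() {
	var sink bytes.Buffer
	_, _ = copyWithPooledBuffer(&sink, strings.NewReader("response body"))
}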