import (
"cmp"
+ "context"
"fmt"
"iter"
"log/slog"
"maps"
"os"
+ "runtime/pprof"
"strconv"
"strings"
"sync"
+ "time"
"unique"
g "github.com/anacrolix/generics"
"github.com/anacrolix/torrent/internal/request-strategy"
"github.com/anacrolix/torrent/metainfo"
- pp "github.com/anacrolix/torrent/peer_protocol"
-
"github.com/anacrolix/torrent/webseed"
)
- Initiate missing requests that fit into the available limits.
*/
func (cl *Client) updateWebseedRequests() {
+	// When debugging, report how long the whole update took once it returns.
+	if webseed.PrintDebug {
+		started := time.Now()
+		defer func() {
+			// Take a single clock reading so the printed timestamp and the duration agree.
+			now := time.Now()
+			fmt.Printf("%v: updateWebseedRequests took %v\n", now, now.Sub(started))
+		}()
+	}
type aprioriMapValue struct {
- // Change to request index?
- startOffset int64
+ startIndex RequestIndex
webseedRequestOrderValue
}
aprioriMap := make(map[aprioriWebseedRequestKey]aprioriMapValue)
if ok {
// Shared in the lookup above.
t := uniqueKey.t
- hasPeerConnRequest := func(offset int64) bool {
- reqIndex := t.getRequestIndexContainingOffset(offset)
+ hasPeerConnRequest := func(reqIndex RequestIndex) bool {
return t.requestingPeer(reqIndex) != nil
}
// Skip the webseed request unless it has a higher priority, is less requested by peer
// webseed requests in favour of peer conns unless there's nothing else to do.
if cmp.Or(
cmp.Compare(value.priority, cur.priority),
- compareBool(hasPeerConnRequest(cur.startOffset), hasPeerConnRequest(uniqueKey.startOffset)),
- cmp.Compare(cur.startOffset, uniqueKey.startOffset),
+ compareBool(hasPeerConnRequest(cur.startIndex), hasPeerConnRequest(uniqueKey.startRequest)),
+ cmp.Compare(cur.startIndex, uniqueKey.startRequest),
) <= 0 {
continue
}
}
- aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startOffset, value}
+ aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startRequest, value}
}
existingRequests := maps.Collect(cl.iterCurrentWebseedRequests())
// We don't need the value but maybe cloning is just faster anyway?
type heapElem struct {
webseedUniqueRequestKey
webseedRequestOrderValue
+ mightHavePartialFiles bool
}
// Build the request heap, merging existing requests if they match.
heapSlice := make([]heapElem, 0, len(aprioriMap)+len(existingRequests))
for key, value := range aprioriMap {
- fullKey := webseedUniqueRequestKey{key, value.startOffset}
+ fullKey := webseedUniqueRequestKey{key, value.startIndex}
heapValue := value.webseedRequestOrderValue
// If there's a matching existing request, make sure to include a reference to it in the
// heap value and deduplicate it.
heapSlice = append(heapSlice, heapElem{
fullKey,
heapValue,
+ fullKey.mightHavePartialFiles(),
})
}
// Add remaining existing requests.
if key.t.dataDownloadDisallowed.IsSet() {
continue
}
- heapSlice = append(heapSlice, heapElem{key, existingRequests[key]})
+ heapSlice = append(heapSlice, heapElem{key, existingRequests[key], key.mightHavePartialFiles()})
}
aprioriHeap := heap.InterfaceForSlice(
&heapSlice,
func(l heapElem, r heapElem) bool {
+ // Not stable ordering but being sticky to existing webseeds should be enough.
return cmp.Or(
// Prefer highest priority
-cmp.Compare(l.priority, r.priority),
compareBool(l.existingWebseedRequest == nil, r.existingWebseedRequest == nil),
// Prefer not competing with active peer connections.
compareBool(len(l.t.conns) > 0, len(r.t.conns) > 0),
- // Try to complete partial files first.
- -compareBool(l.t.fileMightBePartial(l.fileIndex), r.t.fileMightBePartial(r.fileIndex)),
- // Note this isn't correct if the starting piece is split across multiple files. But
- // I plan to refactor to key on starting piece to handle this case.
- -cmp.Compare(
- l.t.Files()[l.fileIndex].length,
- r.t.Files()[r.fileIndex].length),
+ // Try to complete partial slices first.
+ -compareBool(l.mightHavePartialFiles, r.mightHavePartialFiles),
+ // No need to prefer longer files anymore now that we're using slices?
+ //// Longer files first.
+ //-cmp.Compare(l.longestFile().Unwrap(), r.longestFile().Unwrap()),
) < 0
},
)
}
})
+ // TODO: Do we deduplicate requests across different webseeds?
+
for costKey, requestKeys := range plan.byCost {
for _, requestKey := range requestKeys {
// This could happen if a request is cancelled but hasn't removed itself from the active
peer := t.webSeeds[requestKey.url]
panicif.NotEq(peer.hostKey, costKey)
printPlan()
- begin := t.getRequestIndexContainingOffset(requestKey.startOffset)
- fileEnd := t.endRequestIndexForFileIndex(requestKey.fileIndex)
+ begin := requestKey.startRequest
+ chunkEnd := t.endRequestForAlignedWebseedResponse(requestKey.startRequest)
last := begin
for {
if !t.wantReceiveChunk(last) {
break
}
- if last >= fileEnd-1 {
+ if last >= chunkEnd-1 {
break
}
last++
AddSource: true,
})).With(
"webseedUrl", requestKey.url,
- "fileIndex", requestKey.fileIndex)
+ "webseedChunkIndex", requestKey.sliceIndex)
// Request shouldn't exist if this occurs.
panicif.LessThan(last, begin)
- // Hello C++ my old friend.
+ // Hello darkness (C++) my old friend...
end := last + 1
- truncateEndToCacheBoundary(begin, &end, t.chunkSize)
- if webseed.PrintDebug && end != fileEnd {
+ end = min(end, t.endRequestForAlignedWebseedResponse(begin))
+ panicif.LessThanOrEqual(end, begin)
+ if webseed.PrintDebug && end != chunkEnd {
debugLogger.Debug(
"shortened webseed request",
- "first-file", requestKey.filePath(),
- "from", endExclusiveString(begin, fileEnd),
+ "request key", requestKey,
+ "from", endExclusiveString(begin, chunkEnd),
"to", endExclusiveString(begin, end))
}
- panicif.GreaterThan(end, fileEnd)
+ panicif.GreaterThan(end, chunkEnd)
peer.spawnRequest(begin, end, debugLogger)
}
}
}
-// Limit a webseed request end request index so that the required response body size fits within
-// cache limits for a WebSeed provider.
-func truncateEndToCacheBoundary(start RequestIndex, end *RequestIndex, chunkSize pp.Integer) {
- // Cloudflare caches up to 512 MB responses by default.
- const cacheResponseBodyLimit = 256 << 20
- chunksPerAlignedResponse := RequestIndex(cacheResponseBodyLimit / chunkSize)
- startIndex := start / chunksPerAlignedResponse
- *end = min(*end, (startIndex+1)*chunksPerAlignedResponse)
+// webseedRequestChunkSize is the size in bytes (256 MiB) that webseed responses are limited and aligned to. Cloudflare caches up to 512 MB responses by default.
+var webseedRequestChunkSize uint64 = 256 << 20
+
+// endRequestForAlignedWebseedResponse returns the exclusive end request index for a webseed
+// response that begins at start. The end is the next multiple of
+// t.chunksPerAlignedWebseedResponse() as computed by nextMultiple, capped at t.maxEndRequest().
+// The result is asserted (via panicif) to lie beyond start.
+func (t *Torrent) endRequestForAlignedWebseedResponse(start RequestIndex) RequestIndex {
+	torrentEnd := t.maxEndRequest()
+	sliceEnd := nextMultiple(start, t.chunksPerAlignedWebseedResponse())
+	end := min(torrentEnd, sliceEnd)
+	// An empty or inverted range would mean there is nothing to request from start.
+	panicif.LessThanOrEqual(end, start)
+	return end
+}
+
+// chunksPerAlignedWebseedResponse returns webseedRequestChunkSize expressed as a number of
+// chunk-sized requests (RequestIndex units) for this torrent.
+//
+// The result is clamped to at least 1. Callers divide and take remainders by this value, so a
+// chunk size larger than webseedRequestChunkSize would otherwise yield 0 and panic with an
+// integer divide by zero.
+func (t *Torrent) chunksPerAlignedWebseedResponse() RequestIndex {
+	return max(1, RequestIndex(webseedRequestChunkSize/t.chunkSize.Uint64()))
}
func (cl *Client) dumpCurrentWebseedRequests() {
if webseed.PrintDebug {
fmt.Println("current webseed requests:")
for key, value := range cl.iterCurrentWebseedRequests() {
- fmt.Printf("\t%v: %v, priority %v\n", key.filePath(), value.existingWebseedRequest, value.priority)
+ fmt.Printf("\t%v: %v, priority %v\n", key, value.existingWebseedRequest, value.priority) // key is a webseedUniqueRequestKey and prints via its String method.
}
}
}
// Distinct webseed request data when different offsets are not allowed.
type aprioriWebseedRequestKey struct {
- t *Torrent
- fileIndex int
- url webseedUrlKey
-}
-
-func (me *aprioriWebseedRequestKey) filePath() string {
- return me.t.Files()[me.fileIndex].Path()
+ t *Torrent // Torrent the request belongs to.
+ sliceIndex RequestIndex // Start request index divided by t.chunksPerAlignedWebseedResponse().
+ url webseedUrlKey // Key into t.webSeeds for the webseed to request from.
}
func (me *aprioriWebseedRequestKey) String() string {
- return fmt.Sprintf("%v from %v", me.filePath(), me.url)
+ return fmt.Sprintf("slice %v from %v", me.sliceIndex, me.url) // NOTE(review): pointer receiver, so fmt only uses this method for *aprioriWebseedRequestKey operands, not plain values.
}
// Distinct webseed request when different offsets to the same object are allowed.
type webseedUniqueRequestKey struct {
aprioriWebseedRequestKey
- startOffset int64
+ startRequest RequestIndex // First request index of the webseed request; sliceIndex is derived from it where keys are built.
+}
+
+// endPieceIndex returns the exclusive end piece index for the aligned webseed response that
+// starts at startRequest: the response's end request index divided by the number of chunks in
+// a regular piece, rounded by intCeilDiv.
+func (me webseedUniqueRequestKey) endPieceIndex() pieceIndex {
+	endRequest := me.t.endRequestForAlignedWebseedResponse(me.startRequest)
+	piecesCovered := intCeilDiv(endRequest, me.t.chunksPerRegularPiece())
+	return pieceIndex(piecesCovered)
+}
+
+// mightHavePartialFiles reports what t.filesInPieceRangeMightBePartial returns for the pieces
+// from the one containing startRequest up to, but not including, endPieceIndex.
+func (me webseedUniqueRequestKey) mightHavePartialFiles() bool {
+	t := me.t
+	beginPiece := t.pieceIndexOfRequestIndex(me.startRequest)
+	endPiece := me.endPieceIndex()
+	return t.filesInPieceRangeMightBePartial(beginPiece, endPiece)
+}
+
+// longestFile returns the greatest file length among the files from the beginFile of the piece
+// containing startRequest up to, but not including, the endFile of the last piece before
+// endPieceIndex. The option is left unset when that file range is empty.
+func (me webseedUniqueRequestKey) longestFile() (ret g.Option[int64]) {
+	t := me.t
+	beginFile := t.piece(t.pieceIndexOfRequestIndex(me.startRequest)).beginFile
+	endFile := t.piece(me.endPieceIndex() - 1).endFile
+	for i := beginFile; i < endFile; i++ {
+		length := t.getFile(i).length
+		// The first file seen always wins; afterwards only a strictly longer one replaces it.
+		if !ret.Ok || length > ret.Value {
+			ret.Set(length)
+		}
+	}
+	return
}
func (me webseedUniqueRequestKey) String() string {
- return me.aprioriWebseedRequestKey.String() + " at " + fmt.Sprintf("0x%x", me.startOffset)
+ return fmt.Sprintf(
+ "%v at %v:%v",
+ me.aprioriWebseedRequestKey,
+ me.sliceIndex,
+ me.startRequest%me.t.chunksPerAlignedWebseedResponse(),
+ )
}
// Non-distinct proposed webseed request data.
value.pieces,
func(ih metainfo.Hash, pieceIndex int, orderState requestStrategy.PieceRequestOrderState) bool {
t := cl.torrentsByShortHash[ih]
+ if len(t.webSeeds) == 0 {
+ return false
+ }
p := t.piece(pieceIndex)
cleanOpt := p.firstCleanChunk()
if !cleanOpt.Ok {
// client piece request order and ignores other states like hashing, marking
// etc. Order state priority would be faster otherwise.
priority := p.effectivePriority()
- for i, e := range p.fileExtents(int64(cleanOpt.Value) * int64(t.chunkSize)) {
- for url, ws := range t.webSeeds {
- // Return value from this function (RequestPieceFunc) doesn't terminate
- // iteration, so propagate that to not handling the yield return value.
- yield(
- webseedUniqueRequestKey{
- aprioriWebseedRequestKey{
- t: t,
- fileIndex: i,
- url: url,
- },
- e.Start,
+ firstRequest := p.requestIndexBegin() + cleanOpt.Value
+ webseedSliceIndex := firstRequest / t.chunksPerAlignedWebseedResponse()
+ for url, ws := range t.webSeeds {
+ // Return value from this function (RequestPieceFunc) doesn't terminate
+ // iteration, so propagate that to not handling the yield return value.
+ yield(
+ webseedUniqueRequestKey{
+ aprioriWebseedRequestKey{
+ t: t,
+ sliceIndex: webseedSliceIndex,
+ url: url,
},
- webseedRequestOrderValue{
- priority: priority,
- costKey: ws.hostKey,
- },
- )
- }
+ firstRequest,
+ },
+ webseedRequestOrderValue{
+ priority: priority,
+ costKey: ws.hostKey,
+ },
+ )
}
// Pieces iterated here are only to select webseed requests. There's no guarantee they're chosen.
return false
func (cl *Client) updateWebseedRequestsWithReason(reason updateRequestReason) {
// Should we wrap this with pprof labels?
- cl.scheduleImmediateWebseedRequestUpdate()
+ cl.scheduleImmediateWebseedRequestUpdate(reason) // reason is forwarded so the scheduler can record why the update was requested.
}
func (cl *Client) iterCurrentWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
// This request is done, so don't yield it.
continue
}
- off := t.requestIndexBegin(ar.next)
- opt := t.fileSegmentsIndex.Unwrap().LocateOffset(off)
- if !opt.Ok {
- continue
- }
- p := t.pieceForOffset(off)
+ p := t.piece(t.pieceIndexOfRequestIndex(ar.next))
if !yield(
webseedUniqueRequestKey{
aprioriWebseedRequestKey{
- t: t,
- fileIndex: opt.Value.Index,
- url: url,
+ t: t,
+ sliceIndex: ar.next / t.chunksPerAlignedWebseedResponse(),
+ url: url,
},
- opt.Value.Offset,
+ ar.next,
},
webseedRequestOrderValue{
priority: p.effectivePriority(),
}
}
-func (cl *Client) scheduleImmediateWebseedRequestUpdate() {
+func (cl *Client) scheduleImmediateWebseedRequestUpdate(reason updateRequestReason) {
if !cl.webseedRequestTimer.Stop() {
// Timer function already running, let it do its thing.
return
// update on every call to this method). Since we're holding the Client lock, and we cancelled
// the timer, and it wasn't active, nobody else should have reset it before us. Do we need to
// introduce a "reason" field here, (albeit Client-level?).
+ cl.webseedUpdateReason = cmp.Or(cl.webseedUpdateReason, reason)
panicif.True(cl.webseedRequestTimer.Reset(0))
}
if cl.closed.IsSet() {
return
}
cl.lock()
defer cl.unlock()
+	// This won't get set elsewhere if the timer has fired, which it has for us to be here. Only
+	// touch webseedUpdateReason once the Client lock is held: the other writers visible in this
+	// file run with that lock held, so updating it before locking would be a data race.
+	cl.webseedUpdateReason = cmp.Or(cl.webseedUpdateReason, "timer")
cl.updateWebseedRequestsAndResetTimer()
}
func (cl *Client) updateWebseedRequestsAndResetTimer() {
- cl.updateWebseedRequests()
- // Timer should always be stopped before the last call.
+ pprof.Do(context.Background(), pprof.Labels(
+ "reason", string(cl.webseedUpdateReason),
+ ), func(_ context.Context) {
+ cl.updateWebseedRequests()
+ cl.webseedUpdateReason = ""
+ })
+ // Timer should always be stopped before the last call. TODO: Don't reset timer if there's
+ // nothing to do (no possible requests in update).
panicif.True(cl.webseedRequestTimer.Reset(webseedRequestUpdateTimerInterval))
}