From 2297350498bc11fec6b742f613513e09fcf2aa7a Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 11 Jul 2025 15:02:33 +1000 Subject: [PATCH] Misc webseed tweaks --- cmp.go | 1 + piece.go | 27 ++++++++++++++------- webseed-peer.go | 27 ++++++++++++++++++++- webseed-request.go | 8 +++++++ webseed-requesting.go | 56 +++++++++++++++++++++++++++++++------------ 5 files changed, 95 insertions(+), 24 deletions(-) diff --git a/cmp.go b/cmp.go index 31cc2bf3..04f91bb2 100644 --- a/cmp.go +++ b/cmp.go @@ -1,5 +1,6 @@ package torrent +// Sorts false before true. func compareBool(a, b bool) int { if a == b { return 0 diff --git a/piece.go b/piece.go index 4d6985c1..597cd190 100644 --- a/piece.go +++ b/piece.go @@ -155,15 +155,26 @@ func (p *Piece) chunkIndexDirty(chunk chunkIndexType) bool { return p.t.dirtyChunks.Contains(p.requestIndexBegin() + chunk) } -func (p *Piece) firstCleanChunk() (_ g.Option[chunkIndexType]) { - it := p.t.dirtyChunks.Iterator() - begin := uint32(p.requestIndexBegin()) - end := uint32(p.requestIndexMaxEnd()) - it.AdvanceIfNeeded(begin) - for next := begin; next < end; next++ { - if !it.HasNext() || it.Next() != next { - return g.Some(chunkIndexType(next - begin)) +func (p *Piece) iterCleanChunks() iter.Seq[chunkIndexType] { + return func(yield func(chunkIndexType) bool) { + it := p.t.dirtyChunks.Iterator() + begin := uint32(p.requestIndexBegin()) + end := uint32(p.requestIndexMaxEnd()) + it.AdvanceIfNeeded(begin) + for next := begin; next < end; next++ { + if !it.HasNext() || it.Next() != next { + if !yield(chunkIndexType(next - begin)) { + return + } + } } + return + } +} + +func (p *Piece) firstCleanChunk() (_ g.Option[chunkIndexType]) { + for some := range p.iterCleanChunks() { + return g.Some(some) } return } diff --git a/webseed-peer.go b/webseed-peer.go index 3a5eeb96..eb6320ce 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -125,6 +125,12 @@ func (ws *webseedPeer) spawnRequest(begin, end RequestIndex) { next: begin, end: end, } + if ws.hasOverlappingRequests(begin, end) { + if webseed.PrintDebug { + fmt.Printf("webseedPeer.spawnRequest: overlapping request for %v[%v-%v)\n", ws.peer.t.name(), begin, end) + } + ws.peer.t.cl.dumpCurrentWebseedRequests() + } ws.activeRequests[&wsReq] = struct{}{} ws.peer.updateExpectingChunks() panicif.Zero(ws.hostKey) @@ -138,6 +144,18 @@ func (ws *webseedPeer) spawnRequest(begin, end RequestIndex) { go ws.runRequest(&wsReq) } +func (me *webseedPeer) hasOverlappingRequests(begin, end RequestIndex) bool { + for req := range me.activeRequests { + if req.cancelled.Load() { + continue + } + if begin < req.end && end >= req.begin { + return true + } + } + return false +} + func readChunksErrorLevel(err error, req *webseedRequest) slog.Level { if req.cancelled.Load() { return slog.LevelDebug @@ -157,7 +175,13 @@ func (ws *webseedPeer) runRequest(webseedRequest *webseedRequest) { locker := ws.locker err := ws.readChunks(webseedRequest) if webseed.PrintDebug && webseedRequest.next < webseedRequest.end { - fmt.Printf("webseed peer stopped reading chunks early: %v\n", err) + fmt.Printf("webseed peer request %v in %v stopped reading chunks early: %v\n", webseedRequest, ws.peer.t.name(), err) + if err != nil { + fmt.Printf("error type: %T\n", err) + } + if err == nil { + ws.peer.t.cl.dumpCurrentWebseedRequests() + } } // Ensure the body reader and response are closed. webseedRequest.Close() @@ -315,6 +339,7 @@ func (ws *webseedPeer) readChunks(wr *webseedRequest) (err error) { err = fmt.Errorf("processing chunk: %w", err) } if stop { + // TODO: Keep reading until the buffer is drained. return } } diff --git a/webseed-request.go b/webseed-request.go index 7e79f127..43f36883 100644 --- a/webseed-request.go +++ b/webseed-request.go @@ -33,3 +33,11 @@ func (me *webseedRequest) Cancel() { } } } + +func (me *webseedRequest) String() string { + s := fmt.Sprintf("%v of [%v-%v)", me.next, me.begin, me.end) + if me.cancelled.Load() { + s += " (cancelled)" + } + return s +} diff --git a/webseed-requesting.go b/webseed-requesting.go index 7ea252ba..23616163 100644 --- a/webseed-requesting.go +++ b/webseed-requesting.go @@ -18,7 +18,7 @@ import ( "github.com/anacrolix/torrent/webseed" ) -const defaultRequestsPerWebseedHost = 5 +const defaultRequestsPerWebseedHost = 10 type ( webseedHostKey string @@ -34,20 +34,32 @@ type ( */ func (cl *Client) globalUpdateWebSeedRequests() { type aprioriMapValue struct { + // Change to request index? startOffset int64 webseedRequestOrderValue } aprioriMap := make(map[aprioriWebseedRequestKey]aprioriMapValue) - for uniqueKey, value := range cl.iterWebseed() { + for uniqueKey, value := range cl.iterPossibleWebseedRequests() { cur, ok := aprioriMap[uniqueKey.aprioriWebseedRequestKey] - // Set the webseed request if it doesn't exist, or if the new one has a higher priority or - // starts earlier in the file. - if !ok || cmp.Or( - cmp.Compare(value.priority, cur.priority), - cmp.Compare(cur.startOffset, uniqueKey.startOffset), - ) > 0 { - aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startOffset, value} + if ok { + // Shared in the lookup above. + t := uniqueKey.t + hasPeerConnRequest := func(offset int64) bool { + reqIndex := t.getRequestIndexContainingOffset(offset) + return t.requestingPeer(reqIndex) != nil + } + // Skip the webseed request unless it has a higher priority, is less requested by peer + // conns, or has a lower start offset. Including peer conn requests here will bump + // webseed requests in favour of peer conns unless there's nothing else to do. + if cmp.Or( + cmp.Compare(value.priority, cur.priority), + compareBool(hasPeerConnRequest(cur.startOffset), hasPeerConnRequest(uniqueKey.startOffset)), + cmp.Compare(cur.startOffset, uniqueKey.startOffset), + ) <= 0 { + continue + } } + aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startOffset, value} } existingRequests := maps.Collect(cl.iterCurrentWebseedRequests()) // We don't need the value but maybe cloning is just faster anyway? @@ -153,6 +165,7 @@ func (cl *Client) globalUpdateWebSeedRequests() { panicif.NotEq(peer.hostKey, costKey) printPlan() begin := t.getRequestIndexContainingOffset(requestKey.startOffset) + // TODO: Find an actual end, so we don't lose lots of data when requests are cancelled. end := t.endRequestIndexForFileIndex(requestKey.fileIndex) panicif.Eq(begin, end) peer.spawnRequest(begin, end) @@ -160,6 +173,15 @@ func (cl *Client) globalUpdateWebSeedRequests() { } } +func (cl *Client) dumpCurrentWebseedRequests() { + if webseed.PrintDebug { + fmt.Println("current webseed requests:") + for key, value := range cl.iterCurrentWebseedRequests() { + fmt.Printf("\t%v: %v, priority %v\n", key.filePath(), value.existingWebseedRequest, value.priority) + } + } +} + type webseedRequestPlan struct { byCost map[webseedHostKeyHandle][]webseedUniqueRequestKey } @@ -182,8 +204,12 @@ type aprioriWebseedRequestKey struct { url webseedUrlKey } -func (me aprioriWebseedRequestKey) String() string { - return fmt.Sprintf("%v from %v", me.t.Files()[me.fileIndex].Path(), me.url) +func (me *aprioriWebseedRequestKey) filePath() string { + return me.t.Files()[me.fileIndex].Path() +} + +func (me *aprioriWebseedRequestKey) String() string { + return fmt.Sprintf("%v from %v", me.filePath(), me.url) } // Distinct webseed request when different offsets to the same object are allowed. @@ -198,12 +224,12 @@ func (me webseedUniqueRequestKey) String() string { // Non-distinct proposed webseed request data. type webseedRequestOrderValue struct { - priority PiecePriority + // The associated webseed request per host limit. + costKey webseedHostKeyHandle // Used for cancellation if this is deprioritized. Also, a faster way to sort for existing // requests. existingWebseedRequest *webseedRequest - // The associated webseed request per host limit. - costKey webseedHostKeyHandle + priority PiecePriority } func (me webseedRequestOrderValue) String() string { @@ -211,7 +237,7 @@ func (me webseedRequestOrderValue) String() string { } // Yields possible webseed requests by piece. Caller should filter and prioritize these. -func (cl *Client) iterWebseed() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] { +func (cl *Client) iterPossibleWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] { return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) { for key, value := range cl.pieceRequestOrder { input := key.getRequestStrategyInput(cl) -- 2.51.0