]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Misc webseed tweaks
authorMatt Joiner <anacrolix@gmail.com>
Fri, 11 Jul 2025 05:02:33 +0000 (15:02 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 11 Jul 2025 05:02:33 +0000 (15:02 +1000)
cmp.go
piece.go
webseed-peer.go
webseed-request.go
webseed-requesting.go

diff --git a/cmp.go b/cmp.go
index 31cc2bf3c73b49b394bbb53a0f0c502bb7ad2358..04f91bb29200063149eaeae73ef871bee0335502 100644 (file)
--- a/cmp.go
+++ b/cmp.go
@@ -1,5 +1,6 @@
 package torrent
 
+// Sorts false before true.
 func compareBool(a, b bool) int {
        if a == b {
                return 0
index 4d6985c1dc8282fad039112044161cf40c42fca8..597cd190520307d1c2c4bc9eac2dc80318ad5274 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -155,15 +155,26 @@ func (p *Piece) chunkIndexDirty(chunk chunkIndexType) bool {
        return p.t.dirtyChunks.Contains(p.requestIndexBegin() + chunk)
 }
 
-func (p *Piece) firstCleanChunk() (_ g.Option[chunkIndexType]) {
-       it := p.t.dirtyChunks.Iterator()
-       begin := uint32(p.requestIndexBegin())
-       end := uint32(p.requestIndexMaxEnd())
-       it.AdvanceIfNeeded(begin)
-       for next := begin; next < end; next++ {
-               if !it.HasNext() || it.Next() != next {
-                       return g.Some(chunkIndexType(next - begin))
+func (p *Piece) iterCleanChunks() iter.Seq[chunkIndexType] {
+       return func(yield func(chunkIndexType) bool) {
+               it := p.t.dirtyChunks.Iterator()
+               begin := uint32(p.requestIndexBegin())
+               end := uint32(p.requestIndexMaxEnd())
+               it.AdvanceIfNeeded(begin)
+               for next := begin; next < end; next++ {
+                       if !it.HasNext() || it.Next() != next {
+                               if !yield(chunkIndexType(next - begin)) {
+                                       return
+                               }
+                       }
                }
+               return
+       }
+}
+
+func (p *Piece) firstCleanChunk() (_ g.Option[chunkIndexType]) {
+       for some := range p.iterCleanChunks() {
+               return g.Some(some)
        }
        return
 }
index 3a5eeb967528c56e8de2ad373a27e626b763bef6..eb6320ced435c9ffc27cf2829974134bb1158825 100644 (file)
@@ -125,6 +125,12 @@ func (ws *webseedPeer) spawnRequest(begin, end RequestIndex) {
                next:    begin,
                end:     end,
        }
+       if ws.hasOverlappingRequests(begin, end) {
+               if webseed.PrintDebug {
+                       fmt.Printf("webseedPeer.spawnRequest: overlapping request for %v[%v-%v)\n", ws.peer.t.name(), begin, end)
+               }
+               ws.peer.t.cl.dumpCurrentWebseedRequests()
+       }
        ws.activeRequests[&wsReq] = struct{}{}
        ws.peer.updateExpectingChunks()
        panicif.Zero(ws.hostKey)
@@ -138,6 +144,18 @@ func (ws *webseedPeer) spawnRequest(begin, end RequestIndex) {
        go ws.runRequest(&wsReq)
 }
 
+func (me *webseedPeer) hasOverlappingRequests(begin, end RequestIndex) bool {
+       for req := range me.activeRequests {
+               if req.cancelled.Load() {
+                       continue
+               }
+               if begin < req.end && end >= req.begin {
+                       return true
+               }
+       }
+       return false
+}
+
 func readChunksErrorLevel(err error, req *webseedRequest) slog.Level {
        if req.cancelled.Load() {
                return slog.LevelDebug
@@ -157,7 +175,13 @@ func (ws *webseedPeer) runRequest(webseedRequest *webseedRequest) {
        locker := ws.locker
        err := ws.readChunks(webseedRequest)
        if webseed.PrintDebug && webseedRequest.next < webseedRequest.end {
-               fmt.Printf("webseed peer stopped reading chunks early: %v\n", err)
+               fmt.Printf("webseed peer request %v in %v stopped reading chunks early: %v\n", webseedRequest, ws.peer.t.name(), err)
+               if err != nil {
+                       fmt.Printf("error type: %T\n", err)
+               }
+               if err == nil {
+                       ws.peer.t.cl.dumpCurrentWebseedRequests()
+               }
        }
        // Ensure the body reader and response are closed.
        webseedRequest.Close()
@@ -315,6 +339,7 @@ func (ws *webseedPeer) readChunks(wr *webseedRequest) (err error) {
                        err = fmt.Errorf("processing chunk: %w", err)
                }
                if stop {
+                       // TODO: Keep reading until the buffer is drained.
                        return
                }
        }
index 7e79f12727145b96cc467fa421eef8a79dd70166..43f36883590df26978a359fc01cef1940c5bd6bd 100644 (file)
@@ -33,3 +33,11 @@ func (me *webseedRequest) Cancel() {
                }
        }
 }
+
+func (me *webseedRequest) String() string {
+       s := fmt.Sprintf("%v of [%v-%v)", me.next, me.begin, me.end)
+       if me.cancelled.Load() {
+               s += " (cancelled)"
+       }
+       return s
+}
index 7ea252bacfc4dfcf91de77ba7ca7353f33aa276f..23616163e3f98514edff996540505846961f531e 100644 (file)
@@ -18,7 +18,7 @@ import (
        "github.com/anacrolix/torrent/webseed"
 )
 
-const defaultRequestsPerWebseedHost = 5
+const defaultRequestsPerWebseedHost = 10
 
 type (
        webseedHostKey       string
@@ -34,20 +34,32 @@ type (
 */
 func (cl *Client) globalUpdateWebSeedRequests() {
        type aprioriMapValue struct {
+               // Change to request index?
                startOffset int64
                webseedRequestOrderValue
        }
        aprioriMap := make(map[aprioriWebseedRequestKey]aprioriMapValue)
-       for uniqueKey, value := range cl.iterWebseed() {
+       for uniqueKey, value := range cl.iterPossibleWebseedRequests() {
                cur, ok := aprioriMap[uniqueKey.aprioriWebseedRequestKey]
-               // Set the webseed request if it doesn't exist, or if the new one has a higher priority or
-               // starts earlier in the file.
-               if !ok || cmp.Or(
-                       cmp.Compare(value.priority, cur.priority),
-                       cmp.Compare(cur.startOffset, uniqueKey.startOffset),
-               ) > 0 {
-                       aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startOffset, value}
+               if ok {
+                       // Shared in the lookup above.
+                       t := uniqueKey.t
+                       hasPeerConnRequest := func(offset int64) bool {
+                               reqIndex := t.getRequestIndexContainingOffset(offset)
+                               return t.requestingPeer(reqIndex) != nil
+                       }
+                       // Skip the webseed request unless it has a higher priority, is less requested by peer
+                       // conns, or has a lower start offset. Including peer conn requests here will bump
+                       // webseed requests in favour of peer conns unless there's nothing else to do.
+                       if cmp.Or(
+                               cmp.Compare(value.priority, cur.priority),
+                               compareBool(hasPeerConnRequest(cur.startOffset), hasPeerConnRequest(uniqueKey.startOffset)),
+                               cmp.Compare(cur.startOffset, uniqueKey.startOffset),
+                       ) <= 0 {
+                               continue
+                       }
                }
+               aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startOffset, value}
        }
        existingRequests := maps.Collect(cl.iterCurrentWebseedRequests())
        // We don't need the value but maybe cloning is just faster anyway?
@@ -153,6 +165,7 @@ func (cl *Client) globalUpdateWebSeedRequests() {
                        panicif.NotEq(peer.hostKey, costKey)
                        printPlan()
                        begin := t.getRequestIndexContainingOffset(requestKey.startOffset)
+                       // TODO: Find an actual end, so we don't lose lots of data when requests are cancelled.
                        end := t.endRequestIndexForFileIndex(requestKey.fileIndex)
                        panicif.Eq(begin, end)
                        peer.spawnRequest(begin, end)
@@ -160,6 +173,15 @@ func (cl *Client) globalUpdateWebSeedRequests() {
        }
 }
 
+func (cl *Client) dumpCurrentWebseedRequests() {
+       if webseed.PrintDebug {
+               fmt.Println("current webseed requests:")
+               for key, value := range cl.iterCurrentWebseedRequests() {
+                       fmt.Printf("\t%v: %v, priority %v\n", key.filePath(), value.existingWebseedRequest, value.priority)
+               }
+       }
+}
+
 type webseedRequestPlan struct {
        byCost map[webseedHostKeyHandle][]webseedUniqueRequestKey
 }
@@ -182,8 +204,12 @@ type aprioriWebseedRequestKey struct {
        url       webseedUrlKey
 }
 
-func (me aprioriWebseedRequestKey) String() string {
-       return fmt.Sprintf("%v from %v", me.t.Files()[me.fileIndex].Path(), me.url)
+func (me *aprioriWebseedRequestKey) filePath() string {
+       return me.t.Files()[me.fileIndex].Path()
+}
+
+func (me *aprioriWebseedRequestKey) String() string {
+       return fmt.Sprintf("%v from %v", me.filePath(), me.url)
 }
 
 // Distinct webseed request when different offsets to the same object are allowed.
@@ -198,12 +224,12 @@ func (me webseedUniqueRequestKey) String() string {
 
 // Non-distinct proposed webseed request data.
 type webseedRequestOrderValue struct {
-       priority PiecePriority
+       // The associated webseed request per host limit.
+       costKey webseedHostKeyHandle
        // Used for cancellation if this is deprioritized. Also, a faster way to sort for existing
        // requests.
        existingWebseedRequest *webseedRequest
-       // The associated webseed request per host limit.
-       costKey webseedHostKeyHandle
+       priority               PiecePriority
 }
 
 func (me webseedRequestOrderValue) String() string {
@@ -211,7 +237,7 @@ func (me webseedRequestOrderValue) String() string {
 }
 
 // Yields possible webseed requests by piece. Caller should filter and prioritize these.
-func (cl *Client) iterWebseed() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
+func (cl *Client) iterPossibleWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
        return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) {
                for key, value := range cl.pieceRequestOrder {
                        input := key.getRequestStrategyInput(cl)