From 9e8eb8031462455f5ad737dd1ee221b7e9149d4e Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 23 Jul 2025 13:44:10 +1000 Subject: [PATCH] Webseeds favour requesting partial files --- peerconn.go | 4 ++-- requesting.go | 4 ++-- torrent.go | 40 +++++++++++++++++++++++++++++++++++++--- webseed-requesting.go | 10 +++++++--- 4 files changed, 48 insertions(+), 10 deletions(-) diff --git a/peerconn.go b/peerconn.go index 1191093b..e0c9b363 100644 --- a/peerconn.go +++ b/peerconn.go @@ -14,6 +14,7 @@ import ( "strings" "sync/atomic" "time" + "weak" "github.com/RoaringBitmap/roaring" "github.com/anacrolix/generics" @@ -98,7 +99,6 @@ type PeerConn struct { // we may not even know the number of pieces in the torrent yet. peerSentHaveAll bool - // TODO: How are pending cancels handled for webseed peers? requestState requestStrategy.PeerRequestState peerRequestDataAllocLimiter alloclim.Limiter @@ -1716,7 +1716,7 @@ func (cn *PeerConn) request(r RequestIndex) (more bool, err error) { } cn.validReceiveChunks[r]++ cn.t.requestState[r] = requestState{ - peer: cn, + peer: weak.Make(cn), when: time.Now(), } cn.updateExpectingChunks() diff --git a/requesting.go b/requesting.go index c45a502b..5ec3391d 100644 --- a/requesting.go +++ b/requesting.go @@ -127,8 +127,8 @@ func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex } leftRequestState := t.requestState[leftRequest] rightRequestState := t.requestState[rightRequest] - leftPeer := leftRequestState.peer - rightPeer := rightRequestState.peer + leftPeer := leftRequestState.peer.Value() + rightPeer := rightRequestState.peer.Value() // Prefer chunks already requested from this peer. ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer) // Prefer unrequested chunks. diff --git a/torrent.go b/torrent.go index 1fa8ca78..a69fa6bc 100644 --- a/torrent.go +++ b/torrent.go @@ -21,6 +21,7 @@ import ( "time" "unique" "unsafe" + "weak" "github.com/RoaringBitmap/roaring" "github.com/anacrolix/chansync" @@ -1863,6 +1864,9 @@ func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) { } torrent.Add("deleted connections", 1) c.deleteAllRequests("Torrent.deletePeerConn") + if len(t.conns) == 0 { + panicif.NotZero(len(t.requestState)) + } t.assertPendingRequests() if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 { panic(t.connsWithAllPieces) @@ -3167,8 +3171,10 @@ func (t *Torrent) cancelRequest(r RequestIndex) *PeerConn { return p } -func (t *Torrent) requestingPeer(r RequestIndex) *PeerConn { - return t.requestState[r].peer +func (t *Torrent) requestingPeer(r RequestIndex) (ret *PeerConn) { + ret = t.requestState[r].peer.Value() + panicif.Nil(ret) + return } func (t *Torrent) addConnWithAllPieces(p *Peer) { @@ -3229,7 +3235,7 @@ func (t *Torrent) GetWebRtcPeerConnStats() map[string]webRtcStatsReports { } type requestState struct { - peer *PeerConn + peer weak.Pointer[PeerConn] when time.Time } @@ -3558,3 +3564,31 @@ func (t *Torrent) considerStartingHashers() bool { } return true } + +func (t *Torrent) getFile(fileIndex int) *File { + return (*t.files)[fileIndex] +} + +func (t *Torrent) fileMightBePartial(fileIndex int) bool { + f := t.getFile(fileIndex) + beginPieceIndex := f.BeginPieceIndex() + endPieceIndex := f.EndPieceIndex() + if t.dirtyChunks.IntersectsWithInterval( + uint64(t.pieceRequestIndexBegin(beginPieceIndex)), + uint64(t.pieceRequestIndexBegin(endPieceIndex)), + ) { + // We have dirty chunks. Even if the file is complete, this could mean a partial file has + // been started. + return true + } + var r roaring.Bitmap + r.AddRange(uint64(beginPieceIndex), uint64(endPieceIndex)) + switch t._completedPieces.AndCardinality(&r) { + case 0, uint64(endPieceIndex - beginPieceIndex): + // We have either no pieces or all pieces and no dirty chunks. + return false + default: + // We're somewhere in-between. + return true + } +} diff --git a/webseed-requesting.go b/webseed-requesting.go index 5d50f949..6cd88655 100644 --- a/webseed-requesting.go +++ b/webseed-requesting.go @@ -99,11 +99,15 @@ func (cl *Client) updateWebseedRequests() { aprioriHeap := heap.InterfaceForSlice( &heapSlice, func(l heapElem, r heapElem) bool { - // Prefer the highest priority, then existing requests, then largest files. return cmp.Or( + // Prefer highest priority -cmp.Compare(l.priority, r.priority), - // Existing requests are assigned the priority of the piece they're reading next. + // Then existing requests compareBool(l.existingWebseedRequest == nil, r.existingWebseedRequest == nil), + // Prefer not competing with active peer connections. + compareBool(len(l.t.conns) > 0, len(r.t.conns) > 0), + // Try to complete partial files first. + -compareBool(l.t.fileMightBePartial(l.fileIndex), r.t.fileMightBePartial(r.fileIndex)), // Note this isn't correct if the starting piece is split across multiple files. But // I plan to refactor to key on starting piece to handle this case. -cmp.Compare( @@ -351,7 +355,7 @@ func (cl *Client) scheduleImmediateWebseedRequestUpdate() { } // Set the timer to fire right away (this will coalesce consecutive updates without forcing an // update on every call to this method). Since we're holding the Client lock, and we cancelled - // the timer and it wasn't active, nobody else should have reset it before us. + // the timer, and it wasn't active, nobody else should have reset it before us. panicif.True(cl.webseedRequestTimer.Reset(0)) } -- 2.51.0