"strings"
"sync/atomic"
"time"
+ "weak"
"github.com/RoaringBitmap/roaring"
"github.com/anacrolix/generics"
// we may not even know the number of pieces in the torrent yet.
peerSentHaveAll bool
- // TODO: How are pending cancels handled for webseed peers?
requestState requestStrategy.PeerRequestState
peerRequestDataAllocLimiter alloclim.Limiter
}
cn.validReceiveChunks[r]++
cn.t.requestState[r] = requestState{
- peer: cn,
+ peer: weak.Make(cn),
when: time.Now(),
}
cn.updateExpectingChunks()
}
leftRequestState := t.requestState[leftRequest]
rightRequestState := t.requestState[rightRequest]
- leftPeer := leftRequestState.peer
- rightPeer := rightRequestState.peer
+ leftPeer := leftRequestState.peer.Value()
+ rightPeer := rightRequestState.peer.Value()
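+ // If either PeerConn has already been collected, Value() returns nil here and simply won't
+ // match p.peer below.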
// Prefer chunks already requested from this peer.
ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer)
// Prefer unrequested chunks.
"time"
"unique"
"unsafe"
+ "weak"
"github.com/RoaringBitmap/roaring"
"github.com/anacrolix/chansync"
}
torrent.Add("deleted connections", 1)
c.deleteAllRequests("Torrent.deletePeerConn")
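+ // Every request is attributed to a PeerConn, so with no conns left the request state must be
+ // empty.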
+ if len(t.conns) == 0 {
+ panicif.NotZero(len(t.requestState))
+ }
t.assertPendingRequests()
if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
panic(t.connsWithAllPieces)
return p
}
-func (t *Torrent) requestingPeer(r RequestIndex) *PeerConn {
- return t.requestState[r].peer
+func (t *Torrent) requestingPeer(r RequestIndex) (ret *PeerConn) {
+ ret = t.requestState[r].peer.Value()
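+ // The weak pointer must still resolve: request state is expected to be cleaned up before its
+ // PeerConn can be collected.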
+ panicif.Nil(ret)
+ return
}
func (t *Torrent) addConnWithAllPieces(p *Peer) {
}
type requestState struct {
- peer *PeerConn
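+ // The peer the request was issued to, held weakly: Value() returns nil once the PeerConn has
+ // been garbage collected.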
+ peer weak.Pointer[PeerConn]
when time.Time
}
}
return true
}
+
+func (t *Torrent) getFile(fileIndex int) *File {
+ return (*t.files)[fileIndex]
+}
+
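+// fileMightBePartial reports whether the file at fileIndex may be partially downloaded: its
+// piece range has dirty chunks, or some but not all of its pieces are complete.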
+func (t *Torrent) fileMightBePartial(fileIndex int) bool {
+ f := t.getFile(fileIndex)
+ beginPieceIndex := f.BeginPieceIndex()
+ endPieceIndex := f.EndPieceIndex()
+ if t.dirtyChunks.IntersectsWithInterval(
+ uint64(t.pieceRequestIndexBegin(beginPieceIndex)),
+ uint64(t.pieceRequestIndexBegin(endPieceIndex)),
+ ) {
+ // We have dirty chunks. Even if the file is complete, this could mean a partial file has
+ // been started.
+ return true
+ }
+ var r roaring.Bitmap
+ r.AddRange(uint64(beginPieceIndex), uint64(endPieceIndex))
+ switch t._completedPieces.AndCardinality(&r) {
+ case 0, uint64(endPieceIndex - beginPieceIndex):
+ // Either none or all of the file's pieces are complete (and there are no dirty chunks), so not partial.
+ return false
+ default:
+ // We're somewhere in-between.
+ return true
+ }
+}
aprioriHeap := heap.InterfaceForSlice(
&heapSlice,
func(l heapElem, r heapElem) bool {
- // Prefer the highest priority, then existing requests, then largest files.
return cmp.Or(
+ // Prefer the highest priority.
-cmp.Compare(l.priority, r.priority),
- // Existing requests are assigned the priority of the piece they're reading next.
+ // Then existing requests.
compareBool(l.existingWebseedRequest == nil, r.existingWebseedRequest == nil),
+ // Prefer not competing with active peer connections.
+ compareBool(len(l.t.conns) > 0, len(r.t.conns) > 0),
+ // Try to complete partial files first.
+ -compareBool(l.t.fileMightBePartial(l.fileIndex), r.t.fileMightBePartial(r.fileIndex)),
// Note this isn't correct if the starting piece is split across multiple files. But
// I plan to refactor to key on starting piece to handle this case.
-cmp.Compare(
}
// Set the timer to fire right away (this will coalesce consecutive updates without forcing an
// update on every call to this method). Since we're holding the Client lock, and we cancelled
- // the timer and it wasn't active, nobody else should have reset it before us.
+ // the timer, and it wasn't active, nobody else should have reset it before us.
panicif.True(cl.webseedRequestTimer.Reset(0))
}