"errors"
"fmt"
"io"
+ "math/rand"
"net/netip"
"net/url"
"sort"
closed chansync.SetOnce
infoHash metainfo.Hash
pieces []Piece
+
+ // The order pieces are requested if there's no stronger reason like availability or priority.
+ pieceRequestOrder []int
// Values are the piece indices that changed.
pieceStateChanges pubsub.PubSub[PieceStateChange]
// The size of chunks to request from peers over the wire. This is
fileIndex segments.Index
files *[]*File
+ _chunksPerRegularPiece chunkIndexType
+
webSeeds map[string]*Peer
// Active peer connections, running message stream loops. TODO: Make this
// open (not-closed) connections only.
initialPieceCheckDisabled bool
connsWithAllPieces map[*Peer]struct{}
- // Count of each request across active connections.
- pendingRequests map[RequestIndex]*Peer
- lastRequested map[RequestIndex]time.Time
+
+ requestState []requestState
// Chunks we've written to since the corresponding piece was last checked.
dirtyChunks typedRoaring.Bitmap[RequestIndex]
sourcesLogger log.Logger
smartBanCache smartBanCache
+
+ // Large allocations reused between request state updates.
+ requestPieceStates []request_strategy.PieceRequestOrderState
+ requestIndexes []RequestIndex
}
func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
piece.files = files[beginFile:endFile]
- piece.undirtiedChunksIter = undirtiedChunksIter{
- TorrentDirtyChunks: &t.dirtyChunks,
- StartRequestIndex: piece.requestIndexOffset(),
- EndRequestIndex: piece.requestIndexOffset() + piece.numChunks(),
- }
}
}
t.nameMu.Lock()
t.info = info
t.nameMu.Unlock()
+ t._chunksPerRegularPiece = chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
t.updateComplete()
t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
t.displayName = "" // Save a few bytes lol.
// This seems to be all the follow-up tasks after info is set, that can't fail.
func (t *Torrent) onSetInfo() {
+ t.pieceRequestOrder = rand.Perm(t.numPieces())
t.initPieceRequestOrder()
+ MakeSliceWithLength(&t.requestState, t.numChunks())
+ MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
for i := range t.pieces {
p := &t.pieces[i]
// Need to add relativeAvailability before updating piece completion, as that may result in conns
}
p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
t.addRequestOrderPiece(i)
- t.updatePieceCompletion(pieceIndex(i))
+ t.updatePieceCompletion(i)
if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
// t.logger.Printf("piece %s completion unknown, queueing check", p)
- t.queuePieceCheck(pieceIndex(i))
+ t.queuePieceCheck(i)
}
}
t.cl.event.Broadcast()
close(t.gotMetainfoC)
t.updateWantPeersEvent()
- t.pendingRequests = make(map[RequestIndex]*Peer)
- t.lastRequested = make(map[RequestIndex]time.Time)
t.tryCreateMorePieceHashers()
t.iterPeers(func(p *Peer) {
p.onGotInfo(t.info)
}
func (t *Torrent) numPieces() pieceIndex {
- return pieceIndex(t.info.NumPieces())
+ return t.info.NumPieces()
}
func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
}
func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
- return chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
+ return t._chunksPerRegularPiece
}
-func (t *Torrent) numRequests() RequestIndex {
+func (t *Torrent) numChunks() RequestIndex {
if t.numPieces() == 0 {
return 0
}
}
// var actual pendingRequests
// if t.haveInfo() {
- // actual.m = make([]int, t.numRequests())
+ // actual.m = make([]int, t.numChunks())
// }
// t.iterPeers(func(p *Peer) {
// p.requestState.Requests.Iterate(func(x uint32) bool {
}
func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
- for ri := t.pieceRequestIndexOffset(piece); ri < t.pieceRequestIndexOffset(piece+1); ri++ {
+ start := t.pieceRequestIndexOffset(piece)
+ end := start + t.pieceNumChunks(piece)
+ for ri := start; ri < end; ri++ {
t.cancelRequest(ri)
}
}
}
func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
- p := t.pendingRequests[r]
+ p := t.requestingPeer(r)
if p != nil {
p.cancel(r)
}
- delete(t.pendingRequests, r)
+ // TODO: This is a check that an old invariant holds. It can be removed after some testing.
+ //delete(t.pendingRequests, r)
+ var zeroRequestState requestState
+ if t.requestState[r] != zeroRequestState {
+ panic("expected request state to be gone")
+ }
return p
}
func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
- return t.pendingRequests[r]
+ return t.requestState[r].peer
}
func (t *Torrent) addConnWithAllPieces(p *Peer) {
func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex {
return pieceIndex(ri / t.chunksPerRegularPiece())
}
+
+func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
+ reuseIter *typedRoaring.Iterator[RequestIndex],
+ piece pieceIndex,
+ f func(RequestIndex),
+) {
+ reuseIter.Initialize(&t.dirtyChunks)
+ pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece)
+ iterBitmapUnsetInRange(
+ reuseIter,
+ pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece),
+ f,
+ )
+}
+
+type requestState struct {
+ peer *Peer
+ when time.Time
+}