"errors"
"fmt"
"io"
+ "math/rand"
"net/netip"
"net/url"
"sort"
closed chansync.SetOnce
infoHash metainfo.Hash
pieces []Piece
+
+ // The order pieces are requested if there's no stronger reason like availability or priority.
+ pieceRequestOrder []int
// Values are the piece indices that changed.
pieceStateChanges pubsub.PubSub[PieceStateChange]
// The size of chunks to request from peers over the wire. This is
chunkPool sync.Pool
// Total length of the torrent in bytes. Stored because it's not O(1) to
// get this from the info dict.
- length *int64
+ _length Option[int64]
// The storage to open when the info dict becomes available.
storageOpener *storage.Client
connsWithAllPieces map[*Peer]struct{}
- requestState []requestState
+ requestState map[RequestIndex]requestState
// Chunks we've written to since the corresponding piece was last checked.
dirtyChunks typedRoaring.Bitmap[RequestIndex]
sourcesLogger log.Logger
smartBanCache smartBanCache
+
+ // Large allocations reused between request state updates.
+ requestPieceStates []request_strategy.PieceRequestOrderState
+ requestIndexes []RequestIndex
+}
+
+func (t *Torrent) length() int64 {
+ return t._length.Value
}
func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
piece.files = files[beginFile:endFile]
- piece.undirtiedChunksIter = undirtiedChunksIter{
- TorrentDirtyChunks: &t.dirtyChunks,
- StartRequestIndex: piece.requestIndexOffset(),
- EndRequestIndex: piece.requestIndexOffset() + piece.numChunks(),
- }
}
}
for _, f := range t.info.UpvertedFiles() {
l += f.Length
}
- t.length = &l
+ t._length = Some(l)
}
// TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
// This seems to be all the follow-up tasks after info is set, that can't fail.
func (t *Torrent) onSetInfo() {
+ t.pieceRequestOrder = rand.Perm(t.numPieces())
t.initPieceRequestOrder()
- MakeSliceWithLength(&t.requestState, t.numChunks())
+ MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
for i := range t.pieces {
p := &t.pieces[i]
// Need to add relativeAvailability before updating piece completion, as that may result in conns
t.cl.event.Broadcast()
close(t.gotMetainfoC)
t.updateWantPeersEvent()
+ t.requestState = make(map[RequestIndex]requestState)
t.tryCreateMorePieceHashers()
t.iterPeers(func(p *Peer) {
p.onGotInfo(t.info)
}
func (t *Torrent) numPieces() pieceIndex {
- return pieceIndex(t.info.NumPieces())
+ return t.info.NumPieces()
}
func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
}
func (t *Torrent) requestOffset(r Request) int64 {
- return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
+ return torrentRequestOffset(t.length(), int64(t.usualPieceSize()), r)
}
// Return the request that would include the given offset into the torrent data. Returns !ok if
// there is no such request.
func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
- return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
+ return torrentOffsetRequest(t.length(), t.info.PieceLength, int64(t.chunkSize), off)
}
func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
return 0
}
if piece == t.numPieces()-1 {
- ret := pp.Integer(*t.length % t.info.PieceLength)
+ ret := pp.Integer(t.length() % t.info.PieceLength)
if ret != 0 {
return ret
}
// Returns the range of pieces [begin, end) that contains the extent of bytes.
func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
- if off >= *t.length {
+ if off >= t.length() {
return
}
if off < 0 {
if !t.haveInfo() {
return 0
}
- return *t.length - t.bytesLeft()
+ return t.length() - t.bytesLeft()
}
func (t *Torrent) SetInfoBytes(b []byte) (err error) {
defer t.cl.unlock()
err = t.cl.runHandshookConn(pc, t)
if err != nil {
- t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err)
+ t.logger.WithDefaultLevel(log.Debug).Printf("error running handshook webrtc conn: %v", err)
}
}
t.iterPeers(func(p *Peer) {
remoteIp := p.remoteIp()
if remoteIp == nil {
- if p.bannableAddr.Ok() {
+ if p.bannableAddr.Ok {
t.logger.WithDefaultLevel(log.Debug).Printf("can't get remote ip for peer %v", p)
}
return
}
// TODO: This is a check that an old invariant holds. It can be removed after some testing.
//delete(t.pendingRequests, r)
- var zeroRequestState requestState
- if t.requestState[r] != zeroRequestState {
+ if _, ok := t.requestState[r]; ok {
panic("expected request state to be gone")
}
return p
return pieceIndex(ri / t.chunksPerRegularPiece())
}
+func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
+ reuseIter *typedRoaring.Iterator[RequestIndex],
+ piece pieceIndex,
+ f func(RequestIndex),
+) {
+ reuseIter.Initialize(&t.dirtyChunks)
+ pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece)
+ iterBitmapUnsetInRange(
+ reuseIter,
+ pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece),
+ f,
+ )
+}
+
type requestState struct {
peer *Peer
when time.Time