]> Sergey Matveev's repositories - btrtrc.git/blobdiff - torrent.go
Use Option for cached Torrent length
[btrtrc.git] / torrent.go
index 3e91e736ea5a1546b9394500111b038baca1a324..cc9458d1388dc654beda49274ea61500087aef8c 100644 (file)
@@ -8,6 +8,7 @@ import (
        "errors"
        "fmt"
        "io"
+       "math/rand"
        "net/netip"
        "net/url"
        "sort"
@@ -62,6 +63,9 @@ type Torrent struct {
        closed   chansync.SetOnce
        infoHash metainfo.Hash
        pieces   []Piece
+
+       // The order pieces are requested if there's no stronger reason like availability or priority.
+       pieceRequestOrder []int
        // Values are the piece indices that changed.
        pieceStateChanges pubsub.PubSub[PieceStateChange]
        // The size of chunks to request from peers over the wire. This is
@@ -70,7 +74,7 @@ type Torrent struct {
        chunkPool sync.Pool
        // Total length of the torrent in bytes. Stored because it's not O(1) to
        // get this from the info dict.
-       length *int64
+       _length Option[int64]
 
        // The storage to open when the info dict becomes available.
        storageOpener *storage.Client
@@ -87,6 +91,8 @@ type Torrent struct {
        fileIndex segments.Index
        files     *[]*File
 
+       _chunksPerRegularPiece chunkIndexType
+
        webSeeds map[string]*Peer
        // Active peer connections, running message stream loops. TODO: Make this
        // open (not-closed) connections only.
@@ -141,7 +147,7 @@ type Torrent struct {
 
        connsWithAllPieces map[*Peer]struct{}
 
-       requestState map[RequestIndex]requestState
+       requestState []requestState
        // Chunks we've written to since the corresponding piece was last checked.
        dirtyChunks typedRoaring.Bitmap[RequestIndex]
 
@@ -155,6 +161,14 @@ type Torrent struct {
        sourcesLogger log.Logger
 
        smartBanCache smartBanCache
+
+       // Large allocations reused between request state updates.
+       requestPieceStates []request_strategy.PieceRequestOrderState
+       requestIndexes     []RequestIndex
+}
+
+func (t *Torrent) length() int64 {
+       return t._length.Value()
 }
 
 func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
@@ -380,11 +394,6 @@ func (t *Torrent) makePieces() {
                beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
                endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
                piece.files = files[beginFile:endFile]
-               piece.undirtiedChunksIter = undirtiedChunksIter{
-                       TorrentDirtyChunks: &t.dirtyChunks,
-                       StartRequestIndex:  piece.requestIndexOffset(),
-                       EndRequestIndex:    piece.requestIndexOffset() + piece.numChunks(),
-               }
        }
 }
 
@@ -415,7 +424,7 @@ func (t *Torrent) cacheLength() {
        for _, f := range t.info.UpvertedFiles() {
                l += f.Length
        }
-       t.length = &l
+       t._length = Some(l)
 }
 
 // TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
@@ -434,6 +443,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
        t.nameMu.Lock()
        t.info = info
        t.nameMu.Unlock()
+       t._chunksPerRegularPiece = chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
        t.updateComplete()
        t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
        t.displayName = "" // Save a few bytes lol.
@@ -452,7 +462,10 @@ func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrder
 
 // This seems to be all the follow-up tasks after info is set, that can't fail.
 func (t *Torrent) onSetInfo() {
+       t.pieceRequestOrder = rand.Perm(t.numPieces())
        t.initPieceRequestOrder()
+       MakeSliceWithLength(&t.requestState, t.numChunks())
+       MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
        for i := range t.pieces {
                p := &t.pieces[i]
                // Need to add relativeAvailability before updating piece completion, as that may result in conns
@@ -471,7 +484,6 @@ func (t *Torrent) onSetInfo() {
        t.cl.event.Broadcast()
        close(t.gotMetainfoC)
        t.updateWantPeersEvent()
-       t.requestState = make(map[RequestIndex]requestState)
        t.tryCreateMorePieceHashers()
        t.iterPeers(func(p *Peer) {
                p.onGotInfo(t.info)
@@ -841,7 +853,7 @@ func (t *Torrent) usualPieceSize() int {
 }
 
 func (t *Torrent) numPieces() pieceIndex {
-       return pieceIndex(t.info.NumPieces())
+       return t.info.NumPieces()
 }
 
 func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
@@ -887,13 +899,13 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
 }
 
 func (t *Torrent) requestOffset(r Request) int64 {
-       return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
+       return torrentRequestOffset(t.length(), int64(t.usualPieceSize()), r)
 }
 
 // Return the request that would include the given offset into the torrent data. Returns !ok if
 // there is no such request.
 func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
-       return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
+       return torrentOffsetRequest(t.length(), t.info.PieceLength, int64(t.chunkSize), off)
 }
 
 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
@@ -919,10 +931,10 @@ func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
 }
 
 func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
-       return chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
+       return t._chunksPerRegularPiece
 }
 
-func (t *Torrent) numRequests() RequestIndex {
+func (t *Torrent) numChunks() RequestIndex {
        if t.numPieces() == 0 {
                return 0
        }
@@ -941,7 +953,7 @@ func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
                return 0
        }
        if piece == t.numPieces()-1 {
-               ret := pp.Integer(*t.length % t.info.PieceLength)
+               ret := pp.Integer(t.length() % t.info.PieceLength)
                if ret != 0 {
                        return ret
                }
@@ -1217,7 +1229,7 @@ func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason string) {
 
 // Returns the range of pieces [begin, end) that contains the extent of bytes.
 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
-       if off >= *t.length {
+       if off >= t.length() {
                return
        }
        if off < 0 {
@@ -1434,7 +1446,7 @@ func (t *Torrent) bytesCompleted() int64 {
        if !t.haveInfo() {
                return 0
        }
-       return *t.length - t.bytesLeft()
+       return t.length() - t.bytesLeft()
 }
 
 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
@@ -1487,7 +1499,7 @@ func (t *Torrent) assertPendingRequests() {
        }
        // var actual pendingRequests
        // if t.haveInfo() {
-       //      actual.m = make([]int, t.numRequests())
+       //      actual.m = make([]int, t.numChunks())
        // }
        // t.iterPeers(func(p *Peer) {
        //      p.requestState.Requests.Iterate(func(x uint32) bool {
@@ -2061,7 +2073,9 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
 }
 
 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
-       for ri := t.pieceRequestIndexOffset(piece); ri < t.pieceRequestIndexOffset(piece+1); ri++ {
+       start := t.pieceRequestIndexOffset(piece)
+       end := start + t.pieceNumChunks(piece)
+       for ri := start; ri < end; ri++ {
                t.cancelRequest(ri)
        }
 }
@@ -2457,7 +2471,8 @@ func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
        }
        // TODO: This is a check that an old invariant holds. It can be removed after some testing.
        //delete(t.pendingRequests, r)
-       if _, ok := t.requestState[r]; ok {
+       var zeroRequestState requestState
+       if t.requestState[r] != zeroRequestState {
                panic("expected request state to be gone")
        }
        return p
@@ -2497,6 +2512,20 @@ func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex {
        return pieceIndex(ri / t.chunksPerRegularPiece())
 }
 
+func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
+       reuseIter *typedRoaring.Iterator[RequestIndex],
+       piece pieceIndex,
+       f func(RequestIndex),
+) {
+       reuseIter.Initialize(&t.dirtyChunks)
+       pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece)
+       iterBitmapUnsetInRange(
+               reuseIter,
+               pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece),
+               f,
+       )
+}
+
 type requestState struct {
        peer *Peer
        when time.Time