]> Sergey Matveev's repositories - btrtrc.git/blobdiff - torrent.go
Use reusable roaring iterators
[btrtrc.git] / torrent.go
index ae3d1c05a6d578a755e84a4a580b80a77ccc69c2..bd7b75738471913dc9ddb2da784f82b71f814800 100644 (file)
@@ -8,6 +8,7 @@ import (
        "errors"
        "fmt"
        "io"
+       "math/rand"
        "net/netip"
        "net/url"
        "sort"
@@ -30,6 +31,7 @@ import (
        "github.com/anacrolix/multiless"
        "github.com/anacrolix/sync"
        request_strategy "github.com/anacrolix/torrent/request-strategy"
+       typedRoaring "github.com/anacrolix/torrent/typed-roaring"
        "github.com/davecgh/go-spew/spew"
        "github.com/pion/datachannel"
 
@@ -61,6 +63,9 @@ type Torrent struct {
        closed   chansync.SetOnce
        infoHash metainfo.Hash
        pieces   []Piece
+
+       // The order pieces are requested if there's no stronger reason like availability or priority.
+       pieceRequestOrder []int
        // Values are the piece indices that changed.
        pieceStateChanges pubsub.PubSub[PieceStateChange]
        // The size of chunks to request from peers over the wire. This is
@@ -86,6 +91,8 @@ type Torrent struct {
        fileIndex segments.Index
        files     *[]*File
 
+       _chunksPerRegularPiece chunkIndexType
+
        webSeeds map[string]*Peer
        // Active peer connections, running message stream loops. TODO: Make this
        // open (not-closed) connections only.
@@ -139,11 +146,10 @@ type Torrent struct {
        initialPieceCheckDisabled bool
 
        connsWithAllPieces map[*Peer]struct{}
-       // Count of each request across active connections.
-       pendingRequests map[RequestIndex]*Peer
-       lastRequested   map[RequestIndex]time.Time
+
+       requestState []requestState
        // Chunks we've written to since the corresponding piece was last checked.
-       dirtyChunks roaring.Bitmap
+       dirtyChunks typedRoaring.Bitmap[RequestIndex]
 
        pex pexState
 
@@ -155,6 +161,10 @@ type Torrent struct {
        sourcesLogger log.Logger
 
        smartBanCache smartBanCache
+
+       // Large allocations reused between request state updates.
+       requestPieceStates []request_strategy.PieceRequestOrderState
+       requestIndexes     []RequestIndex
 }
 
 func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
@@ -380,11 +390,6 @@ func (t *Torrent) makePieces() {
                beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
                endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
                piece.files = files[beginFile:endFile]
-               piece.undirtiedChunksIter = undirtiedChunksIter{
-                       TorrentDirtyChunks: &t.dirtyChunks,
-                       StartRequestIndex:  piece.requestIndexOffset(),
-                       EndRequestIndex:    piece.requestIndexOffset() + piece.numChunks(),
-               }
        }
 }
 
@@ -434,6 +439,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
        t.nameMu.Lock()
        t.info = info
        t.nameMu.Unlock()
+       t._chunksPerRegularPiece = chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
        t.updateComplete()
        t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
        t.displayName = "" // Save a few bytes lol.
@@ -452,7 +458,10 @@ func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrder
 
 // This seems to be all the follow-up tasks after info is set, that can't fail.
 func (t *Torrent) onSetInfo() {
+       t.pieceRequestOrder = rand.Perm(t.numPieces())
        t.initPieceRequestOrder()
+       MakeSliceWithLength(&t.requestState, t.numChunks())
+       MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
        for i := range t.pieces {
                p := &t.pieces[i]
                // Need to add relativeAvailability before updating piece completion, as that may result in conns
@@ -462,17 +471,15 @@ func (t *Torrent) onSetInfo() {
                }
                p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
                t.addRequestOrderPiece(i)
-               t.updatePieceCompletion(pieceIndex(i))
+               t.updatePieceCompletion(i)
                if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
                        // t.logger.Printf("piece %s completion unknown, queueing check", p)
-                       t.queuePieceCheck(pieceIndex(i))
+                       t.queuePieceCheck(i)
                }
        }
        t.cl.event.Broadcast()
        close(t.gotMetainfoC)
        t.updateWantPeersEvent()
-       t.pendingRequests = make(map[RequestIndex]*Peer)
-       t.lastRequested = make(map[RequestIndex]time.Time)
        t.tryCreateMorePieceHashers()
        t.iterPeers(func(p *Peer) {
                p.onGotInfo(t.info)
@@ -842,7 +849,7 @@ func (t *Torrent) usualPieceSize() int {
 }
 
 func (t *Torrent) numPieces() pieceIndex {
-       return pieceIndex(t.info.NumPieces())
+       return t.info.NumPieces()
 }
 
 func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
@@ -919,15 +926,15 @@ func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
        return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
 }
 
-func (t *Torrent) chunksPerRegularPiece() uint32 {
-       return uint32((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
+func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
+       return t._chunksPerRegularPiece
 }
 
-func (t *Torrent) numRequests() RequestIndex {
+func (t *Torrent) numChunks() RequestIndex {
        if t.numPieces() == 0 {
                return 0
        }
-       return uint32(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
+       return RequestIndex(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
 }
 
 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
@@ -1173,7 +1180,7 @@ func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
                        if !c.peerHasPiece(piece) {
                                return
                        }
-                       if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(uint32(piece)) {
+                       if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
                                return
                        }
                        c.updateRequests(reason)
@@ -1257,7 +1264,7 @@ func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
 }
 
 func (t *Torrent) pendRequest(req RequestIndex) {
-       t.piece(int(req / t.chunksPerRegularPiece())).pendChunkIndex(req % t.chunksPerRegularPiece())
+       t.piece(t.pieceIndexOfRequestIndex(req)).pendChunkIndex(req % t.chunksPerRegularPiece())
 }
 
 func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string) {
@@ -1461,13 +1468,7 @@ func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
                }
        }
        torrent.Add("deleted connections", 1)
-       if !c.deleteAllRequests().IsEmpty() {
-               t.iterPeers(func(p *Peer) {
-                       if p.isLowOnRequests() {
-                               p.updateRequests("Torrent.deletePeerConn")
-                       }
-               })
-       }
+       c.deleteAllRequests("Torrent.deletePeerConn")
        t.assertPendingRequests()
        if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
                panic(t.connsWithAllPieces)
@@ -1494,7 +1495,7 @@ func (t *Torrent) assertPendingRequests() {
        }
        // var actual pendingRequests
        // if t.haveInfo() {
-       //      actual.m = make([]int, t.numRequests())
+       //      actual.m = make([]int, t.numChunks())
        // }
        // t.iterPeers(func(p *Peer) {
        //      p.requestState.Requests.Iterate(func(x uint32) bool {
@@ -2068,7 +2069,9 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
 }
 
 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
-       for ri := t.pieceRequestIndexOffset(piece); ri < t.pieceRequestIndexOffset(piece+1); ri++ {
+       start := t.pieceRequestIndexOffset(piece)
+       end := start + t.pieceNumChunks(piece)
+       for ri := start; ri < end; ri++ {
                t.cancelRequest(ri)
        }
 }
@@ -2408,6 +2411,7 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
                activeRequests: make(map[Request]webseed.Request, maxRequests),
                maxRequests:    maxRequests,
        }
+       ws.peer.initRequestState()
        for _, opt := range opts {
                opt(&ws.client)
        }
@@ -2437,15 +2441,15 @@ func (t *Torrent) peerIsActive(p *Peer) (active bool) {
 }
 
 func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
-       index := ri / t.chunksPerRegularPiece()
+       index := t.pieceIndexOfRequestIndex(ri)
        return Request{
                pp.Integer(index),
-               t.piece(int(index)).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
+               t.piece(index).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
        }
 }
 
 func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
-       return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + uint32(r.Begin/t.chunkSize)
+       return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize)
 }
 
 func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
@@ -2457,16 +2461,21 @@ func (t *Torrent) updateComplete() {
 }
 
 func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
-       p := t.pendingRequests[r]
+       p := t.requestingPeer(r)
        if p != nil {
                p.cancel(r)
        }
-       delete(t.pendingRequests, r)
+       // TODO: This is a check that an old invariant holds. It can be removed after some testing.
+       //delete(t.pendingRequests, r)
+       var zeroRequestState requestState
+       if t.requestState[r] != zeroRequestState {
+               panic("expected request state to be gone")
+       }
        return p
 }
 
 func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
-       return t.pendingRequests[r]
+       return t.requestState[r].peer
 }
 
 func (t *Torrent) addConnWithAllPieces(p *Peer) {
@@ -2494,3 +2503,26 @@ func (t *Torrent) hasStorageCap() bool {
        _, ok := (*f)()
        return ok
 }
+
+func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex {
+       return pieceIndex(ri / t.chunksPerRegularPiece())
+}
+
+func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
+       reuseIter *typedRoaring.Iterator[RequestIndex],
+       piece pieceIndex,
+       f func(RequestIndex),
+) {
+       reuseIter.Initialize(&t.dirtyChunks)
+       pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece)
+       iterBitmapUnsetInRange(
+               reuseIter,
+               pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece),
+               f,
+       )
+}
+
+type requestState struct {
+       peer *Peer
+       when time.Time
+}