]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Update some file/piece helpers
authorMatt Joiner <anacrolix@gmail.com>
Tue, 13 May 2025 11:42:52 +0000 (21:42 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 13 May 2025 11:42:52 +0000 (21:42 +1000)
peer.go
peerconn_test.go
piece.go
requesting.go
torrent.go
torrent_test.go

diff --git a/peer.go b/peer.go
index 43f9c7f79f93cdfe27616c84148bd08537e8603c..649634b255da328a0fbd5e9c53a25a256cd3a50b 100644 (file)
--- a/peer.go
+++ b/peer.go
@@ -184,8 +184,8 @@ func (cn *Peer) expectingChunks() bool {
        cn.peerAllowedFast.Iterate(func(i pieceIndex) bool {
                haveAllowedFastRequests = roaringBitmapRangeCardinality[RequestIndex](
                        cn.requestState.Requests,
-                       cn.t.pieceRequestIndexOffset(i),
-                       cn.t.pieceRequestIndexOffset(i+1),
+                       cn.t.pieceRequestIndexBegin(i),
+                       cn.t.pieceRequestIndexBegin(i+1),
                ) == 0
                return !haveAllowedFastRequests
        })
index 44309293f7cd6727da8983d4ad9751c3da3bfbbf..396948538842230508e3be5baaadd989c6ae6b58 100644 (file)
@@ -116,8 +116,8 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
        })
        qt.Assert(b, qt.IsTrue(cn.bannableAddr.Ok))
        cn.setTorrent(t)
-       requestIndexBegin := t.pieceRequestIndexOffset(0)
-       requestIndexEnd := t.pieceRequestIndexOffset(1)
+       requestIndexBegin := t.pieceRequestIndexBegin(0)
+       requestIndexEnd := t.pieceRequestIndexBegin(1)
        eachRequestIndex := func(f func(ri RequestIndex)) {
                for ri := requestIndexBegin; ri < requestIndexEnd; ri++ {
                        f(ri)
index 6e53fb252a6de4b5025a07a00b0aed022410326c..da0b94c4d68f1c44be5558780ce2649a0441eb25 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -4,10 +4,13 @@ import (
        "context"
        "errors"
        "fmt"
+       "github.com/anacrolix/missinggo/v2/panicif"
+       "iter"
+       "sync"
+
        "github.com/anacrolix/chansync"
        g "github.com/anacrolix/generics"
        "github.com/anacrolix/missinggo/v2/bitmap"
-       "sync"
 
        "github.com/anacrolix/torrent/merkle"
        "github.com/anacrolix/torrent/metainfo"
@@ -26,7 +29,8 @@ type Piece struct {
        hashV2 g.Option[[32]byte]
        t      *Torrent
        index  pieceIndex
-       files  []*File
+       // First and one after the last file indexes.
+       beginFile, endFile int
 
        readerCond chansync.BroadcastCond
 
@@ -95,18 +99,18 @@ func (p *Piece) hasDirtyChunks() bool {
 func (p *Piece) numDirtyChunks() chunkIndexType {
        return chunkIndexType(roaringBitmapRangeCardinality[RequestIndex](
                &p.t.dirtyChunks,
-               p.requestIndexOffset(),
-               p.t.pieceRequestIndexOffset(p.index+1)))
+               p.requestIndexBegin(),
+               p.requestIndexMaxEnd()))
 }
 
 func (p *Piece) unpendChunkIndex(i chunkIndexType) {
-       p.t.dirtyChunks.Add(p.requestIndexOffset() + i)
+       p.t.dirtyChunks.Add(p.requestIndexBegin() + i)
        p.t.updatePieceRequestOrderPiece(p.index)
        p.readerCond.Broadcast()
 }
 
 func (p *Piece) pendChunkIndex(i RequestIndex) {
-       p.t.dirtyChunks.Remove(p.requestIndexOffset() + i)
+       p.t.dirtyChunks.Remove(p.requestIndexBegin() + i)
        p.t.updatePieceRequestOrderPiece(p.index)
 }
 
@@ -141,7 +145,7 @@ func (p *Piece) waitNoPendingWrites() {
 }
 
 func (p *Piece) chunkIndexDirty(chunk chunkIndexType) bool {
-       return p.t.dirtyChunks.Contains(p.requestIndexOffset() + chunk)
+       return p.t.dirtyChunks.Contains(p.requestIndexBegin() + chunk)
 }
 
 func (p *Piece) chunkIndexSpec(chunk chunkIndexType) ChunkSpec {
@@ -240,7 +244,7 @@ func (p *Piece) SetPriority(prio PiecePriority) {
 
 // This is priority based only on piece, file and reader priorities.
 func (p *Piece) purePriority() (ret PiecePriority) {
-       for _, f := range p.files {
+       for f := range p.files() {
                ret.Raise(f.prio)
        }
        if p.t.readerNowPieces().Contains(bitmap.BitIndex(p.index)) {
@@ -292,8 +296,15 @@ func (p *Piece) State() PieceState {
        return p.t.PieceState(p.index)
 }
 
-func (p *Piece) requestIndexOffset() RequestIndex {
-       return p.t.pieceRequestIndexOffset(p.index)
+// The first possible request index for the piece.
+func (p *Piece) requestIndexBegin() RequestIndex {
+       return p.t.pieceRequestIndexBegin(p.index)
+}
+
+// The maximum end request index for the piece. Some of the requests might not be valid, it's for
+// cleaning up arrays and bitmaps in broad strokes.
+func (p *Piece) requestIndexMaxEnd() RequestIndex {
+       return p.t.pieceRequestIndexBegin(p.index + 1)
 }
 
 // TODO: Make this peer-only?
@@ -304,10 +315,8 @@ func (p *Piece) availability() int {
 // For v2 torrents, files are aligned to pieces so there should always only be a single file for a
 // given piece.
 func (p *Piece) mustGetOnlyFile() *File {
-       if len(p.files) != 1 {
-               panic(len(p.files))
-       }
-       return p.files[0]
+       panicif.NotEq(p.numFiles(), 1)
+       return (*p.t.files)[p.beginFile]
 }
 
 // Sets the v2 piece hash, queuing initial piece checks if appropriate.
@@ -335,7 +344,7 @@ func (p *Piece) haveHash() bool {
 }
 
 func (p *Piece) hasPieceLayer() bool {
-       return len(p.files) == 1 && p.files[0].length > p.t.info.PieceLength
+       return p.numFiles() == 1 && p.mustGetOnlyFile().length > p.t.info.PieceLength
 }
 
 func (p *Piece) obtainHashV2() (hash [32]byte, err error) {
@@ -360,3 +369,25 @@ func (p *Piece) obtainHashV2() (hash [32]byte, err error) {
        h.SumMinLength(hash[:0], int(p.t.info.PieceLength))
        return
 }
+
+func (p *Piece) files() iter.Seq[*File] {
+       return func(yield func(*File) bool) {
+               for i := p.beginFile; i < p.endFile; i++ {
+                       if !yield((*p.t.files)[i]) {
+                               return
+                       }
+               }
+       }
+}
+
+func (p *Piece) numFiles() int {
+       return p.endFile - p.beginFile
+}
+
+func (p *Piece) hasActivePeerConnRequests() (ret bool) {
+       for ri := p.requestIndexBegin(); ri < p.requestIndexMaxEnd(); ri++ {
+               _, ok := p.t.requestState[ri]
+               ret = ret || ok
+       }
+       return
+}
index 00a8a6597dfccafa8fc6b96a3a6200a9ca5cd1c3..cf69f08b55beceff5543830289df294ddadc10b0 100644 (file)
@@ -74,6 +74,8 @@ func (p *peerId) GobDecode(b []byte) error {
 }
 
 type (
+       // A request index is a chunk indexed across the entire torrent. It's a single integer and can
+       // be converted to a protocol request.
        RequestIndex   = requestStrategy.RequestIndex
        chunkIndexType = requestStrategy.ChunkIndex
 )
@@ -194,7 +196,7 @@ func (p *PeerConn) getDesiredRequestState() (desired desiredRequestState) {
                return
        }
        requestHeap := desiredPeerRequests{
-               peer:           &p.Peer,
+               peer:           p,
                pieceStates:    t.requestPieceStates,
                requestIndexes: t.requestIndexes,
        }
@@ -345,7 +347,7 @@ func (p *PeerConn) applyRequestState(next desiredRequestState) {
                }
 
                existing := t.requestingPeer(req)
-               if existing != nil && existing != &p.Peer {
+               if existing != nil && existing != p {
                        // don't steal on cancel - because this is triggered by t.cancelRequest below
                        // which means that the cancelled can immediately try to steal back a request
                        // it has lost which can lead to circular cancel/add processing
index aefda213bcbeee599b8e2484310614ba814b11a2..bc6a597701833ffd59eaf542f7b912fc7ad349e8 100644 (file)
@@ -169,6 +169,7 @@ type Torrent struct {
 
        connsWithAllPieces map[*Peer]struct{}
 
+       // Last active request for each chunks. TODO: Change to PeerConn specific?
        requestState map[RequestIndex]requestState
        // Chunks we've written to since the corresponding piece was last checked.
        dirtyChunks typedRoaring.Bitmap[RequestIndex]
@@ -407,9 +408,11 @@ func (t *Torrent) makePieces() {
                                unsafe.SliceData(t.info.Pieces[i*sha1.Size : (i+1)*sha1.Size])))
                }
                files := *t.files
+               // TODO: This can be done more efficiently by retaining a file iterator between loops.
                beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
                endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
-               piece.files = files[beginFile:endFile]
+               piece.beginFile = beginFile
+               piece.endFile = endFile
        }
 }
 
@@ -1102,8 +1105,8 @@ func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
 
 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
        t.dirtyChunks.RemoveRange(
-               uint64(t.pieceRequestIndexOffset(pieceIndex)),
-               uint64(t.pieceRequestIndexOffset(pieceIndex+1)))
+               uint64(t.pieceRequestIndexBegin(pieceIndex)),
+               uint64(t.pieceRequestIndexBegin(pieceIndex+1)))
 }
 
 func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
@@ -1131,7 +1134,7 @@ func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
 func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter {
        return &blockCheckingWriter{
                cache:        &t.smartBanCache,
-               requestIndex: t.pieceRequestIndexOffset(piece),
+               requestIndex: t.pieceRequestIndexBegin(piece),
                chunkSize:    t.chunkSize.Int(),
        }
 }
@@ -2537,7 +2540,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
 }
 
 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
-       start := t.pieceRequestIndexOffset(piece)
+       start := t.pieceRequestIndexBegin(piece)
        end := start + t.pieceNumChunks(piece)
        for ri := start; ri < end; ri++ {
                t.cancelRequest(ri)
@@ -2665,7 +2668,7 @@ func (t *Torrent) pieceHasher(index pieceIndex) {
                        t.logger.WithDefaultLevel(log.Debug).Printf("smart banned %v for piece %v", peer, index)
                }
                t.dropBannedPeers()
-               for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ {
+               for ri := t.pieceRequestIndexBegin(index); ri < t.pieceRequestIndexBegin(index+1); ri++ {
                        t.smartBanCache.ForgetBlock(ri)
                }
        }
@@ -3016,10 +3019,11 @@ func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
 }
 
 func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
-       return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize)
+       return t.pieceRequestIndexBegin(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize)
 }
 
-func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
+// The first request index for the piece.
+func (t *Torrent) pieceRequestIndexBegin(piece pieceIndex) RequestIndex {
        return RequestIndex(piece) * t.chunksPerRegularPiece()
 }
 
@@ -3082,7 +3086,7 @@ func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
        f func(RequestIndex),
 ) {
        reuseIter.Initialize(&t.dirtyChunks)
-       pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece)
+       pieceRequestIndexOffset := t.pieceRequestIndexBegin(piece)
        iterBitmapUnsetInRange(
                reuseIter,
                pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece),
index 75df58fde6770dff927b30dba080da8902df78a6..638428c5ee6f61fae9327174124ad88523f01ab8 100644 (file)
@@ -155,8 +155,8 @@ func TestPieceHashFailed(t *testing.T) {
        require.NoError(t, tt.setInfoBytesLocked(mi.InfoBytes))
        tt.cl.lock()
        tt.dirtyChunks.AddRange(
-               uint64(tt.pieceRequestIndexOffset(1)),
-               uint64(tt.pieceRequestIndexOffset(1)+3))
+               uint64(tt.pieceRequestIndexBegin(1)),
+               uint64(tt.pieceRequestIndexBegin(1)+3))
        require.True(t, tt.pieceAllDirty(1))
        tt.pieceHashed(1, false, nil)
        // Dirty chunks should be cleared so we can try again.