From: Matt Joiner Date: Tue, 13 May 2025 11:42:52 +0000 (+1000) Subject: Update some file/piece helpers X-Git-Tag: v1.59.0~142 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=c257298edf07a0a53c63e0cd63dc991e0934de12;p=btrtrc.git Update some file/piece helpers --- diff --git a/peer.go b/peer.go index 43f9c7f7..649634b2 100644 --- a/peer.go +++ b/peer.go @@ -184,8 +184,8 @@ func (cn *Peer) expectingChunks() bool { cn.peerAllowedFast.Iterate(func(i pieceIndex) bool { haveAllowedFastRequests = roaringBitmapRangeCardinality[RequestIndex]( cn.requestState.Requests, - cn.t.pieceRequestIndexOffset(i), - cn.t.pieceRequestIndexOffset(i+1), + cn.t.pieceRequestIndexBegin(i), + cn.t.pieceRequestIndexBegin(i+1), ) == 0 return !haveAllowedFastRequests }) diff --git a/peerconn_test.go b/peerconn_test.go index 44309293..39694853 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -116,8 +116,8 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { }) qt.Assert(b, qt.IsTrue(cn.bannableAddr.Ok)) cn.setTorrent(t) - requestIndexBegin := t.pieceRequestIndexOffset(0) - requestIndexEnd := t.pieceRequestIndexOffset(1) + requestIndexBegin := t.pieceRequestIndexBegin(0) + requestIndexEnd := t.pieceRequestIndexBegin(1) eachRequestIndex := func(f func(ri RequestIndex)) { for ri := requestIndexBegin; ri < requestIndexEnd; ri++ { f(ri) diff --git a/piece.go b/piece.go index 6e53fb25..da0b94c4 100644 --- a/piece.go +++ b/piece.go @@ -4,10 +4,13 @@ import ( "context" "errors" "fmt" + "github.com/anacrolix/missinggo/v2/panicif" + "iter" + "sync" + "github.com/anacrolix/chansync" g "github.com/anacrolix/generics" "github.com/anacrolix/missinggo/v2/bitmap" - "sync" "github.com/anacrolix/torrent/merkle" "github.com/anacrolix/torrent/metainfo" @@ -26,7 +29,8 @@ type Piece struct { hashV2 g.Option[[32]byte] t *Torrent index pieceIndex - files []*File + // First and one after the last file indexes. + beginFile, endFile int readerCond chansync.BroadcastCond @@ -95,18 +99,18 @@ func (p *Piece) hasDirtyChunks() bool { func (p *Piece) numDirtyChunks() chunkIndexType { return chunkIndexType(roaringBitmapRangeCardinality[RequestIndex]( &p.t.dirtyChunks, - p.requestIndexOffset(), - p.t.pieceRequestIndexOffset(p.index+1))) + p.requestIndexBegin(), + p.requestIndexMaxEnd())) } func (p *Piece) unpendChunkIndex(i chunkIndexType) { - p.t.dirtyChunks.Add(p.requestIndexOffset() + i) + p.t.dirtyChunks.Add(p.requestIndexBegin() + i) p.t.updatePieceRequestOrderPiece(p.index) p.readerCond.Broadcast() } func (p *Piece) pendChunkIndex(i RequestIndex) { - p.t.dirtyChunks.Remove(p.requestIndexOffset() + i) + p.t.dirtyChunks.Remove(p.requestIndexBegin() + i) p.t.updatePieceRequestOrderPiece(p.index) } @@ -141,7 +145,7 @@ func (p *Piece) waitNoPendingWrites() { } func (p *Piece) chunkIndexDirty(chunk chunkIndexType) bool { - return p.t.dirtyChunks.Contains(p.requestIndexOffset() + chunk) + return p.t.dirtyChunks.Contains(p.requestIndexBegin() + chunk) } func (p *Piece) chunkIndexSpec(chunk chunkIndexType) ChunkSpec { @@ -240,7 +244,7 @@ func (p *Piece) SetPriority(prio PiecePriority) { // This is priority based only on piece, file and reader priorities. func (p *Piece) purePriority() (ret PiecePriority) { - for _, f := range p.files { + for f := range p.files() { ret.Raise(f.prio) } if p.t.readerNowPieces().Contains(bitmap.BitIndex(p.index)) { @@ -292,8 +296,15 @@ func (p *Piece) State() PieceState { return p.t.PieceState(p.index) } -func (p *Piece) requestIndexOffset() RequestIndex { - return p.t.pieceRequestIndexOffset(p.index) +// The first possible request index for the piece. +func (p *Piece) requestIndexBegin() RequestIndex { + return p.t.pieceRequestIndexBegin(p.index) +} + +// The maximum end request index for the piece. Some of the requests might not be valid, it's for +// cleaning up arrays and bitmaps in broad strokes. +func (p *Piece) requestIndexMaxEnd() RequestIndex { + return p.t.pieceRequestIndexBegin(p.index + 1) } // TODO: Make this peer-only? @@ -304,10 +315,8 @@ func (p *Piece) availability() int { // For v2 torrents, files are aligned to pieces so there should always only be a single file for a // given piece. func (p *Piece) mustGetOnlyFile() *File { - if len(p.files) != 1 { - panic(len(p.files)) - } - return p.files[0] + panicif.NotEq(p.numFiles(), 1) + return (*p.t.files)[p.beginFile] } // Sets the v2 piece hash, queuing initial piece checks if appropriate. @@ -335,7 +344,7 @@ func (p *Piece) haveHash() bool { } func (p *Piece) hasPieceLayer() bool { - return len(p.files) == 1 && p.files[0].length > p.t.info.PieceLength + return p.numFiles() == 1 && p.mustGetOnlyFile().length > p.t.info.PieceLength } func (p *Piece) obtainHashV2() (hash [32]byte, err error) { @@ -360,3 +369,25 @@ func (p *Piece) obtainHashV2() (hash [32]byte, err error) { h.SumMinLength(hash[:0], int(p.t.info.PieceLength)) return } + +func (p *Piece) files() iter.Seq[*File] { + return func(yield func(*File) bool) { + for i := p.beginFile; i < p.endFile; i++ { + if !yield((*p.t.files)[i]) { + return + } + } + } +} + +func (p *Piece) numFiles() int { + return p.endFile - p.beginFile +} + +func (p *Piece) hasActivePeerConnRequests() (ret bool) { + for ri := p.requestIndexBegin(); ri < p.requestIndexMaxEnd(); ri++ { + _, ok := p.t.requestState[ri] + ret = ret || ok + } + return +} diff --git a/requesting.go b/requesting.go index 00a8a659..cf69f08b 100644 --- a/requesting.go +++ b/requesting.go @@ -74,6 +74,8 @@ func (p *peerId) GobDecode(b []byte) error { } type ( + // A request index is a chunk indexed across the entire torrent. It's a single integer and can + // be converted to a protocol request. RequestIndex = requestStrategy.RequestIndex chunkIndexType = requestStrategy.ChunkIndex ) @@ -194,7 +196,7 @@ func (p *PeerConn) getDesiredRequestState() (desired desiredRequestState) { return } requestHeap := desiredPeerRequests{ - peer: &p.Peer, + peer: p, pieceStates: t.requestPieceStates, requestIndexes: t.requestIndexes, } @@ -345,7 +347,7 @@ func (p *PeerConn) applyRequestState(next desiredRequestState) { } existing := t.requestingPeer(req) - if existing != nil && existing != &p.Peer { + if existing != nil && existing != p { // don't steal on cancel - because this is triggered by t.cancelRequest below // which means that the cancelled can immediately try to steal back a request // it has lost which can lead to circular cancel/add processing diff --git a/torrent.go b/torrent.go index aefda213..bc6a5977 100644 --- a/torrent.go +++ b/torrent.go @@ -169,6 +169,7 @@ type Torrent struct { connsWithAllPieces map[*Peer]struct{} + // Last active request for each chunks. TODO: Change to PeerConn specific? requestState map[RequestIndex]requestState // Chunks we've written to since the corresponding piece was last checked. dirtyChunks typedRoaring.Bitmap[RequestIndex] @@ -407,9 +408,11 @@ func (t *Torrent) makePieces() { unsafe.SliceData(t.info.Pieces[i*sha1.Size : (i+1)*sha1.Size]))) } files := *t.files + // TODO: This can be done more efficiently by retaining a file iterator between loops. beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files) endFile := pieceEndFileIndex(piece.torrentEndOffset(), files) - piece.files = files[beginFile:endFile] + piece.beginFile = beginFile + piece.endFile = endFile } } @@ -1102,8 +1105,8 @@ func (t *Torrent) chunksPerRegularPiece() chunkIndexType { func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) { t.dirtyChunks.RemoveRange( - uint64(t.pieceRequestIndexOffset(pieceIndex)), - uint64(t.pieceRequestIndexOffset(pieceIndex+1))) + uint64(t.pieceRequestIndexBegin(pieceIndex)), + uint64(t.pieceRequestIndexBegin(pieceIndex+1))) } func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer { @@ -1131,7 +1134,7 @@ func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer { func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter { return &blockCheckingWriter{ cache: &t.smartBanCache, - requestIndex: t.pieceRequestIndexOffset(piece), + requestIndex: t.pieceRequestIndexBegin(piece), chunkSize: t.chunkSize.Int(), } } @@ -2537,7 +2540,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { } func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) { - start := t.pieceRequestIndexOffset(piece) + start := t.pieceRequestIndexBegin(piece) end := start + t.pieceNumChunks(piece) for ri := start; ri < end; ri++ { t.cancelRequest(ri) @@ -2665,7 +2668,7 @@ func (t *Torrent) pieceHasher(index pieceIndex) { t.logger.WithDefaultLevel(log.Debug).Printf("smart banned %v for piece %v", peer, index) } t.dropBannedPeers() - for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ { + for ri := t.pieceRequestIndexBegin(index); ri < t.pieceRequestIndexBegin(index+1); ri++ { t.smartBanCache.ForgetBlock(ri) } } @@ -3016,10 +3019,11 @@ func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request { } func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex { - return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize) + return t.pieceRequestIndexBegin(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize) } -func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex { +// The first request index for the piece. +func (t *Torrent) pieceRequestIndexBegin(piece pieceIndex) RequestIndex { return RequestIndex(piece) * t.chunksPerRegularPiece() } @@ -3082,7 +3086,7 @@ func (t *Torrent) iterUndirtiedRequestIndexesInPiece( f func(RequestIndex), ) { reuseIter.Initialize(&t.dirtyChunks) - pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece) + pieceRequestIndexOffset := t.pieceRequestIndexBegin(piece) iterBitmapUnsetInRange( reuseIter, pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece), diff --git a/torrent_test.go b/torrent_test.go index 75df58fd..638428c5 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -155,8 +155,8 @@ func TestPieceHashFailed(t *testing.T) { require.NoError(t, tt.setInfoBytesLocked(mi.InfoBytes)) tt.cl.lock() tt.dirtyChunks.AddRange( - uint64(tt.pieceRequestIndexOffset(1)), - uint64(tt.pieceRequestIndexOffset(1)+3)) + uint64(tt.pieceRequestIndexBegin(1)), + uint64(tt.pieceRequestIndexBegin(1)+3)) require.True(t, tt.pieceAllDirty(1)) tt.pieceHashed(1, false, nil) // Dirty chunks should be cleared so we can try again.