From 63b3d2d211ca13d7f7aaa17dc2595ba23c103970 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 20 Sep 2021 18:52:54 +1000 Subject: [PATCH] Track dirty chunks in a single bitmap on Torrent --- peerconn.go | 10 ++++++---- peerconn_test.go | 2 +- piece.go | 40 ++++++++++++++++++++-------------------- roaring.go | 16 ++++++++++++++++ torrent.go | 18 +++++++++++++----- torrent_test.go | 4 +++- 6 files changed, 59 insertions(+), 31 deletions(-) create mode 100644 roaring.go diff --git a/peerconn.go b/peerconn.go index 8d3f6d81..03749e0e 100644 --- a/peerconn.go +++ b/peerconn.go @@ -181,10 +181,12 @@ func (cn *Peer) expectingChunks() bool { if !cn.actualRequestState.Interested { return false } - if cn.peerAllowedFast.IterTyped(func(_i int) bool { - i := RequestIndex(_i) - return cn.actualRequestState.Requests.Rank((i+1)*cn.t.chunksPerRegularPiece())- - cn.actualRequestState.Requests.Rank(i*cn.t.chunksPerRegularPiece()) == 0 + if cn.peerAllowedFast.IterTyped(func(i int) bool { + return roaringBitmapRangeCardinality( + &cn.actualRequestState.Requests, + cn.t.pieceRequestIndexOffset(i), + cn.t.pieceRequestIndexOffset(i+1), + ) == 0 }) { return true } diff --git a/peerconn_test.go b/peerconn_test.go index 395dbb62..5e8cc1b5 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -128,7 +128,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { cl.lock() // The chunk must be written to storage everytime, to ensure the // writeSem is unlocked. - t.pieces[0]._dirtyChunks.Clear() + t.pendAllChunkSpecs(0) cn.validReceiveChunks = map[RequestIndex]int{ t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1, } diff --git a/piece.go b/piece.go index 30ac2297..7424bba4 100644 --- a/piece.go +++ b/piece.go @@ -18,9 +18,6 @@ type Piece struct { t *Torrent index pieceIndex files []*File - // Chunks we've written to since the last check. The chunk offset and - // length can be determined by the request chunkSize in use. - _dirtyChunks bitmap.Bitmap readerCond chansync.BroadcastCond @@ -56,7 +53,7 @@ func (p *Piece) Storage() storage.Piece { } func (p *Piece) pendingChunkIndex(chunkIndex chunkIndexType) bool { - return !p._dirtyChunks.Contains(bitmap.BitIndex(chunkIndex)) + return !p.chunkIndexDirty(chunkIndex) } func (p *Piece) pendingChunk(cs ChunkSpec, chunkSize pp.Integer) bool { @@ -64,24 +61,27 @@ func (p *Piece) pendingChunk(cs ChunkSpec, chunkSize pp.Integer) bool { } func (p *Piece) hasDirtyChunks() bool { - return p._dirtyChunks.Len() != 0 + return p.numDirtyChunks() != 0 } -func (p *Piece) numDirtyChunks() pp.Integer { - return pp.Integer(p._dirtyChunks.Len()) +func (p *Piece) numDirtyChunks() chunkIndexType { + return chunkIndexType(roaringBitmapRangeCardinality( + &p.t.dirtyChunks, + p.requestIndexOffset(), + p.t.pieceRequestIndexOffset(p.index+1))) } func (p *Piece) unpendChunkIndex(i chunkIndexType) { - p._dirtyChunks.Add(bitmap.BitIndex(i)) + p.t.dirtyChunks.Add(p.requestIndexOffset() + i) p.readerCond.Broadcast() } func (p *Piece) pendChunkIndex(i RequestIndex) { - p._dirtyChunks.Remove(bitmap.BitIndex(i)) + p.t.dirtyChunks.Remove(p.requestIndexOffset() + i) } -func (p *Piece) numChunks() pp.Integer { - return pp.Integer(p.t.pieceNumChunks(p.index)) +func (p *Piece) numChunks() chunkIndexType { + return p.t.pieceNumChunks(p.index) } func (p *Piece) incrementPendingWrites() { @@ -110,12 +110,12 @@ func (p *Piece) waitNoPendingWrites() { p.pendingWritesMutex.Unlock() } -func (p *Piece) chunkIndexDirty(chunk pp.Integer) bool { - return p._dirtyChunks.Contains(bitmap.BitIndex(chunk)) +func (p *Piece) chunkIndexDirty(chunk chunkIndexType) bool { + return p.t.dirtyChunks.Contains(p.requestIndexOffset() + chunk) } -func (p *Piece) chunkIndexSpec(chunk pp.Integer) ChunkSpec { - return chunkIndexSpec(chunk, p.length(), p.chunkSize()) +func (p *Piece) chunkIndexSpec(chunk chunkIndexType) ChunkSpec { + return chunkIndexSpec(pp.Integer(chunk), p.length(), p.chunkSize()) } func (p *Piece) numDirtyBytes() (ret pp.Integer) { @@ -141,7 +141,7 @@ func (p *Piece) chunkSize() pp.Integer { return p.t.chunkSize } -func (p *Piece) lastChunkIndex() pp.Integer { +func (p *Piece) lastChunkIndex() chunkIndexType { return p.numChunks() - 1 } @@ -230,7 +230,7 @@ func (p *Piece) completion() (ret storage.Completion) { } func (p *Piece) allChunksDirty() bool { - return p._dirtyChunks.Len() == bitmap.BitRange(p.numChunks()) + return p.numDirtyChunks() == p.numChunks() } func (p *Piece) State() PieceState { @@ -238,8 +238,8 @@ func (p *Piece) State() PieceState { } func (p *Piece) iterUndirtiedChunks(f func(cs chunkIndexType)) { - for i := chunkIndexType(0); i < chunkIndexType(p.numChunks()); i++ { - if p.chunkIndexDirty(pp.Integer(i)) { + for i := chunkIndexType(0); i < p.numChunks(); i++ { + if p.chunkIndexDirty(i) { continue } f(i) @@ -247,5 +247,5 @@ func (p *Piece) iterUndirtiedChunks(f func(cs chunkIndexType)) { } func (p *Piece) requestIndexOffset() RequestIndex { - return RequestIndex(p.index) * p.t.chunksPerRegularPiece() + return p.t.pieceRequestIndexOffset(p.index) } diff --git a/roaring.go b/roaring.go new file mode 100644 index 00000000..6e5409ec --- /dev/null +++ b/roaring.go @@ -0,0 +1,16 @@ +package torrent + +import ( + "github.com/RoaringBitmap/roaring" +) + +// Return the number of bits set in the range. To do this we need the rank of the item before the +// first, and the rank of the last item. An off-by-one minefield. Hopefully I haven't missed +// something in roaring's API that provides this. +func roaringBitmapRangeCardinality(bm *roaring.Bitmap, start, end uint32) (card uint64) { + card = bm.Rank(end - 1) + if start != 0 { + card -= bm.Rank(start - 1) + } + return +} diff --git a/torrent.go b/torrent.go index 370865e6..4e272048 100644 --- a/torrent.go +++ b/torrent.go @@ -144,6 +144,8 @@ type Torrent struct { // Count of each request across active connections. pendingRequests map[RequestIndex]int + // Chunks we've written to since the corresponding piece was last checked. + dirtyChunks roaring.Bitmap pex pexState } @@ -851,7 +853,9 @@ func (t *Torrent) chunksPerRegularPiece() uint32 { } func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) { - t.pieces[pieceIndex]._dirtyChunks.Clear() + t.dirtyChunks.RemoveRange( + uint64(t.pieceRequestIndexOffset(pieceIndex)), + uint64(t.pieceRequestIndexOffset(pieceIndex+1))) } func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer { @@ -1037,11 +1041,11 @@ func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer { if t.pieceComplete(piece) { return 0 } - return pp.Integer(t.pieceNumChunks(piece)) - t.pieces[piece].numDirtyChunks() + return pp.Integer(t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()) } func (t *Torrent) pieceAllDirty(piece pieceIndex) bool { - return t.pieces[piece]._dirtyChunks.Len() == bitmap.BitRange(t.pieceNumChunks(piece)) + return t.pieces[piece].allChunksDirty() } func (t *Torrent) readersChanged() { @@ -2267,14 +2271,18 @@ func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request { index := ri / t.chunksPerRegularPiece() return Request{ pp.Integer(index), - t.piece(int(index)).chunkIndexSpec(pp.Integer(ri % t.chunksPerRegularPiece())), + t.piece(int(index)).chunkIndexSpec(ri % t.chunksPerRegularPiece()), } } func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex { - return t.chunksPerRegularPiece()*uint32(r.Index) + uint32(r.Begin/t.chunkSize) + return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + uint32(r.Begin/t.chunkSize) } func (t *Torrent) numChunks() RequestIndex { return RequestIndex((t.Length() + int64(t.chunkSize) - 1) / int64(t.chunkSize)) } + +func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex { + return RequestIndex(piece) * t.chunksPerRegularPiece() +} diff --git a/torrent_test.go b/torrent_test.go index bd762c9c..bd5db3d4 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -148,7 +148,9 @@ func TestPieceHashFailed(t *testing.T) { tt.setChunkSize(2) require.NoError(t, tt.setInfoBytesLocked(mi.InfoBytes)) tt.cl.lock() - tt.pieces[1]._dirtyChunks.AddRange(0, 3) + tt.dirtyChunks.AddRange( + uint64(tt.pieceRequestIndexOffset(1)), + uint64(tt.pieceRequestIndexOffset(1)+3)) require.True(t, tt.pieceAllDirty(1)) tt.pieceHashed(1, false, nil) // Dirty chunks should be cleared so we can try again. -- 2.44.0