From 529aaa9b1f6cc86884b4c0fc7036c324b949df1d Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 13 Oct 2021 15:16:53 +1100 Subject: [PATCH] Use roaring bitmap for pending pieces Also makes a significant optimization for peerHasWantedPieces possible. --- client.go | 1 - peerconn.go | 10 ++++----- peerconn_test.go | 2 +- torrent.go | 55 ++++++------------------------------------------ 4 files changed, 12 insertions(+), 56 deletions(-) diff --git a/client.go b/client.go index db93e569..83da43e6 100644 --- a/client.go +++ b/client.go @@ -1131,7 +1131,6 @@ func (cl *Client) newTorrentOpt(opts addTorrentOpts) (t *Torrent) { gotMetainfoC: make(chan struct{}), } t.networkingEnabled.Set() - t._pendingPieces.NewSet = priorityBitmapStableNewSet t.logger = cl.logger.WithContextValue(t) if opts.ChunkSize == 0 { opts.ChunkSize = defaultChunkSize diff --git a/peerconn.go b/peerconn.go index aa03a741..a7cf7d2e 100644 --- a/peerconn.go +++ b/peerconn.go @@ -777,8 +777,8 @@ func (cn *Peer) onPeerHasAllPieces() { } cn.peerSentHaveAll = true cn._peerPieces.Clear() - if cn.t._pendingPieces.Len() != 0 { - cn.updateRequests("have all") + if !cn.t._pendingPieces.IsEmpty() { + cn.updateRequests("Peer.onPeerHasAllPieces") } cn.peerPiecesChanged() } @@ -1491,11 +1491,9 @@ func (c *Peer) peerHasWantedPieces() bool { return !c.t.haveAllPieces() } if !c.t.haveInfo() { - return c._peerPieces.GetCardinality() != 0 + return !c._peerPieces.IsEmpty() } - return c._peerPieces.Intersects( - roaring.FlipInt(&c.t._completedPieces, 0, c.t.numPieces()), - ) + return c._peerPieces.Intersects(&c.t._pendingPieces) } func (c *Peer) deleteRequest(r RequestIndex) bool { diff --git a/peerconn_test.go b/peerconn_test.go index be88bf47..b6ad410c 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -104,7 +104,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { PieceLength: 1 << 20, })) t.setChunkSize(defaultChunkSize) - t._pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority()) + t._pendingPieces.Add(0) r, w := net.Pipe() cn := cl.newConnection(r, true, r.RemoteAddr(), r.RemoteAddr().Network(), regularNetConnPeerConnConnString(r)) cn.setTorrent(t) diff --git a/torrent.go b/torrent.go index d5af3dad..b4898364 100644 --- a/torrent.go +++ b/torrent.go @@ -27,7 +27,6 @@ import ( "github.com/anacrolix/missinggo/slices" "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/missinggo/v2/bitmap" - "github.com/anacrolix/missinggo/v2/prioritybitmap" "github.com/anacrolix/multiless" "github.com/anacrolix/sync" "github.com/davecgh/go-spew/spew" @@ -131,7 +130,7 @@ type Torrent struct { // A cache of pieces we need to get. Calculated from various piece and // file priorities and completion states elsewhere. - _pendingPieces prioritybitmap.PriorityBitmap + _pendingPieces roaring.Bitmap // A cache of completed piece indices. _completedPieces roaring.Bitmap // Pieces that need to be hashed. @@ -195,10 +194,6 @@ func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool { return !t.wantPieceIndex(i) } -func (t *Torrent) pendingPieces() *prioritybitmap.PriorityBitmap { - return &t._pendingPieces -} - // Returns a channel that is closed when the Torrent is closed. func (t *Torrent) Closed() events.Done { return t.closed.Done() @@ -970,32 +965,7 @@ func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType } func (t *Torrent) wantPieceIndex(index pieceIndex) bool { - // TODO: Are these overly conservative, should we be guarding this here? - { - if !t.haveInfo() { - return false - } - if index < 0 || index >= t.numPieces() { - return false - } - } - p := &t.pieces[index] - if p.queuedForHash() { - return false - } - if p.hashing { - return false - } - if t.pieceComplete(index) { - return false - } - if t._pendingPieces.Contains(int(index)) { - return true - } - // t.logger.Printf("piece %d not pending", index) - return !t.forReaderOffsetPieces(func(begin, end pieceIndex) bool { - return index < begin || index >= end - }) + return t._pendingPieces.Contains(uint32(index)) } // A pool of []*PeerConn, to reduce allocations in functions that need to index or sort Torrent @@ -1105,7 +1075,7 @@ func (t *Torrent) maybeNewConns() { } func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) { - if t._pendingPieces.Contains(piece) { + if t._pendingPieces.Contains(uint32(piece)) { t.iterPeers(func(c *Peer) { if c.actualRequestState.Interested { return @@ -1128,11 +1098,11 @@ func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) { newPrio := p.uncachedPriority() // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio) if newPrio == PiecePriorityNone { - if !t._pendingPieces.Remove(int(piece)) { + if !t._pendingPieces.CheckedRemove(uint32(piece)) { return } } else { - if !t._pendingPieces.Set(int(piece), newPrio.BitmapPriority()) { + if !t._pendingPieces.CheckedAdd(uint32(piece)) { return } } @@ -1188,18 +1158,7 @@ func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool } func (t *Torrent) piecePriority(piece pieceIndex) piecePriority { - prio, ok := t._pendingPieces.GetPriority(piece) - if !ok { - return PiecePriorityNone - } - if prio > 0 { - panic(prio) - } - ret := piecePriority(-prio) - if ret == PiecePriorityNone { - panic(piece) - } - return ret + return t.piece(piece).uncachedPriority() } func (t *Torrent) pendRequest(req RequestIndex) { @@ -1354,7 +1313,7 @@ func (t *Torrent) needData() bool { if !t.haveInfo() { return true } - return t._pendingPieces.Len() != 0 + return !t._pendingPieces.IsEmpty() } func appendMissingStrings(old, new []string) (ret []string) { -- 2.48.1