From 506ff8d037f4e88d9ca8b8ce04e2cfb81a004da8 Mon Sep 17 00:00:00 2001
From: Matt Joiner
Date: Fri, 17 Dec 2021 22:06:21 +1100
Subject: [PATCH] Use relative availabilities to determine piece request order

Most overhead comes from peers that connect already having everything,
where we just increment every single piece's availability.

There may be some unresolved determinism issues with torrents that share
the same ordering.
---
 client_test.go                          |  4 +-
 misc.go                                 | 10 ++++
 peerconn.go                             | 70 ++++++++++++++++---------
 peerconn_test.go                        |  2 +
 piece.go                                |  9 +++-
 request-strategy/order.go               |  5 +-
 request-strategy/piece-request-order.go |  2 +-
 requesting.go                           |  6 +--
 torrent.go                              | 61 ++++++++++++++++-----
 9 files changed, 124 insertions(+), 45 deletions(-)

diff --git a/client_test.go b/client_test.go
index cbfe48bb..f4c30851 100644
--- a/client_test.go
+++ b/client_test.go
@@ -537,8 +537,10 @@ func TestPeerInvalidHave(t *testing.T) {
 	assert.True(t, _new)
 	defer tt.Drop()
 	cn := &PeerConn{Peer: Peer{
-		t: tt,
+		t:         tt,
+		callbacks: &cfg.Callbacks,
 	}}
+	tt.conns[cn] = struct{}{}
 	cn.peerImpl = cn
 	cl.lock()
 	defer cl.unlock()
diff --git a/misc.go b/misc.go
index 139af2d4..db924bbe 100644
--- a/misc.go
+++ b/misc.go
@@ -4,6 +4,7 @@ import (
 	"errors"
 	"net"
 
+	"github.com/RoaringBitmap/roaring"
 	"github.com/anacrolix/missinggo/v2"
 	"github.com/anacrolix/torrent/types"
 	"golang.org/x/time/rate"
@@ -169,3 +170,12 @@ type (
 	InfoHash = metainfo.Hash
 	IpPort   = missinggo.IpPort
 )
+
+func boolSliceToBitmap(slice []bool) (rb roaring.Bitmap) {
+	for i, b := range slice {
+		if b {
+			rb.AddInt(i)
+		}
+	}
+	return
+}
diff --git a/peerconn.go b/peerconn.go
index 4a299714..40692225 100644
--- a/peerconn.go
+++ b/peerconn.go
@@ -774,28 +774,49 @@ func (cn *PeerConn) peerSentBitfield(bf []bool) error {
 		// Ignore known excess pieces.
 		bf = bf[:cn.t.numPieces()]
 	}
-	pp := cn.newPeerPieces()
+	bm := boolSliceToBitmap(bf)
+	if cn.t.haveInfo() && pieceIndex(bm.GetCardinality()) == cn.t.numPieces() {
+		cn.onPeerHasAllPieces()
+		return nil
+	}
+	if !bm.IsEmpty() {
+		cn.raisePeerMinPieces(pieceIndex(bm.Maximum()) + 1)
+	}
+	shouldUpdateRequests := false
+	if cn.peerSentHaveAll {
+		if !cn.t.deleteConnWithAllPieces(&cn.Peer) {
+			panic(cn)
+		}
+		cn.peerSentHaveAll = false
+		if !cn._peerPieces.IsEmpty() {
+			panic("if peer has all, we expect no individual peer pieces to be set")
+		}
+	} else {
+		bm.Xor(&cn._peerPieces)
+	}
 	cn.peerSentHaveAll = false
-	for i, have := range bf {
-		if have {
-			cn.raisePeerMinPieces(pieceIndex(i) + 1)
-			if !pp.Contains(bitmap.BitIndex(i)) {
-				cn.t.incPieceAvailability(i)
-			}
+	// bm is now 'on' for pieces that are changing
+	bm.Iterate(func(x uint32) bool {
+		pi := pieceIndex(x)
+		if cn._peerPieces.Contains(x) {
+			// Then we must be losing this piece
+			cn.t.decPieceAvailability(pi)
 		} else {
-			if pp.Contains(bitmap.BitIndex(i)) {
-				cn.t.decPieceAvailability(i)
+			if !shouldUpdateRequests && cn.t.wantPieceIndex(pieceIndex(x)) {
+				shouldUpdateRequests = true
 			}
+			// We must be gaining this piece
+			cn.t.incPieceAvailability(pieceIndex(x))
 		}
-		if have {
-			cn._peerPieces.Add(uint32(i))
-			if cn.t.wantPieceIndex(i) {
-				cn.updateRequests("bitfield")
-			}
-		} else {
-			cn._peerPieces.Remove(uint32(i))
-		}
+		return true
+	})
+	// Apply the changes. If we had everything previously, this should be empty, so xor is the same
+	// as or.
+	cn._peerPieces.Xor(&bm)
+	if shouldUpdateRequests {
+		cn.updateRequests("bitfield")
 	}
+	// We didn't guard this before, I see no reason to do it now.
 	cn.peerPiecesChanged()
 	return nil
 }
@@ -803,13 +824,12 @@ func (cn *PeerConn) peerSentBitfield(bf []bool) error {
 func (cn *PeerConn) onPeerHasAllPieces() {
 	t := cn.t
 	if t.haveInfo() {
-		npp, pc := cn.newPeerPieces(), t.numPieces()
-		for i := 0; i < pc; i += 1 {
-			if !npp.Contains(bitmap.BitIndex(i)) {
-				t.incPieceAvailability(i)
-			}
-		}
+		cn._peerPieces.Iterate(func(x uint32) bool {
+			t.decPieceAvailability(pieceIndex(x))
+			return true
+		})
 	}
+	t.addConnWithAllPieces(&cn.Peer)
 	cn.peerSentHaveAll = true
 	cn._peerPieces.Clear()
 	if !cn.t._pendingPieces.IsEmpty() {
@@ -824,7 +844,9 @@ func (cn *PeerConn) onPeerSentHaveAll() error {
 }
 
 func (cn *PeerConn) peerSentHaveNone() error {
-	cn.t.decPeerPieceAvailability(&cn.Peer)
+	if cn.peerSentHaveAll {
+		cn.t.decPeerPieceAvailability(&cn.Peer)
+	}
 	cn._peerPieces.Clear()
 	cn.peerSentHaveAll = false
 	cn.peerPiecesChanged()
diff --git a/peerconn_test.go b/peerconn_test.go
index 93e512a0..7ecb6933 100644
--- a/peerconn_test.go
+++ b/peerconn_test.go
@@ -222,8 +222,10 @@ func TestHaveAllThenBitfield(t *testing.T) {
 	pc.peerImpl = &pc
 	tt.conns[&pc] = struct{}{}
 	c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
+	c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
 	pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
 	c.Check(pc.peerMinPieces, qt.Equals, 6)
+	c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
 	c.Assert(pc.t.setInfo(&metainfo.Info{
 		PieceLength: 0,
 		Pieces:      make([]byte, pieceHash.Size()*7),
diff --git a/piece.go b/piece.go
index 6caa7628..1c4375f1 100644
--- a/piece.go
+++ b/piece.go
@@ -30,7 +30,10 @@ type Piece struct {
 
 	publicPieceState PieceState
 	priority         piecePriority
-	availability     int64
+	// Availability adjustment for this piece relative to len(Torrent.connsWithAllPieces). This is
+	// incremented for a piece when a peer has that piece, Torrent.haveInfo is true, and the Peer
+	// isn't recorded in Torrent.connsWithAllPieces.
+	relativeAvailability int
 
 	// This can be locked when the Client lock is taken, but probably not vice versa.
 	pendingWritesMutex sync.Mutex
@@ -279,3 +282,7 @@ func (me *undirtiedChunksIter) Iter(f func(chunkIndexType)) {
 func (p *Piece) requestIndexOffset() RequestIndex {
 	return p.t.pieceRequestIndexOffset(p.index)
 }
+
+func (p *Piece) availability() int {
+	return len(p.t.connsWithAllPieces) + p.relativeAvailability
+}
diff --git a/request-strategy/order.go b/request-strategy/order.go
index 50c9838f..cb69d23d 100644
--- a/request-strategy/order.go
+++ b/request-strategy/order.go
@@ -24,9 +24,12 @@ type (
 func pieceOrderLess(i, j *pieceRequestOrderItem) multiless.Computation {
 	return multiless.New().Int(
 		int(j.state.Priority), int(i.state.Priority),
+		// TODO: Should we match on complete here to prevent churn when availability changes?
 	).Bool(
 		j.state.Partial, i.state.Partial,
-	).Int64(
+	).Int(
+		// If this is done with relative availability, do we lose some determinism? If completeness
+		// is used, would that push this far enough down?
 		i.state.Availability, j.state.Availability,
 	).Int(
 		i.key.Index, j.key.Index,
diff --git a/request-strategy/piece-request-order.go b/request-strategy/piece-request-order.go
index c9a89663..dbdac738 100644
--- a/request-strategy/piece-request-order.go
+++ b/request-strategy/piece-request-order.go
@@ -27,7 +27,7 @@ type PieceRequestOrderKey struct {
 type PieceRequestOrderState struct {
 	Priority     piecePriority
 	Partial      bool
-	Availability int64
+	Availability int
 }
 
 type pieceRequestOrderItem struct {
diff --git a/requesting.go b/requesting.go
index 59f6693c..53d5f324 100644
--- a/requesting.go
+++ b/requesting.go
@@ -19,7 +19,7 @@ func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRe
 	return request_strategy.PieceRequestOrderState{
 		Priority:     t.piece(i).purePriority(),
 		Partial:      t.piecePartiallyDownloaded(i),
-		Availability: t.piece(i).availability,
+		Availability: t.piece(i).availability(),
 	}
 }
 
@@ -125,8 +125,8 @@ func (p *peerRequests) Less(i, j int) bool {
 		-int(rightPiece.purePriority()),
 	)
 	ml = ml.Int(
-		int(leftPiece.availability),
-		int(rightPiece.availability))
+		int(leftPiece.relativeAvailability),
+		int(rightPiece.relativeAvailability))
 	return ml.Less()
 }
 
diff --git a/torrent.go b/torrent.go
index ce4d7f59..f82032be 100644
--- a/torrent.go
+++ b/torrent.go
@@ -85,7 +85,6 @@ type Torrent struct {
 	files *[]*File
 
 	webSeeds map[string]*Peer
-
 	// Active peer connections, running message stream loops. TODO: Make this
 	// open (not-closed) connections only.
 	conns map[*PeerConn]struct{}
@@ -137,6 +136,7 @@ type Torrent struct {
 	activePieceHashes         int
 	initialPieceCheckDisabled bool
 
+	connsWithAllPieces map[*Peer]struct{}
 	// Count of each request across active connections.
 	pendingRequests map[RequestIndex]*Peer
 	lastRequested   map[RequestIndex]time.Time
@@ -149,8 +149,12 @@ type Torrent struct {
 	Complete chansync.Flag
 }
 
-func (t *Torrent) pieceAvailabilityFromPeers(i pieceIndex) (count int) {
+func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
+	// This could be done with roaring.BitSliceIndexing.
 	t.iterPeers(func(peer *Peer) {
+		if _, ok := t.connsWithAllPieces[peer]; ok {
+			return
+		}
 		if peer.peerHasPiece(i) {
 			count++
 		}
@@ -163,10 +167,10 @@ func (t *Torrent) decPieceAvailability(i pieceIndex) {
 		return
 	}
 	p := t.piece(i)
-	if p.availability <= 0 {
-		panic(p.availability)
+	if p.relativeAvailability <= 0 {
+		panic(p.relativeAvailability)
 	}
-	p.availability--
+	p.relativeAvailability--
 	t.updatePieceRequestOrder(i)
 }
 
@@ -174,7 +178,7 @@ func (t *Torrent) incPieceAvailability(i pieceIndex) {
 	// If we don't have the info, this should be reconciled when we do.
 	if t.haveInfo() {
 		p := t.piece(i)
-		p.availability++
+		p.relativeAvailability++
 		t.updatePieceRequestOrder(i)
 	}
 }
@@ -443,12 +447,12 @@ func (t *Torrent) onSetInfo() {
 	t.initPieceRequestOrder()
 	for i := range t.pieces {
 		p := &t.pieces[i]
-		// Need to add availability before updating piece completion, as that may result in conns
+		// Need to add relativeAvailability before updating piece completion, as that may result in conns
 		// being dropped.
-		if p.availability != 0 {
-			panic(p.availability)
+		if p.relativeAvailability != 0 {
+			panic(p.relativeAvailability)
 		}
-		p.availability = int64(t.pieceAvailabilityFromPeers(i))
+		p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
 		t.addRequestOrderPiece(i)
 		t.updatePieceCompletion(pieceIndex(i))
 		if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
@@ -571,7 +575,7 @@ func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMe
 
 type pieceAvailabilityRun struct {
 	Count        pieceIndex
-	Availability int64
+	Availability int
 }
 
 func (me pieceAvailabilityRun) String() string {
@@ -580,10 +584,10 @@ func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
 	rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
-		ret = append(ret, pieceAvailabilityRun{Availability: el.(int64), Count: int(count)})
+		ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)})
 	})
 	for i := range t.pieces {
-		rle.Append(t.pieces[i].availability, 1)
+		rle.Append(t.pieces[i].availability(), 1)
 	}
 	rle.Flush()
 	return
 }
 
@@ -836,6 +840,12 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
 	if t.storage != nil {
 		t.deletePieceRequestOrder()
 	}
+	for i := range t.pieces {
+		p := t.piece(i)
+		if p.relativeAvailability != 0 {
+			panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
+		}
+	}
 	t.pex.Reset()
 	t.cl.event.Broadcast()
 	t.pieceStateChanges.Close()
@@ -1407,14 +1417,20 @@ func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
 		})
 	}
 	t.assertPendingRequests()
+	if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
+		panic(t.connsWithAllPieces)
+	}
 	return
 }
 
 func (t *Torrent) decPeerPieceAvailability(p *Peer) {
+	if t.deleteConnWithAllPieces(p) {
+		return
+	}
 	if !t.haveInfo() {
 		return
 	}
-	p.newPeerPieces().Iterate(func(i uint32) bool {
+	p.peerPieces().Iterate(func(i uint32) bool {
 		p.t.decPieceAvailability(pieceIndex(i))
 		return true
 	})
@@ -2319,3 +2335,20 @@ func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
 func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
 	return t.pendingRequests[r]
 }
+
+func (t *Torrent) addConnWithAllPieces(p *Peer) {
+	if t.connsWithAllPieces == nil {
+		t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
+	}
+	t.connsWithAllPieces[p] = struct{}{}
+}
+
+func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
+	_, ok := t.connsWithAllPieces[p]
+	delete(t.connsWithAllPieces, p)
+	return ok
+}
+
+func (t *Torrent) numActivePeers() int {
+	return len(t.conns) + len(t.webSeeds)
+}
-- 
2.44.0
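
For illustration only (not part of the patch), a minimal standalone sketch of the bookkeeping this change moves to. The field names connsWithAllPieces, relativeAvailability, and availability() mirror the patch, but torrentSketch and its methods are invented stand-ins rather than the real Torrent and Piece types: a peer that has everything is counted once via the map, so a connecting seeder costs one map insert instead of touching every piece.

package main

import "fmt"

// torrentSketch stands in for Torrent: seeders live in connsWithAllPieces,
// while partial peers adjust relativeAvailability for just the pieces they have.
type torrentSketch struct {
	connsWithAllPieces   map[string]struct{} // keyed by a peer id for this sketch
	relativeAvailability []int               // one counter per piece
}

// availability mirrors Piece.availability() in the patch: every seeder counts
// toward every piece through len(connsWithAllPieces).
func (t *torrentSketch) availability(piece int) int {
	return len(t.connsWithAllPieces) + t.relativeAvailability[piece]
}

func (t *torrentSketch) addSeeder(id string) {
	// One map insert, regardless of how many pieces the torrent has.
	t.connsWithAllPieces[id] = struct{}{}
}

func (t *torrentSketch) addPartialPeer(have []int) {
	// Only the pieces the peer actually has are incremented, as
	// incPieceAvailability does in the patch.
	for _, i := range have {
		t.relativeAvailability[i]++
	}
}

func main() {
	t := &torrentSketch{
		connsWithAllPieces:   map[string]struct{}{},
		relativeAvailability: make([]int, 4),
	}
	t.addSeeder("seeder-1")
	t.addPartialPeer([]int{0, 2})
	for i := range t.relativeAvailability {
		fmt.Printf("piece %d availability %d\n", i, t.availability(i))
	}
	// Prints availability 2 for pieces 0 and 2, and 1 for pieces 1 and 3.
}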
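
The peerSentBitfield rework relies on XOR to find only the pieces whose state changed between the previously known peer bitmap and the newly received bitfield, and adjusts availability once per changed piece. A small sketch of just that delta step, assuming the github.com/RoaringBitmap/roaring package the patch already imports; the bitmap contents here are made up for the example.

package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring"
)

func main() {
	// Previously known peer pieces, and the bitfield just received.
	prev := roaring.BitmapOf(1, 3, 5)
	next := roaring.BitmapOf(1, 2, 5)

	// XOR leaves only the pieces that changed, so the loop below runs once per
	// changed piece rather than once per piece in the torrent.
	changed := next.Clone()
	changed.Xor(prev)
	changed.Iterate(func(x uint32) bool {
		if prev.Contains(x) {
			fmt.Printf("piece %d lost: decrement availability\n", x)
		} else {
			fmt.Printf("piece %d gained: increment availability\n", x)
		}
		return true
	})
}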