Use relative availabilities to determine piece request order
author	Matt Joiner <anacrolix@gmail.com>
	Fri, 17 Dec 2021 11:06:21 +0000 (22:06 +1100)
committer	Matt Joiner <anacrolix@gmail.com>
	Thu, 23 Dec 2021 03:00:00 +0000 (14:00 +1100)
Most of the overhead comes from peers that connect already having everything: for each such peer we incremented every single piece's availability. There may be some unresolved determinism issues with torrents that share the same ordering.
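The change in a nutshell: rather than storing an absolute availability count per piece, each piece stores an adjustment relative to the number of connections known to have everything, and the absolute value is derived on demand. A minimal sketch of that bookkeeping, with hypothetical simplified types rather than the structs in this diff, shows why a seed connecting becomes O(1) instead of O(pieces):

package main

import "fmt"

type peer struct{ id string }

type torrent struct {
	relativeAvailability []int              // per-piece adjustment
	connsWithAllPieces   map[*peer]struct{} // peers known to have every piece
}

// Derived on demand: the set of full peers contributes to every piece.
func (t *torrent) availability(piece int) int {
	return len(t.connsWithAllPieces) + t.relativeAvailability[piece]
}

// A peer with everything costs one map insert, not one increment per piece.
func (t *torrent) onPeerHasAll(p *peer) {
	t.connsWithAllPieces[p] = struct{}{}
}

// Partial peers still pay per advertised piece.
func (t *torrent) onPeerHas(piece int) {
	t.relativeAvailability[piece]++
}

func main() {
	t := &torrent{
		relativeAvailability: make([]int, 3),
		connsWithAllPieces:   map[*peer]struct{}{},
	}
	t.onPeerHasAll(&peer{id: "seed"}) // O(1) regardless of piece count
	t.onPeerHas(1)                    // a partial peer advertising piece 1
	for i := 0; i < 3; i++ {
		fmt.Println(i, t.availability(i)) // prints 0 1, 1 2, 2 1
	}
}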

client_test.go
misc.go
peerconn.go
peerconn_test.go
piece.go
request-strategy/order.go
request-strategy/piece-request-order.go
requesting.go
torrent.go

diff --git a/client_test.go b/client_test.go
index cbfe48bbee9323fcc5b54ec8b498e2a9dcb77f94..f4c3085162232e095a0c6acfc6b3804945f2c282 100644
--- a/client_test.go
+++ b/client_test.go
@@ -537,8 +537,10 @@ func TestPeerInvalidHave(t *testing.T) {
        assert.True(t, _new)
        defer tt.Drop()
        cn := &PeerConn{Peer: Peer{
-               t: tt,
+               t:         tt,
+               callbacks: &cfg.Callbacks,
        }}
+       tt.conns[cn] = struct{}{}
        cn.peerImpl = cn
        cl.lock()
        defer cl.unlock()
diff --git a/misc.go b/misc.go
index 139af2d405844534c2e9018dbc68ba9e0eff5f12..db924bbe99bf944c55594efe95ac15c69e6edaec 100644
--- a/misc.go
+++ b/misc.go
@@ -4,6 +4,7 @@ import (
        "errors"
        "net"
 
+       "github.com/RoaringBitmap/roaring"
        "github.com/anacrolix/missinggo/v2"
        "github.com/anacrolix/torrent/types"
        "golang.org/x/time/rate"
@@ -169,3 +170,12 @@ type (
        InfoHash   = metainfo.Hash
        IpPort     = missinggo.IpPort
 )
+
+func boolSliceToBitmap(slice []bool) (rb roaring.Bitmap) {
+       for i, b := range slice {
+               if b {
+                       rb.AddInt(i)
+               }
+       }
+       return
+}
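Illustrative use of the new helper (not from the repository's tests; the helper is repeated so the snippet stands alone). Set bits mirror the true entries:

package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring"
)

func boolSliceToBitmap(slice []bool) (rb roaring.Bitmap) {
	for i, b := range slice {
		if b {
			rb.AddInt(i)
		}
	}
	return
}

func main() {
	bm := boolSliceToBitmap([]bool{false, true, true, false})
	fmt.Println(bm.GetCardinality())            // 2
	fmt.Println(bm.Contains(1), bm.Contains(3)) // true false
}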
diff --git a/peerconn.go b/peerconn.go
index 4a29971418db1cffaae3df83cef6040a2b087181..406922254a4ec10ab40fbfc55d27dc887315dee4 100644
--- a/peerconn.go
+++ b/peerconn.go
@@ -774,28 +774,49 @@ func (cn *PeerConn) peerSentBitfield(bf []bool) error {
                // Ignore known excess pieces.
                bf = bf[:cn.t.numPieces()]
        }
-       pp := cn.newPeerPieces()
+       bm := boolSliceToBitmap(bf)
+       if cn.t.haveInfo() && pieceIndex(bm.GetCardinality()) == cn.t.numPieces() {
+               cn.onPeerHasAllPieces()
+               return nil
+       }
+       if !bm.IsEmpty() {
+               cn.raisePeerMinPieces(pieceIndex(bm.Maximum()) + 1)
+       }
+       shouldUpdateRequests := false
+       if cn.peerSentHaveAll {
+               if !cn.t.deleteConnWithAllPieces(&cn.Peer) {
+                       panic(cn)
+               }
+               cn.peerSentHaveAll = false
+               if !cn._peerPieces.IsEmpty() {
+                       panic("if peer has all, we expect no individual peer pieces to be set")
+               }
+       } else {
+               bm.Xor(&cn._peerPieces)
+       }
        cn.peerSentHaveAll = false
-       for i, have := range bf {
-               if have {
-                       cn.raisePeerMinPieces(pieceIndex(i) + 1)
-                       if !pp.Contains(bitmap.BitIndex(i)) {
-                               cn.t.incPieceAvailability(i)
-                       }
+       // bm is now 'on' for pieces that are changing
+       bm.Iterate(func(x uint32) bool {
+               pi := pieceIndex(x)
+               if cn._peerPieces.Contains(x) {
+                       // Then we must be losing this piece
+                       cn.t.decPieceAvailability(pi)
                } else {
-                       if pp.Contains(bitmap.BitIndex(i)) {
-                               cn.t.decPieceAvailability(i)
+                       if !shouldUpdateRequests && cn.t.wantPieceIndex(pieceIndex(x)) {
+                               shouldUpdateRequests = true
                        }
+                       // We must be gaining this piece
+                       cn.t.incPieceAvailability(pieceIndex(x))
                }
-               if have {
-                       cn._peerPieces.Add(uint32(i))
-                       if cn.t.wantPieceIndex(i) {
-                               cn.updateRequests("bitfield")
-                       }
-               } else {
-                       cn._peerPieces.Remove(uint32(i))
-               }
+               return true
+       })
+       // Apply the changes. If we had everything previously, this should be empty, so xor is the same
+       // as or.
+       cn._peerPieces.Xor(&bm)
+       if shouldUpdateRequests {
+               cn.updateRequests("bitfield")
        }
+       // We didn't guard this before; I see no reason to do it now.
        cn.peerPiecesChanged()
        return nil
 }
@@ -803,13 +824,12 @@ func (cn *PeerConn) peerSentBitfield(bf []bool) error {
 func (cn *PeerConn) onPeerHasAllPieces() {
        t := cn.t
        if t.haveInfo() {
-               npp, pc := cn.newPeerPieces(), t.numPieces()
-               for i := 0; i < pc; i += 1 {
-                       if !npp.Contains(bitmap.BitIndex(i)) {
-                               t.incPieceAvailability(i)
-                       }
-               }
+               cn._peerPieces.Iterate(func(x uint32) bool {
+                       t.decPieceAvailability(pieceIndex(x))
+                       return true
+               })
        }
+       t.addConnWithAllPieces(&cn.Peer)
        cn.peerSentHaveAll = true
        cn._peerPieces.Clear()
        if !cn.t._pendingPieces.IsEmpty() {
@@ -824,7 +844,9 @@ func (cn *PeerConn) onPeerSentHaveAll() error {
 }
 
 func (cn *PeerConn) peerSentHaveNone() error {
-       cn.t.decPeerPieceAvailability(&cn.Peer)
+       if cn.peerSentHaveAll {
+               cn.t.decPeerPieceAvailability(&cn.Peer)
+       }
        cn._peerPieces.Clear()
        cn.peerSentHaveAll = false
        cn.peerPiecesChanged()
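The peerSentBitfield rewrite above leans on a bitmap identity: XORing the incoming bitfield against the current peer-piece set leaves exactly the bits that differ, so availability only needs adjusting for pieces the peer gained or lost, and a second XOR applies the delta. A standalone sketch of the trick using only the roaring API (illustrative; the print statements stand in for the inc/dec calls in the diff):

package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring"
)

func main() {
	cur := roaring.BitmapOf(0, 2)      // pieces the peer had
	incoming := roaring.BitmapOf(2, 3) // pieces the new bitfield claims

	// XOR leaves exactly the changed bits: pieces gained or lost.
	delta := incoming.Clone()
	delta.Xor(cur)

	delta.Iterate(func(x uint32) bool {
		if cur.Contains(x) {
			fmt.Println("lost piece", x) // decPieceAvailability in the diff
		} else {
			fmt.Println("gained piece", x) // incPieceAvailability in the diff
		}
		return true
	})

	// A second XOR applies the delta, yielding the new set.
	cur.Xor(delta)
	fmt.Println(cur.String()) // {2,3}
}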
diff --git a/peerconn_test.go b/peerconn_test.go
index 93e512a0ccdce66db359625ffd30d581697a3aad..7ecb6933357460ea77c244d7c1622d3c3ada1cb5 100644
--- a/peerconn_test.go
+++ b/peerconn_test.go
@@ -222,8 +222,10 @@ func TestHaveAllThenBitfield(t *testing.T) {
        pc.peerImpl = &pc
        tt.conns[&pc] = struct{}{}
        c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
+       c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
        pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
        c.Check(pc.peerMinPieces, qt.Equals, 6)
+       c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
        c.Assert(pc.t.setInfo(&metainfo.Info{
                PieceLength: 0,
                Pieces:      make([]byte, pieceHash.Size()*7),
diff --git a/piece.go b/piece.go
index 6caa76284d90f2982226149c32942d9d89c2ffc6..1c4375f1af0e66b3c156a1104c0c37436a421336 100644
--- a/piece.go
+++ b/piece.go
@@ -30,7 +30,10 @@ type Piece struct {
 
        publicPieceState PieceState
        priority         piecePriority
-       availability     int64
+       // Availability adjustment for this piece relative to len(Torrent.connsWithAllPieces). This
+       // is incremented for any piece a peer has when Torrent.haveInfo is true and the Peer isn't
+       // recorded in Torrent.connsWithAllPieces.
+       relativeAvailability int
 
        // This can be locked when the Client lock is taken, but probably not vice versa.
        pendingWritesMutex sync.Mutex
@@ -279,3 +282,7 @@ func (me *undirtiedChunksIter) Iter(f func(chunkIndexType)) {
 func (p *Piece) requestIndexOffset() RequestIndex {
        return p.t.pieceRequestIndexOffset(p.index)
 }
+
+func (p *Piece) availability() int {
+       return len(p.t.connsWithAllPieces) + p.relativeAvailability
+}
diff --git a/request-strategy/order.go b/request-strategy/order.go
index 50c9838fa1c08530dfea60546b0ed8786c4f1a50..cb69d23d37d9ca800bc4fb63107ac62f026dc16c 100644
--- a/request-strategy/order.go
+++ b/request-strategy/order.go
@@ -24,9 +24,12 @@ type (
 func pieceOrderLess(i, j *pieceRequestOrderItem) multiless.Computation {
        return multiless.New().Int(
                int(j.state.Priority), int(i.state.Priority),
+               // TODO: Should we match on complete here to prevent churn when availability changes?
        ).Bool(
                j.state.Partial, i.state.Partial,
-       ).Int64(
+       ).Int(
+               // If this is done with relative availability, do we lose some determinism? If completeness
+               // is used, would that push this far enough down?
                i.state.Availability, j.state.Availability,
        ).Int(
                i.key.Index, j.key.Index,
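For intuition, the ordering above is: higher priority first, then partially-downloaded pieces, then lower availability (rarest first), then lower index. A plain-Go equivalent of the comparator, without the multiless helper (illustrative only):

package main

import (
	"fmt"
	"sort"
)

type pieceState struct {
	Index        int
	Priority     int // higher is more urgent
	Partial      bool
	Availability int
}

// Mirrors pieceOrderLess: priority desc, partial first, rarest first, index asc.
func less(i, j pieceState) bool {
	if i.Priority != j.Priority {
		return i.Priority > j.Priority
	}
	if i.Partial != j.Partial {
		return i.Partial
	}
	if i.Availability != j.Availability {
		return i.Availability < j.Availability
	}
	return i.Index < j.Index
}

func main() {
	ps := []pieceState{
		{Index: 0, Priority: 1, Availability: 5},
		{Index: 1, Priority: 1, Availability: 2},
		{Index: 2, Priority: 2, Availability: 9},
		{Index: 3, Priority: 1, Partial: true, Availability: 5},
	}
	sort.Slice(ps, func(a, b int) bool { return less(ps[a], ps[b]) })
	for _, p := range ps {
		fmt.Print(p.Index, " ") // 2 3 1 0
	}
	fmt.Println()
}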
diff --git a/request-strategy/piece-request-order.go b/request-strategy/piece-request-order.go
index c9a89663706c20dc20ea458d855e5edeceaca364..dbdac738e5a16f8ee08cd3190bd994a4ca559314 100644
--- a/request-strategy/piece-request-order.go
+++ b/request-strategy/piece-request-order.go
@@ -27,7 +27,7 @@ type PieceRequestOrderKey struct {
 type PieceRequestOrderState struct {
        Priority     piecePriority
        Partial      bool
-       Availability int64
+       Availability int
 }
 
 type pieceRequestOrderItem struct {
diff --git a/requesting.go b/requesting.go
index 59f6693c26dc84725d3a5ab448e2fba1ac65fe73..53d5f324e45a3f9217b2378604c863d3605bcb6d 100644
--- a/requesting.go
+++ b/requesting.go
@@ -19,7 +19,7 @@ func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRe
        return request_strategy.PieceRequestOrderState{
                Priority:     t.piece(i).purePriority(),
                Partial:      t.piecePartiallyDownloaded(i),
-               Availability: t.piece(i).availability,
+               Availability: t.piece(i).availability(),
        }
 }
 
@@ -125,8 +125,8 @@ func (p *peerRequests) Less(i, j int) bool {
                -int(rightPiece.purePriority()),
        )
        ml = ml.Int(
-               int(leftPiece.availability),
-               int(rightPiece.availability))
+               int(leftPiece.relativeAvailability),
+               int(rightPiece.relativeAvailability))
        return ml.Less()
 }
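Note why Less can compare relativeAvailability directly here: within a single torrent, len(connsWithAllPieces) is the same additive constant for both pieces, so it cancels out of the comparison and the relative order equals the absolute order. Trivially:

package main

import "fmt"

func main() {
	// With c = len(connsWithAllPieces): (a + c) < (b + c) iff a < b.
	a, b, c := 2, 5, 7
	fmt.Println(a+c < b+c, a < b) // true true
}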
 
diff --git a/torrent.go b/torrent.go
index ce4d7f598140786b886ec4b0fceac4246f02001e..f82032be768a0fefda299fb902719ba18a5c6491 100644
--- a/torrent.go
+++ b/torrent.go
@@ -85,7 +85,6 @@ type Torrent struct {
        files     *[]*File
 
        webSeeds map[string]*Peer
-
        // Active peer connections, running message stream loops. TODO: Make this
        // open (not-closed) connections only.
        conns               map[*PeerConn]struct{}
@@ -137,6 +136,7 @@ type Torrent struct {
        activePieceHashes         int
        initialPieceCheckDisabled bool
 
+       connsWithAllPieces map[*Peer]struct{}
        // Count of each request across active connections.
        pendingRequests map[RequestIndex]*Peer
        lastRequested   map[RequestIndex]time.Time
@@ -149,8 +149,12 @@ type Torrent struct {
        Complete chansync.Flag
 }
 
-func (t *Torrent) pieceAvailabilityFromPeers(i pieceIndex) (count int) {
+func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
+       // This could be done with roaring.BitSliceIndexing.
        t.iterPeers(func(peer *Peer) {
+               if _, ok := t.connsWithAllPieces[peer]; ok {
+                       return
+               }
                if peer.peerHasPiece(i) {
                        count++
                }
@@ -163,10 +167,10 @@ func (t *Torrent) decPieceAvailability(i pieceIndex) {
                return
        }
        p := t.piece(i)
-       if p.availability <= 0 {
-               panic(p.availability)
+       if p.relativeAvailability <= 0 {
+               panic(p.relativeAvailability)
        }
-       p.availability--
+       p.relativeAvailability--
        t.updatePieceRequestOrder(i)
 }
 
@@ -174,7 +178,7 @@ func (t *Torrent) incPieceAvailability(i pieceIndex) {
        // If we don't have the info, this should be reconciled when we do.
        if t.haveInfo() {
                p := t.piece(i)
-               p.availability++
+               p.relativeAvailability++
                t.updatePieceRequestOrder(i)
        }
 }
@@ -443,12 +447,12 @@ func (t *Torrent) onSetInfo() {
        t.initPieceRequestOrder()
        for i := range t.pieces {
                p := &t.pieces[i]
-               // Need to add availability before updating piece completion, as that may result in conns
+               // Need to add relativeAvailability before updating piece completion, as that may result in conns
                // being dropped.
-               if p.availability != 0 {
-                       panic(p.availability)
+               if p.relativeAvailability != 0 {
+                       panic(p.relativeAvailability)
                }
-               p.availability = int64(t.pieceAvailabilityFromPeers(i))
+               p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
                t.addRequestOrderPiece(i)
                t.updatePieceCompletion(pieceIndex(i))
                if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
@@ -571,7 +575,7 @@ func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMe
 
 type pieceAvailabilityRun struct {
        Count        pieceIndex
-       Availability int64
+       Availability int
 }
 
 func (me pieceAvailabilityRun) String() string {
@@ -580,10 +584,10 @@ func (me pieceAvailabilityRun) String() string {
 
 func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
        rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
-               ret = append(ret, pieceAvailabilityRun{Availability: el.(int64), Count: int(count)})
+               ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)})
        })
        for i := range t.pieces {
-               rle.Append(t.pieces[i].availability, 1)
+               rle.Append(t.pieces[i].availability(), 1)
        }
        rle.Flush()
        return
@@ -836,6 +840,12 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
        if t.storage != nil {
                t.deletePieceRequestOrder()
        }
+       for i := range t.pieces {
+               p := t.piece(i)
+               if p.relativeAvailability != 0 {
+                       panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
+               }
+       }
        t.pex.Reset()
        t.cl.event.Broadcast()
        t.pieceStateChanges.Close()
@@ -1407,14 +1417,20 @@ func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
                })
        }
        t.assertPendingRequests()
+       if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
+               panic(t.connsWithAllPieces)
+       }
        return
 }
 
 func (t *Torrent) decPeerPieceAvailability(p *Peer) {
+       if t.deleteConnWithAllPieces(p) {
+               return
+       }
        if !t.haveInfo() {
                return
        }
-       p.newPeerPieces().Iterate(func(i uint32) bool {
+       p.peerPieces().Iterate(func(i uint32) bool {
                p.t.decPieceAvailability(pieceIndex(i))
                return true
        })
@@ -2319,3 +2335,20 @@ func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
 func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
        return t.pendingRequests[r]
 }
+
+func (t *Torrent) addConnWithAllPieces(p *Peer) {
+       if t.connsWithAllPieces == nil {
+               t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
+       }
+       t.connsWithAllPieces[p] = struct{}{}
+}
+
+func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
+       _, ok := t.connsWithAllPieces[p]
+       delete(t.connsWithAllPieces, p)
+       return ok
+}
+
+func (t *Torrent) numActivePeers() int {
+       return len(t.conns) + len(t.webSeeds)
+}
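Behind addConnWithAllPieces/deleteConnWithAllPieces sits the accounting invariant the rest of the diff maintains: when onPeerHasAllPieces reclassifies a peer, it decrements relativeAvailability for each piece the peer had individually advertised before adding the peer to connsWithAllPieces, so every piece's derived availability is unchanged (and close() can assert the adjustments return to zero). A small sketch of that invariant, again with hypothetical simplified types:

package main

import "fmt"

type torrent struct {
	relativeAvailability []int
	connsWithAllPieces   map[string]struct{}
}

func (t *torrent) availability(i int) int {
	return len(t.connsWithAllPieces) + t.relativeAvailability[i]
}

// The peer previously counted per piece now counts once, globally.
func (t *torrent) reclassifyAsSeed(peer string, had []int) {
	for _, i := range had {
		t.relativeAvailability[i]-- // decPieceAvailability in the diff
	}
	t.connsWithAllPieces[peer] = struct{}{} // addConnWithAllPieces
}

func main() {
	t := &torrent{
		relativeAvailability: []int{1, 1, 1}, // one peer advertised all 3 pieces so far
		connsWithAllPieces:   map[string]struct{}{},
	}
	before := []int{t.availability(0), t.availability(1), t.availability(2)}
	t.reclassifyAsSeed("p1", []int{0, 1, 2})
	after := []int{t.availability(0), t.availability(1), t.availability(2)}
	fmt.Println(before, after) // [1 1 1] [1 1 1]: unchanged, as required
}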