Most overhead comes from connecting peers that already have everything: previously we incremented every single piece's availability for each such peer. There may be some unresolved non-determinism in request ordering for torrents that share the same piece-request order.
assert.True(t, _new)
defer tt.Drop()
cn := &PeerConn{Peer: Peer{
- t: tt,
+ t: tt,
+ callbacks: &cfg.Callbacks,
}}
+ tt.conns[cn] = struct{}{}
cn.peerImpl = cn
cl.lock()
defer cl.unlock()
"errors"
"net"
+ "github.com/RoaringBitmap/roaring"
"github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/torrent/types"
"golang.org/x/time/rate"
InfoHash = metainfo.Hash
IpPort = missinggo.IpPort
)
+
+// boolSliceToBitmap converts a piece bitfield expressed as a []bool into a
+// roaring.Bitmap with a bit set at each index whose element is true.
+func boolSliceToBitmap(slice []bool) (rb roaring.Bitmap) {
+ for i, b := range slice {
+ if b {
+ rb.AddInt(i)
+ }
+ }
+ return
+}
// Ignore known excess pieces.
bf = bf[:cn.t.numPieces()]
}
- pp := cn.newPeerPieces()
+ bm := boolSliceToBitmap(bf)
+ if cn.t.haveInfo() && pieceIndex(bm.GetCardinality()) == cn.t.numPieces() {
+ cn.onPeerHasAllPieces()
+ return nil
+ }
+ if !bm.IsEmpty() {
+ cn.raisePeerMinPieces(pieceIndex(bm.Maximum()) + 1)
+ }
+ shouldUpdateRequests := false
+ if cn.peerSentHaveAll {
+ if !cn.t.deleteConnWithAllPieces(&cn.Peer) {
+ panic(cn)
+ }
+ cn.peerSentHaveAll = false
+ if !cn._peerPieces.IsEmpty() {
+ panic("if peer has all, we expect no individual peer pieces to be set")
+ }
+ } else {
+ bm.Xor(&cn._peerPieces)
+ }
cn.peerSentHaveAll = false
- for i, have := range bf {
- if have {
- cn.raisePeerMinPieces(pieceIndex(i) + 1)
- if !pp.Contains(bitmap.BitIndex(i)) {
- cn.t.incPieceAvailability(i)
- }
+ // bm is now 'on' for pieces that are changing
+ bm.Iterate(func(x uint32) bool {
+ pi := pieceIndex(x)
+ if cn._peerPieces.Contains(x) {
+ // Then we must be losing this piece
+ cn.t.decPieceAvailability(pi)
} else {
- if pp.Contains(bitmap.BitIndex(i)) {
- cn.t.decPieceAvailability(i)
+ if !shouldUpdateRequests && cn.t.wantPieceIndex(pieceIndex(x)) {
+ shouldUpdateRequests = true
}
+ // We must be gaining this piece
+ cn.t.incPieceAvailability(pieceIndex(x))
}
- if have {
- cn._peerPieces.Add(uint32(i))
- if cn.t.wantPieceIndex(i) {
- cn.updateRequests("bitfield")
- }
- } else {
- cn._peerPieces.Remove(uint32(i))
- }
+ return true
+ })
+ // Apply the changes. If we had everything previously, this should be empty, so xor is the same
+ // as or.
+ cn._peerPieces.Xor(&bm)
+ if shouldUpdateRequests {
+ cn.updateRequests("bitfield")
}
+ // We didn't guard this before, I see no reason to do it now.
cn.peerPiecesChanged()
return nil
}
func (cn *PeerConn) onPeerHasAllPieces() {
t := cn.t
if t.haveInfo() {
- npp, pc := cn.newPeerPieces(), t.numPieces()
- for i := 0; i < pc; i += 1 {
- if !npp.Contains(bitmap.BitIndex(i)) {
- t.incPieceAvailability(i)
- }
- }
+ cn._peerPieces.Iterate(func(x uint32) bool {
+ t.decPieceAvailability(pieceIndex(x))
+ return true
+ })
}
+ t.addConnWithAllPieces(&cn.Peer)
cn.peerSentHaveAll = true
cn._peerPieces.Clear()
if !cn.t._pendingPieces.IsEmpty() {
}
func (cn *PeerConn) peerSentHaveNone() error {
- cn.t.decPeerPieceAvailability(&cn.Peer)
+ if cn.peerSentHaveAll {
+ cn.t.decPeerPieceAvailability(&cn.Peer)
+ }
cn._peerPieces.Clear()
cn.peerSentHaveAll = false
cn.peerPiecesChanged()
pc.peerImpl = &pc
tt.conns[&pc] = struct{}{}
c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
+ c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
c.Check(pc.peerMinPieces, qt.Equals, 6)
+ c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
c.Assert(pc.t.setInfo(&metainfo.Info{
PieceLength: 0,
Pieces: make([]byte, pieceHash.Size()*7),
publicPieceState PieceState
priority piecePriority
- availability int64
+ // Availability adjustment for this piece relative to len(Torrent.connsWithAllPieces). This is
+ // incremented for any piece a peer has when a peer has a piece, Torrent.haveInfo is true, and
+ // the Peer isn't recorded in Torrent.connsWithAllPieces.
+ relativeAvailability int
// This can be locked when the Client lock is taken, but probably not vice versa.
pendingWritesMutex sync.Mutex
func (p *Piece) requestIndexOffset() RequestIndex {
return p.t.pieceRequestIndexOffset(p.index)
}
+
+// availability returns the piece's effective availability: the count of conns
+// known to have every piece plus this piece's relative adjustment.
+func (p *Piece) availability() int {
+ return len(p.t.connsWithAllPieces) + p.relativeAvailability
+}
func pieceOrderLess(i, j *pieceRequestOrderItem) multiless.Computation {
return multiless.New().Int(
int(j.state.Priority), int(i.state.Priority),
+ // TODO: Should we match on complete here to prevent churn when availability changes?
).Bool(
j.state.Partial, i.state.Partial,
- ).Int64(
+ ).Int(
+ // If this is done with relative availability, do we lose some determinism? If completeness
+ // is used, would that push this far enough down?
i.state.Availability, j.state.Availability,
).Int(
i.key.Index, j.key.Index,
type PieceRequestOrderState struct {
Priority piecePriority
Partial bool
- Availability int64
+ // Populated from Piece.availability(): conns with all pieces plus the
+ // piece's relative availability adjustment.
+ Availability int
}
type pieceRequestOrderItem struct {
return request_strategy.PieceRequestOrderState{
Priority: t.piece(i).purePriority(),
Partial: t.piecePartiallyDownloaded(i),
- Availability: t.piece(i).availability,
+ Availability: t.piece(i).availability(),
}
}
-int(rightPiece.purePriority()),
)
ml = ml.Int(
- int(leftPiece.availability),
- int(rightPiece.availability))
+ int(leftPiece.relativeAvailability),
+ int(rightPiece.relativeAvailability))
return ml.Less()
}
files *[]*File
webSeeds map[string]*Peer
-
// Active peer connections, running message stream loops. TODO: Make this
// open (not-closed) connections only.
conns map[*PeerConn]struct{}
activePieceHashes int
initialPieceCheckDisabled bool
+ connsWithAllPieces map[*Peer]struct{}
// Count of each request across active connections.
pendingRequests map[RequestIndex]*Peer
lastRequested map[RequestIndex]time.Time
Complete chansync.Flag
}
-func (t *Torrent) pieceAvailabilityFromPeers(i pieceIndex) (count int) {
+func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
+ // This could be done with roaring.BitSliceIndexing.
t.iterPeers(func(peer *Peer) {
+ if _, ok := t.connsWithAllPieces[peer]; ok {
+ return
+ }
if peer.peerHasPiece(i) {
count++
}
return
}
p := t.piece(i)
- if p.availability <= 0 {
- panic(p.availability)
+ if p.relativeAvailability <= 0 {
+ panic(p.relativeAvailability)
}
- p.availability--
+ p.relativeAvailability--
t.updatePieceRequestOrder(i)
}
// If we don't the info, this should be reconciled when we do.
if t.haveInfo() {
p := t.piece(i)
- p.availability++
+ p.relativeAvailability++
t.updatePieceRequestOrder(i)
}
}
t.initPieceRequestOrder()
for i := range t.pieces {
p := &t.pieces[i]
- // Need to add availability before updating piece completion, as that may result in conns
+ // Need to add relativeAvailability before updating piece completion, as that may result in conns
// being dropped.
- if p.availability != 0 {
- panic(p.availability)
+ if p.relativeAvailability != 0 {
+ panic(p.relativeAvailability)
}
- p.availability = int64(t.pieceAvailabilityFromPeers(i))
+ p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
t.addRequestOrderPiece(i)
t.updatePieceCompletion(pieceIndex(i))
if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
type pieceAvailabilityRun struct {
Count pieceIndex
- Availability int64
+ Availability int
}
func (me pieceAvailabilityRun) String() string {
func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
- ret = append(ret, pieceAvailabilityRun{Availability: el.(int64), Count: int(count)})
+ ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)})
})
for i := range t.pieces {
- rle.Append(t.pieces[i].availability, 1)
+ rle.Append(t.pieces[i].availability(), 1)
}
rle.Flush()
return
if t.storage != nil {
t.deletePieceRequestOrder()
}
+ for i := range t.pieces {
+ p := t.piece(i)
+ if p.relativeAvailability != 0 {
+ panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
+ }
+ }
t.pex.Reset()
t.cl.event.Broadcast()
t.pieceStateChanges.Close()
})
}
t.assertPendingRequests()
+ if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
+ panic(t.connsWithAllPieces)
+ }
return
}
func (t *Torrent) decPeerPieceAvailability(p *Peer) {
+ if t.deleteConnWithAllPieces(p) {
+ return
+ }
if !t.haveInfo() {
return
}
- p.newPeerPieces().Iterate(func(i uint32) bool {
+ p.peerPieces().Iterate(func(i uint32) bool {
p.t.decPieceAvailability(pieceIndex(i))
return true
})
func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
return t.pendingRequests[r]
}
+
+// addConnWithAllPieces records p as a peer known to have every piece. The map
+// is allocated lazily, sized to the established-connection limit.
+func (t *Torrent) addConnWithAllPieces(p *Peer) {
+ if t.connsWithAllPieces == nil {
+ t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
+ }
+ t.connsWithAllPieces[p] = struct{}{}
+}
+
+// deleteConnWithAllPieces removes p from the conns-with-all-pieces set,
+// reporting whether it was present. Safe to call when the map is nil.
+func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
+ _, ok := t.connsWithAllPieces[p]
+ delete(t.connsWithAllPieces, p)
+ return ok
+}
+
+// numActivePeers counts regular peer conns plus web seeds.
+func (t *Torrent) numActivePeers() int {
+ return len(t.conns) + len(t.webSeeds)
+}