files *[]*File
webSeeds map[string]*Peer
-
// Active peer connections, running message stream loops. TODO: Make this
// open (not-closed) connections only.
conns map[*PeerConn]struct{}
activePieceHashes int
initialPieceCheckDisabled bool
+ connsWithAllPieces map[*Peer]struct{}
// Count of each request across active connections.
pendingRequests map[RequestIndex]*Peer
lastRequested map[RequestIndex]time.Time
Complete chansync.Flag
}
-func (t *Torrent) pieceAvailabilityFromPeers(i pieceIndex) (count int) {
+func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
+ // This could be done with roaring.BitSliceIndexing.
t.iterPeers(func(peer *Peer) {
+ if _, ok := t.connsWithAllPieces[peer]; ok {
+ return
+ }
if peer.peerHasPiece(i) {
count++
}
return
}
p := t.piece(i)
- if p.availability <= 0 {
- panic(p.availability)
+ if p.relativeAvailability <= 0 {
+ panic(p.relativeAvailability)
}
- p.availability--
+ p.relativeAvailability--
t.updatePieceRequestOrder(i)
}
// If we don't the info, this should be reconciled when we do.
if t.haveInfo() {
p := t.piece(i)
- p.availability++
+ p.relativeAvailability++
t.updatePieceRequestOrder(i)
}
}
t.initPieceRequestOrder()
for i := range t.pieces {
p := &t.pieces[i]
- // Need to add availability before updating piece completion, as that may result in conns
+ // Need to add relativeAvailability before updating piece completion, as that may result in conns
// being dropped.
- if p.availability != 0 {
- panic(p.availability)
+ if p.relativeAvailability != 0 {
+ panic(p.relativeAvailability)
}
- p.availability = int64(t.pieceAvailabilityFromPeers(i))
+ p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
t.addRequestOrderPiece(i)
t.updatePieceCompletion(pieceIndex(i))
if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
type pieceAvailabilityRun struct {
Count pieceIndex
- Availability int64
+ Availability int
}
func (me pieceAvailabilityRun) String() string {
func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
- ret = append(ret, pieceAvailabilityRun{Availability: el.(int64), Count: int(count)})
+ ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)})
})
for i := range t.pieces {
- rle.Append(t.pieces[i].availability, 1)
+ rle.Append(t.pieces[i].availability(), 1)
}
rle.Flush()
return
if t.storage != nil {
t.deletePieceRequestOrder()
}
+ for i := range t.pieces {
+ p := t.piece(i)
+ if p.relativeAvailability != 0 {
+ panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
+ }
+ }
t.pex.Reset()
t.cl.event.Broadcast()
t.pieceStateChanges.Close()
})
}
t.assertPendingRequests()
+ if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
+ panic(t.connsWithAllPieces)
+ }
return
}
func (t *Torrent) decPeerPieceAvailability(p *Peer) {
+ if t.deleteConnWithAllPieces(p) {
+ return
+ }
if !t.haveInfo() {
return
}
- p.newPeerPieces().Iterate(func(i uint32) bool {
+ p.peerPieces().Iterate(func(i uint32) bool {
p.t.decPieceAvailability(pieceIndex(i))
return true
})
func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
return t.pendingRequests[r]
}
+
+func (t *Torrent) addConnWithAllPieces(p *Peer) {
+ if t.connsWithAllPieces == nil {
+ t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
+ }
+ t.connsWithAllPieces[p] = struct{}{}
+}
+
+func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
+ _, ok := t.connsWithAllPieces[p]
+ delete(t.connsWithAllPieces, p)
+ return ok
+}
+
+func (t *Torrent) numActivePeers() int {
+ return len(t.conns) + len(t.webSeeds)
+}