From 99a7cb9291b0495369c0b688443175873caf25ed Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 10 May 2021 17:05:08 +1000 Subject: [PATCH] Missed piece availability code --- torrent.go | 78 +++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 63 insertions(+), 15 deletions(-) diff --git a/torrent.go b/torrent.go index c56ddbd1..ae78625f 100644 --- a/torrent.go +++ b/torrent.go @@ -147,7 +147,7 @@ type Torrent struct { pex pexState } -func (t *Torrent) pieceAvailability(i pieceIndex) (count int) { +func (t *Torrent) pieceAvailabilityFromPeers(i pieceIndex) (count int) { t.iterPeers(func(peer *Peer) { if peer.peerHasPiece(i) { count++ @@ -156,6 +156,19 @@ func (t *Torrent) pieceAvailability(i pieceIndex) (count int) { return } +func (t *Torrent) decPieceAvailability(i pieceIndex) { + p := t.piece(i) + if p.availability <= 0 { + panic(p.availability) + } + p.availability-- +} + +func (t *Torrent) incPieceAvailability(i pieceIndex) { + p := t.piece(i) + p.availability++ +} + func (t *Torrent) numConns() int { return len(t.conns) } @@ -422,6 +435,10 @@ func (t *Torrent) onSetInfo() { // t.logger.Printf("piece %s completion unknown, queueing check", p) t.queuePieceCheck(pieceIndex(i)) } + if p.availability != 0 { + panic(p.availability) + } + p.availability = int64(t.pieceAvailabilityFromPeers(i)) } t.cl.event.Broadcast() t.gotMetainfo.Set() @@ -535,6 +552,22 @@ func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType int, piece in } } +type pieceAvailabilityRun struct { + availability int64 + count pieceIndex +} + +func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) { + rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) { + ret = append(ret, pieceAvailabilityRun{el.(int64), int(count)}) + }) + for _, p := range t.pieces { + rle.Append(p.availability, 1) + } + rle.Flush() + return +} + func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) { rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) { ret = append(ret, PieceStateRun{ @@ -605,17 +638,21 @@ func (t *Torrent) writeStatus(w io.Writer) { } fmt.Fprintln(w) } - fmt.Fprintf(w, "Piece length: %s\n", func() string { - if t.haveInfo() { - return fmt.Sprint(t.usualPieceSize()) - } else { - return "?" - } - }()) + fmt.Fprintf(w, "Piece length: %s\n", + func() string { + if t.haveInfo() { + return fmt.Sprintf("%v (%v chunks)", + t.usualPieceSize(), + float64(t.usualPieceSize())/float64(t.chunkSize)) + } else { + return "no info" + } + }(), + ) if t.info != nil { fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted()) - fmt.Fprintf(w, "Piece States: %s", t.pieceStateRuns()) - fmt.Fprintln(w) + fmt.Fprintf(w, "Piece States: %s\n", t.pieceStateRuns()) + fmt.Fprintf(w, "Piece availability: %v\n", t.pieceAvailabilityRuns()) } fmt.Fprintf(w, "Reader Pieces:") t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) { @@ -888,11 +925,14 @@ func chunkIndex(cs ChunkSpec, chunkSize pp.Integer) int { } func (t *Torrent) wantPieceIndex(index pieceIndex) bool { - if !t.haveInfo() { - return false - } - if index < 0 || index >= t.numPieces() { - return false + // TODO: Are these overly conservative, should we be guarding this here? + { + if !t.haveInfo() { + return false + } + if index < 0 || index >= t.numPieces() { + return false + } } p := &t.pieces[index] if p.queuedForHash() { @@ -1317,6 +1357,13 @@ func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) { return } +func (t *Torrent) decPeerPieceAvailability(p *Peer) { + p.newPeerPieces().IterTyped(func(i int) bool { + p.t.decPieceAvailability(i) + return true + }) +} + func (t *Torrent) numActivePeers() (num int) { t.iterPeers(func(*Peer) { num++ @@ -2154,6 +2201,7 @@ func (t *Torrent) addWebSeed(url string) { ws.onGotInfo(t.info) } t.webSeeds[url] = &ws.peer + ws.peer.onPeerHasAllPieces() } func (t *Torrent) peerIsActive(p *Peer) (active bool) { -- 2.48.1