From e04b9de09649f7aea8e1bb2c270588419b15327b Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 20 Mar 2024 12:34:35 +1100 Subject: [PATCH] Improve handling of v2 hashes --- peerconn.go | 17 ++++++----------- piece.go | 24 +++++++++++++++++++++++- request-strategy-impls.go | 14 ++++++++------ torrent.go | 19 +++++++------------ 4 files changed, 44 insertions(+), 30 deletions(-) diff --git a/peerconn.go b/peerconn.go index 470ca5cf..6aa8f210 100644 --- a/peerconn.go +++ b/peerconn.go @@ -1245,20 +1245,15 @@ func (pc *PeerConn) WriteExtendedMessage(extName pp.ExtensionName, payload []byt return nil } +func (pc *PeerConn) shouldRequestHashes() bool { + return pc.t.haveInfo() && pc.v2 && pc.t.info.HasV2() +} + func (pc *PeerConn) requestMissingHashes() { - if pc.peerChoking { - return - } - if !pc.t.haveInfo() { + if !pc.shouldRequestHashes() { return } info := pc.t.info - if !info.HasV2() { - return - } - if !pc.v2 { - return - } baseLayer := pp.Integer(merkle.Log2RoundingUp(merkle.RoundUpToPowerOfTwo( uint((pc.t.usualPieceSize() + merkle.BlockSize - 1) / merkle.BlockSize)), )) @@ -1342,7 +1337,7 @@ func (pc *PeerConn) onReadHashes(msg *pp.Message) (err error) { file, file.numPieces()) for filePieceIndex, peerHash := range filePieceHashes { torrentPieceIndex := file.BeginPieceIndex() + filePieceIndex - pc.t.piece(torrentPieceIndex).hashV2.Set(peerHash) + pc.t.piece(torrentPieceIndex).setV2Hash(peerHash) } } else { pc.protocolLogger.WithNames(v2HashesLogName).Levelf( diff --git a/piece.go b/piece.go index 0aa1e633..6158e89a 100644 --- a/piece.go +++ b/piece.go @@ -229,8 +229,12 @@ func (p *Piece) purePriority() (ret piecePriority) { return } +func (p *Piece) ignoreForRequests() bool { + return p.hashing || p.marking || !p.haveHash() || p.t.pieceComplete(p.index) || p.queuedForHash() +} + func (p *Piece) uncachedPriority() (ret piecePriority) { - if p.hashing || p.marking || p.t.pieceComplete(p.index) || p.queuedForHash() { + if p.ignoreForRequests() { return PiecePriorityNone } return p.purePriority() @@ -274,3 +278,21 @@ func (p *Piece) mustGetOnlyFile() *File { } return p.files[0] } + +// Sets the v2 piece hash, queuing initial piece checks if appropriate. +func (p *Piece) setV2Hash(v2h [32]byte) { + // See Torrent.onSetInfo. We want to trigger an initial check if appropriate, if we didn't yet + // have a piece hash (can occur with v2 when we don't start with piece layers). + if !p.hashV2.Set(v2h).Ok && p.hash == nil { + p.t.queueInitialPieceCheck(p.index) + } +} + +// Can't do certain things if we don't know the piece hash. +func (p *Piece) haveHash() bool { + return p.hash != nil || p.hashV2.Ok +} + +func pieceStateAllowsMessageWrites(p *Piece, pc *PeerConn) bool { + return (pc.shouldRequestHashes() && !p.haveHash()) || !p.t.ignorePieceForRequests(p.index) +} diff --git a/request-strategy-impls.go b/request-strategy-impls.go index 43acca13..8c59ec43 100644 --- a/request-strategy-impls.go +++ b/request-strategy-impls.go @@ -75,7 +75,7 @@ type requestStrategyTorrent struct { } func (r requestStrategyTorrent) Piece(i int) request_strategy.Piece { - return (*requestStrategyPiece)(r.t.piece(i)) + return requestStrategyPiece{r.t.piece(i)} } func (r requestStrategyTorrent) PieceLength() int64 { @@ -84,14 +84,16 @@ func (r requestStrategyTorrent) PieceLength() int64 { var _ request_strategy.Torrent = requestStrategyTorrent{} -type requestStrategyPiece Piece +type requestStrategyPiece struct { + p *Piece +} -func (r *requestStrategyPiece) Request() bool { - return !r.t.ignorePieceForRequests(r.index) +func (r requestStrategyPiece) Request() bool { + return !r.p.ignoreForRequests() } -func (r *requestStrategyPiece) NumPendingChunks() int { - return int(r.t.pieceNumPendingChunks(r.index)) +func (r requestStrategyPiece) NumPendingChunks() int { + return int(r.p.t.pieceNumPendingChunks(r.p.index)) } var _ request_strategy.Piece = (*requestStrategyPiece)(nil) diff --git a/torrent.go b/torrent.go index 78b76194..e2375a44 100644 --- a/torrent.go +++ b/torrent.go @@ -144,8 +144,8 @@ type Torrent struct { _readerNowPieces bitmap.Bitmap _readerReadaheadPieces bitmap.Bitmap - // A cache of pieces we need to get. Calculated from various piece and - // file priorities and completion states elsewhere. + // A cache of pieces we need to get. Calculated from various piece and file priorities and + // completion states elsewhere. Includes piece data and piece v2 hashes. _pendingPieces roaring.Bitmap // A cache of completed piece indices. _completedPieces roaring.Bitmap @@ -232,7 +232,7 @@ func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap { } func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool { - return !t.wantPieceIndex(i) + return t.piece(i).ignoreForRequests() } // Returns a channel that is closed when the Torrent is closed. @@ -458,12 +458,7 @@ func (t *Torrent) AddPieceLayers(layers map[string]string) (err error) { for i := range f.numPieces() { pi := f.BeginPieceIndex() + i p := t.piece(pi) - // See Torrent.onSetInfo. We want to trigger an initial check if appropriate, if we - // didn't yet have a piece hash (can occur with v2 when we don't start with piece - // layers). - if !p.hashV2.Set(hashes[i]).Ok && p.hash == nil { - t.queueInitialPieceCheck(pi) - } + p.setV2Hash(hashes[i]) } } return nil @@ -1170,7 +1165,7 @@ func (t *Torrent) hashPiece(piece pieceIndex) ( var written int64 written, err = storagePiece.WriteTo(io.MultiWriter(writers...)) if err == nil && written != int64(p.length()) { - err = io.ErrShortWrite + err = fmt.Errorf("wrote %v bytes from storage, piece has length %v", written, p.length()) } if logPieceContents { t.logger.WithDefaultLevel(log.Debug).Printf("hashed %q with copy err %v", examineBuf.Bytes(), err) @@ -1411,10 +1406,10 @@ func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChange // request order. t.updatePieceRequestOrderPiece(piece) } - p := &t.pieces[piece] + p := t.piece(piece) newPrio := p.uncachedPriority() // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio) - if newPrio == PiecePriorityNone { + if newPrio == PiecePriorityNone && p.haveHash() { return t._pendingPieces.CheckedRemove(uint32(piece)) } else { return t._pendingPieces.CheckedAdd(uint32(piece)) -- 2.44.0