]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Improve handling of v2 hashes
authorMatt Joiner <anacrolix@gmail.com>
Wed, 20 Mar 2024 01:34:35 +0000 (12:34 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 20 Mar 2024 01:34:35 +0000 (12:34 +1100)
peerconn.go
piece.go
request-strategy-impls.go
torrent.go

index 470ca5cfe0b081b5642238b3d5da34d0d5bd8727..6aa8f2108155f5455c3ed26272073836836408d9 100644 (file)
@@ -1245,20 +1245,15 @@ func (pc *PeerConn) WriteExtendedMessage(extName pp.ExtensionName, payload []byt
        return nil
 }
 
+func (pc *PeerConn) shouldRequestHashes() bool {
+       return pc.t.haveInfo() && pc.v2 && pc.t.info.HasV2()
+}
+
 func (pc *PeerConn) requestMissingHashes() {
-       if pc.peerChoking {
-               return
-       }
-       if !pc.t.haveInfo() {
+       if !pc.shouldRequestHashes() {
                return
        }
        info := pc.t.info
-       if !info.HasV2() {
-               return
-       }
-       if !pc.v2 {
-               return
-       }
        baseLayer := pp.Integer(merkle.Log2RoundingUp(merkle.RoundUpToPowerOfTwo(
                uint((pc.t.usualPieceSize() + merkle.BlockSize - 1) / merkle.BlockSize)),
        ))
@@ -1342,7 +1337,7 @@ func (pc *PeerConn) onReadHashes(msg *pp.Message) (err error) {
                        file, file.numPieces())
                for filePieceIndex, peerHash := range filePieceHashes {
                        torrentPieceIndex := file.BeginPieceIndex() + filePieceIndex
-                       pc.t.piece(torrentPieceIndex).hashV2.Set(peerHash)
+                       pc.t.piece(torrentPieceIndex).setV2Hash(peerHash)
                }
        } else {
                pc.protocolLogger.WithNames(v2HashesLogName).Levelf(
index 0aa1e6339626b141e02403f1d5586512f55fd579..6158e89ae4dc0a79a3b8fbdc89331ebd9a72f03a 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -229,8 +229,12 @@ func (p *Piece) purePriority() (ret piecePriority) {
        return
 }
 
+func (p *Piece) ignoreForRequests() bool {
+       return p.hashing || p.marking || !p.haveHash() || p.t.pieceComplete(p.index) || p.queuedForHash()
+}
+
 func (p *Piece) uncachedPriority() (ret piecePriority) {
-       if p.hashing || p.marking || p.t.pieceComplete(p.index) || p.queuedForHash() {
+       if p.ignoreForRequests() {
                return PiecePriorityNone
        }
        return p.purePriority()
@@ -274,3 +278,21 @@ func (p *Piece) mustGetOnlyFile() *File {
        }
        return p.files[0]
 }
+
+// Sets the v2 piece hash, queuing initial piece checks if appropriate.
+func (p *Piece) setV2Hash(v2h [32]byte) {
+       // See Torrent.onSetInfo. We want to trigger an initial check if appropriate, if we didn't yet
+       // have a piece hash (can occur with v2 when we don't start with piece layers).
+       if !p.hashV2.Set(v2h).Ok && p.hash == nil {
+               p.t.queueInitialPieceCheck(p.index)
+       }
+}
+
+// Can't do certain things if we don't know the piece hash.
+func (p *Piece) haveHash() bool {
+       return p.hash != nil || p.hashV2.Ok
+}
+
+func pieceStateAllowsMessageWrites(p *Piece, pc *PeerConn) bool {
+       return (pc.shouldRequestHashes() && !p.haveHash()) || !p.t.ignorePieceForRequests(p.index)
+}
index 43acca13433242faa736dfdf7c416320d68bdccf..8c59ec4398217ecdb3793b69ec421c1dcb8cec78 100644 (file)
@@ -75,7 +75,7 @@ type requestStrategyTorrent struct {
 }
 
 func (r requestStrategyTorrent) Piece(i int) request_strategy.Piece {
-       return (*requestStrategyPiece)(r.t.piece(i))
+       return requestStrategyPiece{r.t.piece(i)}
 }
 
 func (r requestStrategyTorrent) PieceLength() int64 {
@@ -84,14 +84,16 @@ func (r requestStrategyTorrent) PieceLength() int64 {
 
 var _ request_strategy.Torrent = requestStrategyTorrent{}
 
-type requestStrategyPiece Piece
+type requestStrategyPiece struct {
+       p *Piece
+}
 
-func (r *requestStrategyPiece) Request() bool {
-       return !r.t.ignorePieceForRequests(r.index)
+func (r requestStrategyPiece) Request() bool {
+       return !r.p.ignoreForRequests()
 }
 
-func (r *requestStrategyPiece) NumPendingChunks() int {
-       return int(r.t.pieceNumPendingChunks(r.index))
+func (r requestStrategyPiece) NumPendingChunks() int {
+       return int(r.p.t.pieceNumPendingChunks(r.p.index))
 }
 
 var _ request_strategy.Piece = (*requestStrategyPiece)(nil)
index 78b761943ac6c2147599aaf43108a3b9b9a0c61c..e2375a441ea66ef8c7eee4968c426624939055c1 100644 (file)
@@ -144,8 +144,8 @@ type Torrent struct {
        _readerNowPieces       bitmap.Bitmap
        _readerReadaheadPieces bitmap.Bitmap
 
-       // A cache of pieces we need to get. Calculated from various piece and
-       // file priorities and completion states elsewhere.
+       // A cache of pieces we need to get. Calculated from various piece and file priorities and
+       // completion states elsewhere. Includes piece data and piece v2 hashes.
        _pendingPieces roaring.Bitmap
        // A cache of completed piece indices.
        _completedPieces roaring.Bitmap
@@ -232,7 +232,7 @@ func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap {
 }
 
 func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
-       return !t.wantPieceIndex(i)
+       return t.piece(i).ignoreForRequests()
 }
 
 // Returns a channel that is closed when the Torrent is closed.
@@ -458,12 +458,7 @@ func (t *Torrent) AddPieceLayers(layers map[string]string) (err error) {
                for i := range f.numPieces() {
                        pi := f.BeginPieceIndex() + i
                        p := t.piece(pi)
-                       // See Torrent.onSetInfo. We want to trigger an initial check if appropriate, if we
-                       // didn't yet have a piece hash (can occur with v2 when we don't start with piece
-                       // layers).
-                       if !p.hashV2.Set(hashes[i]).Ok && p.hash == nil {
-                               t.queueInitialPieceCheck(pi)
-                       }
+                       p.setV2Hash(hashes[i])
                }
        }
        return nil
@@ -1170,7 +1165,7 @@ func (t *Torrent) hashPiece(piece pieceIndex) (
        var written int64
        written, err = storagePiece.WriteTo(io.MultiWriter(writers...))
        if err == nil && written != int64(p.length()) {
-               err = io.ErrShortWrite
+               err = fmt.Errorf("wrote %v bytes from storage, piece has length %v", written, p.length())
        }
        if logPieceContents {
                t.logger.WithDefaultLevel(log.Debug).Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
@@ -1411,10 +1406,10 @@ func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChange
                // request order.
                t.updatePieceRequestOrderPiece(piece)
        }
-       p := &t.pieces[piece]
+       p := t.piece(piece)
        newPrio := p.uncachedPriority()
        // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
-       if newPrio == PiecePriorityNone {
+       if newPrio == PiecePriorityNone && p.haveHash() {
                return t._pendingPieces.CheckedRemove(uint32(piece))
        } else {
                return t._pendingPieces.CheckedAdd(uint32(piece))