]> Sergey Matveev's repositories - btrtrc.git/commitdiff
v2 torrent piece hashing
authorMatt Joiner <anacrolix@gmail.com>
Tue, 27 Feb 2024 12:31:29 +0000 (23:31 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 2 Mar 2024 02:02:54 +0000 (13:02 +1100)
go.mod
go.sum
merkle/hash.go [new file with mode: 0644]
merkle/merkle.go
metainfo/bep52.go
torrent.go

diff --git a/go.mod b/go.mod
index 7358f281f25c2487379fee2d918da2922fbe0403..637f205a2d18e0e7df82840ccc8c01d8a67d50ef 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -11,7 +11,7 @@ require (
        github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444
        github.com/anacrolix/envpprof v1.3.0
        github.com/anacrolix/fuse v0.2.0
-       github.com/anacrolix/generics v0.0.0-20230911070922-5dd7545c6b13
+       github.com/anacrolix/generics v0.0.2-0.20240227122613-f95486179cab
        github.com/anacrolix/go-libutp v1.3.1
        github.com/anacrolix/log v0.14.6-0.20231202035202-ed7a02cad0b4
        github.com/anacrolix/missinggo v1.3.0
diff --git a/go.sum b/go.sum
index d371109d1d8d1a9176b63252b36751f47f9cdbd4..fb5f86b14f10b09ab79cce4838e8b88ab583242b 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -77,8 +77,8 @@ github.com/anacrolix/envpprof v1.3.0 h1:WJt9bpuT7A/CDCxPOv/eeZqHWlle/Y0keJUvc6tc
 github.com/anacrolix/envpprof v1.3.0/go.mod h1:7QIG4CaX1uexQ3tqd5+BRa/9e2D02Wcertl6Yh0jCB0=
 github.com/anacrolix/fuse v0.2.0 h1:pc+To78kI2d/WUjIyrsdqeJQAesuwpGxlI3h1nAv3Do=
 github.com/anacrolix/fuse v0.2.0/go.mod h1:Kfu02xBwnySDpH3N23BmrP3MDfwAQGRLUCj6XyeOvBQ=
-github.com/anacrolix/generics v0.0.0-20230911070922-5dd7545c6b13 h1:qwOprPTDMM3BASJRf84mmZnTXRsPGGJ8xoHKQS7m3so=
-github.com/anacrolix/generics v0.0.0-20230911070922-5dd7545c6b13/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8=
+github.com/anacrolix/generics v0.0.2-0.20240227122613-f95486179cab h1:MvuAC/UJtcohN6xWc8zYXSZfllh1LVNepQ0R3BCX5I4=
+github.com/anacrolix/generics v0.0.2-0.20240227122613-f95486179cab/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8=
 github.com/anacrolix/go-libutp v1.3.1 h1:idJzreNLl+hNjGC3ZnUOjujEaryeOGgkwHLqSGoige0=
 github.com/anacrolix/go-libutp v1.3.1/go.mod h1:heF41EC8kN0qCLMokLBVkB8NXiLwx3t8R8810MTNI5o=
 github.com/anacrolix/log v0.3.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU=
diff --git a/merkle/hash.go b/merkle/hash.go
new file mode 100644 (file)
index 0000000..18ecee7
--- /dev/null
@@ -0,0 +1,72 @@
+package merkle
+
+import (
+       "crypto/sha256"
+       "hash"
+)
+
+func NewHash() *Hash {
+       return &Hash{
+               nextBlock: sha256.New(),
+       }
+}
+
+type Hash struct {
+       blocks    [][32]byte
+       nextBlock hash.Hash
+       written   int
+}
+
+func (h *Hash) remaining() int {
+       return BlockSize - h.written
+}
+
+func (h *Hash) Write(p []byte) (n int, err error) {
+       for len(p) > 0 {
+               var n1 int
+               n1, err = h.nextBlock.Write(p[:min(len(p), h.remaining())])
+               n += n1
+               h.written += n1
+               p = p[n1:]
+               if h.remaining() == 0 {
+                       h.blocks = append(h.blocks, h.nextBlockSum())
+                       h.nextBlock.Reset()
+                       h.written = 0
+               }
+               if err != nil {
+                       break
+               }
+       }
+       return
+}
+
+func (h *Hash) nextBlockSum() (sum [32]byte) {
+       h.nextBlock.Sum(sum[:0])
+       return
+}
+
+func (h *Hash) Sum(b []byte) []byte {
+       blocks := h.blocks
+       if h.written != 0 {
+               blocks = append(blocks, h.nextBlockSum())
+       }
+       n := int(RoundUpToPowerOfTwo(uint(len(blocks))))
+       blocks = append(blocks, make([][32]byte, n-len(blocks))...)
+       sum := Root(blocks)
+       return append(b, sum[:]...)
+}
+
+func (h *Hash) Reset() {
+       h.blocks = h.blocks[:0]
+       h.nextBlock.Reset()
+}
+
+func (h *Hash) Size() int {
+       return 32
+}
+
+func (h *Hash) BlockSize() int {
+       return h.nextBlock.BlockSize()
+}
+
+var _ hash.Hash = (*Hash)(nil)
index 76985e8f78734ca7a275888d69a55313417afa5c..a6667cb42ba3429e168f9c40dc4f26b80b95382a 100644 (file)
@@ -7,8 +7,14 @@ import (
        "math/bits"
 )
 
+// The leaf block size for BitTorrent v2 Merkle trees.
+const BlockSize = 1 << 14 // 16KiB
+
 func Root(hashes [][sha256.Size]byte) [sha256.Size]byte {
-       if len(hashes) <= 1 {
+       switch len(hashes) {
+       case 0:
+               return sha256.Sum256(nil)
+       case 1:
                return hashes[0]
        }
        numHashes := uint(len(hashes))
index 18be726790f435a90301b6c7e354e3e3b3b04c12..8bdd19deb3aff793cab2441801d659d3864e5988 100644 (file)
@@ -25,8 +25,9 @@ func ValidatePieceLayers(
                if !ok {
                        // BEP 52: "For each file in the file tree that is larger than the piece size it
                        // contains one string value.". The reference torrent creator in
-                       // https://blog.libtorrent.org/2020/09/bittorrent-v2/ also has this. I'm not sure what
-                       // harm it causes if it's present anyway, possibly it won't be useful to us.
+                       // https://blog.libtorrent.org/2020/09/bittorrent-v2/ also has this. If a file is equal
+                       // to or smaller than the piece length, we can just use the pieces root instead of the
+                       // piece layer hash.
                        if ft.File.Length > pieceLength {
                                err = fmt.Errorf("no piece layers for file %q", path)
                        }
index 8416e4ba0935a7f2a8093c84844242b02271fc36..f15459b5af271260256d57f64b0804fd5d86138e 100644 (file)
@@ -10,6 +10,7 @@ import (
        "github.com/anacrolix/torrent/merkle"
        "github.com/anacrolix/torrent/types/infohash"
        infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
+       "hash"
        "io"
        "math/rand"
        "net/netip"
@@ -423,22 +424,35 @@ func (t *Torrent) AddPieceLayers(layers map[string]string) (err error) {
                        return
                }
                compactLayer, ok := layers[string(f.piecesRoot.Value[:])]
-               if !ok {
-                       continue
-               }
                var hashes [][32]byte
-               hashes, err = merkle.CompactLayerToSliceHashes(compactLayer)
-               if err != nil {
-                       err = fmt.Errorf("bad piece layers for file %q: %w", f, err)
-                       return
+               if ok {
+                       hashes, err = merkle.CompactLayerToSliceHashes(compactLayer)
+                       if err != nil {
+                               err = fmt.Errorf("bad piece layers for file %q: %w", f, err)
+                               return
+                       }
+               } else if f.length > t.info.PieceLength {
+                       // BEP 52 is pretty strongly worded about this, even though we should be able to
+                       // recover: If a v2 torrent is added by magnet link or infohash, we need to fetch piece
+                       // layers ourselves anyway, and that's how we can recover from this.
+                       t.logger.Levelf(log.Warning, "no piece layers for file %q", f)
+                       continue
+               } else {
+                       hashes = [][32]byte{f.piecesRoot.Value}
                }
                if len(hashes) != f.numPieces() {
                        err = fmt.Errorf("file %q: got %v hashes expected %v", f, len(hashes), f.numPieces())
                        return
                }
                for i := range f.numPieces() {
-                       p := t.piece(f.BeginPieceIndex() + i)
-                       p.hashV2.Set(hashes[i])
+                       pi := f.BeginPieceIndex() + i
+                       p := t.piece(pi)
+                       // See Torrent.onSetInfo. We want to trigger an initial check if appropriate, if we
+                       // didn't yet have a piece hash (can occur with v2 when we don't start with piece
+                       // layers).
+                       if !p.hashV2.Set(hashes[i]).Ok && p.hash == nil {
+                               t.queueInitialPieceCheck(pi)
+                       }
                }
        }
        return nil
@@ -521,10 +535,7 @@ func (t *Torrent) onSetInfo() {
                p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
                t.addRequestOrderPiece(i)
                t.updatePieceCompletion(i)
-               if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
-                       // t.logger.Printf("piece %s completion unknown, queueing check", p)
-                       t.queuePieceCheck(i)
-               }
+               t.queueInitialPieceCheck(i)
        }
        t.cl.event.Broadcast()
        close(t.gotMetainfoC)
@@ -1057,28 +1068,39 @@ func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWr
 }
 
 func (t *Torrent) hashPiece(piece pieceIndex) (
-       ret metainfo.Hash,
+       correct bool,
        // These are peers that sent us blocks that differ from what we hash here.
        differingPeers map[bannableAddr]struct{},
        err error,
 ) {
        p := t.piece(piece)
        p.waitNoPendingWrites()
-       storagePiece := t.pieces[piece].Storage()
-
-       // Does the backend want to do its own hashing?
-       if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok {
-               var sum metainfo.Hash
-               // log.Printf("A piece decided to self-hash: %d", piece)
-               sum, err = i.SelfHash()
-               missinggo.CopyExact(&ret, sum)
-               return
+       storagePiece := p.Storage()
+
+       var h hash.Hash
+       if p.hash != nil {
+               h = pieceHash.New()
+
+               // Does the backend want to do its own hashing?
+               if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok {
+                       var sum metainfo.Hash
+                       // log.Printf("A piece decided to self-hash: %d", piece)
+                       sum, err = i.SelfHash()
+                       correct = sum == *p.hash
+                       // Can't do smart banning without reading the piece. The smartBanCache is still cleared
+                       // in pieceHasher regardless.
+                       return
+               }
+
+       } else if p.hashV2.Ok {
+               h = merkle.NewHash()
+       } else {
+               panic("no hash")
        }
 
-       hash := pieceHash.New()
        const logPieceContents = false
        smartBanWriter := t.smartBanBlockCheckingWriter(piece)
-       writers := []io.Writer{hash, smartBanWriter}
+       writers := []io.Writer{h, smartBanWriter}
        var examineBuf bytes.Buffer
        if logPieceContents {
                writers = append(writers, &examineBuf)
@@ -1089,7 +1111,23 @@ func (t *Torrent) hashPiece(piece pieceIndex) (
        }
        smartBanWriter.Flush()
        differingPeers = smartBanWriter.badPeers
-       missinggo.CopyExact(&ret, hash.Sum(nil))
+       if p.hash != nil {
+               var sum [20]byte
+               n := len(h.Sum(sum[:0]))
+               if n != 20 {
+                       panic(n)
+               }
+               correct = sum == *p.hash
+       } else if p.hashV2.Ok {
+               var sum [32]byte
+               n := len(h.Sum(sum[:0]))
+               if n != 32 {
+                       panic(n)
+               }
+               correct = sum == p.hashV2.Value
+       } else {
+               panic("no hash")
+       }
        return
 }
 
@@ -2169,10 +2207,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
                } else {
                        log.Fmsg(
                                "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers),
-                       ).AddValues(t, p).LogLevel(
-
-                               log.Debug, t.logger)
-
+                       ).AddValues(t, p).LogLevel(log.Info, t.logger)
                        pieceHashedNotCorrect.Add(1)
                }
        }
@@ -2368,8 +2403,7 @@ func (t *Torrent) dropBannedPeers() {
 
 func (t *Torrent) pieceHasher(index pieceIndex) {
        p := t.piece(index)
-       sum, failedPeers, copyErr := t.hashPiece(index)
-       correct := sum == *p.hash
+       correct, failedPeers, copyErr := t.hashPiece(index)
        switch copyErr {
        case nil, io.EOF:
        default:
@@ -2411,6 +2445,12 @@ func (t *Torrent) peersAsSlice() (ret []*Peer) {
        return
 }
 
+func (t *Torrent) queueInitialPieceCheck(i pieceIndex) {
+       if !t.initialPieceCheckDisabled && !t.piece(i).storageCompletionOk {
+               t.queuePieceCheck(i)
+       }
+}
+
 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
        piece := t.piece(pieceIndex)
        if piece.hash == nil && !piece.hashV2.Ok {