From: Matt Joiner Date: Tue, 27 Feb 2024 12:31:29 +0000 (+1100) Subject: v2 torrent piece hashing X-Git-Tag: v1.56.0~62^2~10 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=fef859aa03e5a23cd39869f8889a1285c5e11495;p=btrtrc.git v2 torrent piece hashing --- diff --git a/go.mod b/go.mod index 7358f281..637f205a 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444 github.com/anacrolix/envpprof v1.3.0 github.com/anacrolix/fuse v0.2.0 - github.com/anacrolix/generics v0.0.0-20230911070922-5dd7545c6b13 + github.com/anacrolix/generics v0.0.2-0.20240227122613-f95486179cab github.com/anacrolix/go-libutp v1.3.1 github.com/anacrolix/log v0.14.6-0.20231202035202-ed7a02cad0b4 github.com/anacrolix/missinggo v1.3.0 diff --git a/go.sum b/go.sum index d371109d..fb5f86b1 100644 --- a/go.sum +++ b/go.sum @@ -77,8 +77,8 @@ github.com/anacrolix/envpprof v1.3.0 h1:WJt9bpuT7A/CDCxPOv/eeZqHWlle/Y0keJUvc6tc github.com/anacrolix/envpprof v1.3.0/go.mod h1:7QIG4CaX1uexQ3tqd5+BRa/9e2D02Wcertl6Yh0jCB0= github.com/anacrolix/fuse v0.2.0 h1:pc+To78kI2d/WUjIyrsdqeJQAesuwpGxlI3h1nAv3Do= github.com/anacrolix/fuse v0.2.0/go.mod h1:Kfu02xBwnySDpH3N23BmrP3MDfwAQGRLUCj6XyeOvBQ= -github.com/anacrolix/generics v0.0.0-20230911070922-5dd7545c6b13 h1:qwOprPTDMM3BASJRf84mmZnTXRsPGGJ8xoHKQS7m3so= -github.com/anacrolix/generics v0.0.0-20230911070922-5dd7545c6b13/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8= +github.com/anacrolix/generics v0.0.2-0.20240227122613-f95486179cab h1:MvuAC/UJtcohN6xWc8zYXSZfllh1LVNepQ0R3BCX5I4= +github.com/anacrolix/generics v0.0.2-0.20240227122613-f95486179cab/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8= github.com/anacrolix/go-libutp v1.3.1 h1:idJzreNLl+hNjGC3ZnUOjujEaryeOGgkwHLqSGoige0= github.com/anacrolix/go-libutp v1.3.1/go.mod h1:heF41EC8kN0qCLMokLBVkB8NXiLwx3t8R8810MTNI5o= github.com/anacrolix/log v0.3.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU= diff --git a/merkle/hash.go b/merkle/hash.go new file mode 100644 index 00000000..18ecee78 --- /dev/null +++ b/merkle/hash.go @@ -0,0 +1,72 @@ +package merkle + +import ( + "crypto/sha256" + "hash" +) + +func NewHash() *Hash { + return &Hash{ + nextBlock: sha256.New(), + } +} + +type Hash struct { + blocks [][32]byte + nextBlock hash.Hash + written int +} + +func (h *Hash) remaining() int { + return BlockSize - h.written +} + +func (h *Hash) Write(p []byte) (n int, err error) { + for len(p) > 0 { + var n1 int + n1, err = h.nextBlock.Write(p[:min(len(p), h.remaining())]) + n += n1 + h.written += n1 + p = p[n1:] + if h.remaining() == 0 { + h.blocks = append(h.blocks, h.nextBlockSum()) + h.nextBlock.Reset() + h.written = 0 + } + if err != nil { + break + } + } + return +} + +func (h *Hash) nextBlockSum() (sum [32]byte) { + h.nextBlock.Sum(sum[:0]) + return +} + +func (h *Hash) Sum(b []byte) []byte { + blocks := h.blocks + if h.written != 0 { + blocks = append(blocks, h.nextBlockSum()) + } + n := int(RoundUpToPowerOfTwo(uint(len(blocks)))) + blocks = append(blocks, make([][32]byte, n-len(blocks))...) + sum := Root(blocks) + return append(b, sum[:]...) +} + +func (h *Hash) Reset() { + h.blocks = h.blocks[:0] + h.nextBlock.Reset() +} + +func (h *Hash) Size() int { + return 32 +} + +func (h *Hash) BlockSize() int { + return h.nextBlock.BlockSize() +} + +var _ hash.Hash = (*Hash)(nil) diff --git a/merkle/merkle.go b/merkle/merkle.go index 76985e8f..a6667cb4 100644 --- a/merkle/merkle.go +++ b/merkle/merkle.go @@ -7,8 +7,14 @@ import ( "math/bits" ) +// The leaf block size for BitTorrent v2 Merkle trees. +const BlockSize = 1 << 14 // 16KiB + func Root(hashes [][sha256.Size]byte) [sha256.Size]byte { - if len(hashes) <= 1 { + switch len(hashes) { + case 0: + return sha256.Sum256(nil) + case 1: return hashes[0] } numHashes := uint(len(hashes)) diff --git a/metainfo/bep52.go b/metainfo/bep52.go index 18be7267..8bdd19de 100644 --- a/metainfo/bep52.go +++ b/metainfo/bep52.go @@ -25,8 +25,9 @@ func ValidatePieceLayers( if !ok { // BEP 52: "For each file in the file tree that is larger than the piece size it // contains one string value.". The reference torrent creator in - // https://blog.libtorrent.org/2020/09/bittorrent-v2/ also has this. I'm not sure what - // harm it causes if it's present anyway, possibly it won't be useful to us. + // https://blog.libtorrent.org/2020/09/bittorrent-v2/ also has this. If a file is equal + // to or smaller than the piece length, we can just use the pieces root instead of the + // piece layer hash. if ft.File.Length > pieceLength { err = fmt.Errorf("no piece layers for file %q", path) } diff --git a/torrent.go b/torrent.go index 8416e4ba..f15459b5 100644 --- a/torrent.go +++ b/torrent.go @@ -10,6 +10,7 @@ import ( "github.com/anacrolix/torrent/merkle" "github.com/anacrolix/torrent/types/infohash" infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2" + "hash" "io" "math/rand" "net/netip" @@ -423,22 +424,35 @@ func (t *Torrent) AddPieceLayers(layers map[string]string) (err error) { return } compactLayer, ok := layers[string(f.piecesRoot.Value[:])] - if !ok { - continue - } var hashes [][32]byte - hashes, err = merkle.CompactLayerToSliceHashes(compactLayer) - if err != nil { - err = fmt.Errorf("bad piece layers for file %q: %w", f, err) - return + if ok { + hashes, err = merkle.CompactLayerToSliceHashes(compactLayer) + if err != nil { + err = fmt.Errorf("bad piece layers for file %q: %w", f, err) + return + } + } else if f.length > t.info.PieceLength { + // BEP 52 is pretty strongly worded about this, even though we should be able to + // recover: If a v2 torrent is added by magnet link or infohash, we need to fetch piece + // layers ourselves anyway, and that's how we can recover from this. + t.logger.Levelf(log.Warning, "no piece layers for file %q", f) + continue + } else { + hashes = [][32]byte{f.piecesRoot.Value} } if len(hashes) != f.numPieces() { err = fmt.Errorf("file %q: got %v hashes expected %v", f, len(hashes), f.numPieces()) return } for i := range f.numPieces() { - p := t.piece(f.BeginPieceIndex() + i) - p.hashV2.Set(hashes[i]) + pi := f.BeginPieceIndex() + i + p := t.piece(pi) + // See Torrent.onSetInfo. We want to trigger an initial check if appropriate, if we + // didn't yet have a piece hash (can occur with v2 when we don't start with piece + // layers). + if !p.hashV2.Set(hashes[i]).Ok && p.hash == nil { + t.queueInitialPieceCheck(pi) + } } } return nil @@ -521,10 +535,7 @@ func (t *Torrent) onSetInfo() { p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i) t.addRequestOrderPiece(i) t.updatePieceCompletion(i) - if !t.initialPieceCheckDisabled && !p.storageCompletionOk { - // t.logger.Printf("piece %s completion unknown, queueing check", p) - t.queuePieceCheck(i) - } + t.queueInitialPieceCheck(i) } t.cl.event.Broadcast() close(t.gotMetainfoC) @@ -1057,28 +1068,39 @@ func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWr } func (t *Torrent) hashPiece(piece pieceIndex) ( - ret metainfo.Hash, + correct bool, // These are peers that sent us blocks that differ from what we hash here. differingPeers map[bannableAddr]struct{}, err error, ) { p := t.piece(piece) p.waitNoPendingWrites() - storagePiece := t.pieces[piece].Storage() - - // Does the backend want to do its own hashing? - if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok { - var sum metainfo.Hash - // log.Printf("A piece decided to self-hash: %d", piece) - sum, err = i.SelfHash() - missinggo.CopyExact(&ret, sum) - return + storagePiece := p.Storage() + + var h hash.Hash + if p.hash != nil { + h = pieceHash.New() + + // Does the backend want to do its own hashing? + if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok { + var sum metainfo.Hash + // log.Printf("A piece decided to self-hash: %d", piece) + sum, err = i.SelfHash() + correct = sum == *p.hash + // Can't do smart banning without reading the piece. The smartBanCache is still cleared + // in pieceHasher regardless. + return + } + + } else if p.hashV2.Ok { + h = merkle.NewHash() + } else { + panic("no hash") } - hash := pieceHash.New() const logPieceContents = false smartBanWriter := t.smartBanBlockCheckingWriter(piece) - writers := []io.Writer{hash, smartBanWriter} + writers := []io.Writer{h, smartBanWriter} var examineBuf bytes.Buffer if logPieceContents { writers = append(writers, &examineBuf) @@ -1089,7 +1111,23 @@ func (t *Torrent) hashPiece(piece pieceIndex) ( } smartBanWriter.Flush() differingPeers = smartBanWriter.badPeers - missinggo.CopyExact(&ret, hash.Sum(nil)) + if p.hash != nil { + var sum [20]byte + n := len(h.Sum(sum[:0])) + if n != 20 { + panic(n) + } + correct = sum == *p.hash + } else if p.hashV2.Ok { + var sum [32]byte + n := len(h.Sum(sum[:0])) + if n != 32 { + panic(n) + } + correct = sum == p.hashV2.Value + } else { + panic("no hash") + } return } @@ -2169,10 +2207,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { } else { log.Fmsg( "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers), - ).AddValues(t, p).LogLevel( - - log.Debug, t.logger) - + ).AddValues(t, p).LogLevel(log.Info, t.logger) pieceHashedNotCorrect.Add(1) } } @@ -2368,8 +2403,7 @@ func (t *Torrent) dropBannedPeers() { func (t *Torrent) pieceHasher(index pieceIndex) { p := t.piece(index) - sum, failedPeers, copyErr := t.hashPiece(index) - correct := sum == *p.hash + correct, failedPeers, copyErr := t.hashPiece(index) switch copyErr { case nil, io.EOF: default: @@ -2411,6 +2445,12 @@ func (t *Torrent) peersAsSlice() (ret []*Peer) { return } +func (t *Torrent) queueInitialPieceCheck(i pieceIndex) { + if !t.initialPieceCheckDisabled && !t.piece(i).storageCompletionOk { + t.queuePieceCheck(i) + } +} + func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) { piece := t.piece(pieceIndex) if piece.hash == nil && !piece.hashV2.Ok {