From c4e0e154400ab76ca4bde9d0b88d6274f1e9168f Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 1 Mar 2024 14:42:22 +1100 Subject: [PATCH] Implement reading piece hashes from peers --- go.mod | 2 +- merkle/hash.go | 4 +-- merkle/merkle.go | 7 +++++ metainfo/bep52.go | 7 +---- peerconn.go | 77 +++++++++++++++++++++++++++++++++++++++++++++-- torrent.go | 12 ++++++-- v2hashes.go | 4 +++ 7 files changed, 98 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 637f205a..1af03a6e 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( go.opentelemetry.io/otel/sdk v1.8.0 go.opentelemetry.io/otel/trace v1.8.0 golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df + golang.org/x/sync v0.3.0 golang.org/x/sys v0.15.0 golang.org/x/time v0.0.0-20220609170525-579cf78fd858 ) @@ -111,7 +112,6 @@ require ( go.opentelemetry.io/proto/otlp v0.18.0 // indirect golang.org/x/crypto v0.17.0 // indirect golang.org/x/net v0.10.0 // indirect - golang.org/x/sync v0.3.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect diff --git a/merkle/hash.go b/merkle/hash.go index 18ecee78..1a8f67a9 100644 --- a/merkle/hash.go +++ b/merkle/hash.go @@ -50,9 +50,7 @@ func (h *Hash) Sum(b []byte) []byte { if h.written != 0 { blocks = append(blocks, h.nextBlockSum()) } - n := int(RoundUpToPowerOfTwo(uint(len(blocks)))) - blocks = append(blocks, make([][32]byte, n-len(blocks))...) - sum := Root(blocks) + sum := RootWithPadHash(blocks, [32]byte{}) return append(b, sum[:]...) } diff --git a/merkle/merkle.go b/merkle/merkle.go index 17155551..582c0807 100644 --- a/merkle/merkle.go +++ b/merkle/merkle.go @@ -32,6 +32,13 @@ func Root(hashes [][sha256.Size]byte) [sha256.Size]byte { return Root(next) } +func RootWithPadHash(hashes [][sha256.Size]byte, padHash [sha256.Size]byte) [sha256.Size]byte { + for uint(len(hashes)) < RoundUpToPowerOfTwo(uint(len(hashes))) { + hashes = append(hashes, padHash) + } + return Root(hashes) +} + func CompactLayerToSliceHashes(compactLayer string) (hashes [][sha256.Size]byte, err error) { g.MakeSliceWithLength(&hashes, len(compactLayer)/sha256.Size) for i := range hashes { diff --git a/metainfo/bep52.go b/metainfo/bep52.go index 0291d653..dd3f7414 100644 --- a/metainfo/bep52.go +++ b/metainfo/bep52.go @@ -36,12 +36,7 @@ func ValidatePieceLayers( } var layerHashes [][32]byte layerHashes, err = merkle.CompactLayerToSliceHashes(filePieceLayers) - padHash := HashForPiecePad(pieceLength) - for uint(len(layerHashes)) < merkle.RoundUpToPowerOfTwo(uint(len(layerHashes))) { - layerHashes = append(layerHashes, padHash) - } - var root [32]byte - root = merkle.Root(layerHashes) + root := merkle.RootWithPadHash(layerHashes, HashForPiecePad(pieceLength)) if root != piecesRoot.Value { err = fmt.Errorf("file %q: expected hash %x got %x", path, piecesRoot.Value, root) return diff --git a/peerconn.go b/peerconn.go index ea2ee966..c55a7d73 100644 --- a/peerconn.go +++ b/peerconn.go @@ -81,7 +81,13 @@ type PeerConn struct { outstandingHolepunchingRendezvous map[netip.AddrPort]struct{} + // Hash requests sent to the peer. If there's an issue we probably don't want to reissue these, + // because I haven't implemented it smart enough yet. sentHashRequests map[hashRequest]struct{} + // Hash pieces received from the peer, mapped from pieces root to piece layer hashes. This way + // we can verify all the pieces for a file when they're all arrived before submitting them to + // the torrent. + receivedHashPieces map[[32]byte][][32]byte } func (cn *PeerConn) pexStatus() string { @@ -738,7 +744,11 @@ func (c *PeerConn) mainReadLoop() (err error) { cl.unlock() defer cl.lock() err = decoder.Decode(&msg) + if err != nil { + err = fmt.Errorf("decoding message: %w", err) + } }() + // Do this before checking closed. if cb := c.callbacks.ReadMessage; cb != nil && err == nil { cb(c, &msg) } @@ -746,6 +756,7 @@ func (c *PeerConn) mainReadLoop() (err error) { return nil } if err != nil { + err = log.WithLevel(log.Info, err) return err } c.lastMessageReceived = time.Now() @@ -869,7 +880,9 @@ func (c *PeerConn) mainReadLoop() (err error) { c.updateRequests("PeerConn.mainReadLoop allowed fast") case pp.Extended: err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload) - case pp.HashRequest, pp.Hashes, pp.HashReject: + case pp.Hashes: + err = c.onReadHashes(&msg) + case pp.HashRequest, pp.HashReject: err = log.WithLevel(log.Warning, fmt.Errorf("received unimplemented BitTorrent v2 message: %v", msg.Type)) default: err = fmt.Errorf("received unknown message type: %#v", msg.Type) @@ -1220,6 +1233,9 @@ func (pc *PeerConn) WriteExtendedMessage(extName pp.ExtensionName, payload []byt } func (pc *PeerConn) requestMissingHashes() { + if pc.peerChoking { + return + } if !pc.t.haveInfo() { return } @@ -1230,15 +1246,32 @@ func (pc *PeerConn) requestMissingHashes() { baseLayer := pp.Integer(merkle.Log2RoundingUp(merkle.RoundUpToPowerOfTwo( uint((pc.t.usualPieceSize() + merkle.BlockSize - 1) / merkle.BlockSize)), )) + nextFileBeginPiece := 0 +file: for _, file := range info.UpvertedFiles() { - piecesRoot := file.PiecesRoot.Unwrap() fileNumPieces := int((file.Length + info.PieceLength - 1) / info.PieceLength) - proofLayers := pp.Integer(0) + curFileBeginPiece := nextFileBeginPiece + nextFileBeginPiece += fileNumPieces + haveAllHashes := true + for i := range fileNumPieces { + torrentPieceIndex := curFileBeginPiece + i + if !pc.peerHasPiece(torrentPieceIndex) { + continue file + } + if !pc.t.piece(torrentPieceIndex).hashV2.Ok { + haveAllHashes = false + } + } + if haveAllHashes { + continue + } // We would be requesting the leaves, the file must be short enough that we can just do with // the pieces root as the piece hash. if fileNumPieces <= 1 { continue } + piecesRoot := file.PiecesRoot.Unwrap() + proofLayers := pp.Integer(0) for index := 0; index < fileNumPieces; index += 512 { // Minimizing to the number of pieces in a file conflicts with the BEP. length := merkle.RoundUpToPowerOfTwo(uint(min(512, fileNumPieces-index))) @@ -1247,6 +1280,9 @@ func (pc *PeerConn) requestMissingHashes() { // checks. panic(length) } + if length%2 != 0 { + pc.logger.Levelf(log.Warning, "requesting odd hashes length %d", length) + } msg := pp.Message{ Type: pp.HashRequest, PiecesRoot: piecesRoot, @@ -1266,6 +1302,41 @@ func (pc *PeerConn) requestMissingHashes() { } } +func (pc *PeerConn) onReadHashes(msg *pp.Message) (err error) { + file := pc.t.getFileByPiecesRoot(msg.PiecesRoot) + filePieceHashes := pc.receivedHashPieces[msg.PiecesRoot] + if filePieceHashes == nil { + filePieceHashes = make([][32]byte, file.numPieces()) + generics.MakeMapIfNil(&pc.receivedHashPieces) + pc.receivedHashPieces[msg.PiecesRoot] = filePieceHashes + } + if msg.ProofLayers != 0 { + // This isn't handled yet. + panic(msg.ProofLayers) + } + copy(filePieceHashes[msg.Index:], msg.Hashes) + root := merkle.RootWithPadHash( + filePieceHashes, + metainfo.HashForPiecePad(int64(pc.t.usualPieceSize()))) + expectedPiecesRoot := file.piecesRoot.Unwrap() + if root == expectedPiecesRoot { + pc.logger.WithNames(v2HashesLogName).Levelf( + log.Info, + "got piece hashes for file %v (num pieces %v)", + file, file.numPieces()) + for filePieceIndex, peerHash := range filePieceHashes { + torrentPieceIndex := file.BeginPieceIndex() + filePieceIndex + pc.t.piece(torrentPieceIndex).hashV2.Set(peerHash) + } + } else { + pc.logger.WithNames(v2HashesLogName).Levelf( + log.Debug, + "peer file piece hashes root mismatch: %x != %x", + root, expectedPiecesRoot) + } + return nil +} + type hashRequest struct { piecesRoot [32]byte baseLayer, index, length, proofLayers pp.Integer diff --git a/torrent.go b/torrent.go index 8a403e16..e7b579b3 100644 --- a/torrent.go +++ b/torrent.go @@ -2006,8 +2006,7 @@ func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), go func() { defer stop() defer close(_done) - // Won't this race? - err = eg.Wait() + eg.Wait() }() return } @@ -3118,3 +3117,12 @@ func (t *Torrent) eachShortInfohash(each func(short [20]byte)) { each(*t.infoHashV2.Value.ToShort()) } } + +func (t *Torrent) getFileByPiecesRoot(hash [32]byte) *File { + for _, f := range *t.files { + if f.piecesRoot.Unwrap() == hash { + return f + } + } + return nil +} diff --git a/v2hashes.go b/v2hashes.go index 10cbafc7..0525daa0 100644 --- a/v2hashes.go +++ b/v2hashes.go @@ -1 +1,5 @@ package torrent + +const ( + v2HashesLogName = "v2hashes" +) -- 2.44.0