]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Implement reading piece hashes from peers
authorMatt Joiner <anacrolix@gmail.com>
Fri, 1 Mar 2024 03:42:22 +0000 (14:42 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 2 Mar 2024 02:02:55 +0000 (13:02 +1100)
go.mod
merkle/hash.go
merkle/merkle.go
metainfo/bep52.go
peerconn.go
torrent.go
v2hashes.go

diff --git a/go.mod b/go.mod
index 637f205a2d18e0e7df82840ccc8c01d8a67d50ef..1af03a6edd73e57be5e46847b734f93afd61927e 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -52,6 +52,7 @@ require (
        go.opentelemetry.io/otel/sdk v1.8.0
        go.opentelemetry.io/otel/trace v1.8.0
        golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df
+       golang.org/x/sync v0.3.0
        golang.org/x/sys v0.15.0
        golang.org/x/time v0.0.0-20220609170525-579cf78fd858
 )
@@ -111,7 +112,6 @@ require (
        go.opentelemetry.io/proto/otlp v0.18.0 // indirect
        golang.org/x/crypto v0.17.0 // indirect
        golang.org/x/net v0.10.0 // indirect
-       golang.org/x/sync v0.3.0 // indirect
        golang.org/x/text v0.14.0 // indirect
        golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
        google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
index 18ecee78e0e623736f8dceeb95f217836991c7c4..1a8f67a92288a243a772892bf88813620e3bb261 100644 (file)
@@ -50,9 +50,7 @@ func (h *Hash) Sum(b []byte) []byte {
        if h.written != 0 {
                blocks = append(blocks, h.nextBlockSum())
        }
-       n := int(RoundUpToPowerOfTwo(uint(len(blocks))))
-       blocks = append(blocks, make([][32]byte, n-len(blocks))...)
-       sum := Root(blocks)
+       sum := RootWithPadHash(blocks, [32]byte{})
        return append(b, sum[:]...)
 }
 
index 171555510dccc5778f3579eca69ce0dbd3fabc61..582c08074b54f37d483371e1f34c6ba35a3f6d01 100644 (file)
@@ -32,6 +32,13 @@ func Root(hashes [][sha256.Size]byte) [sha256.Size]byte {
        return Root(next)
 }
 
+func RootWithPadHash(hashes [][sha256.Size]byte, padHash [sha256.Size]byte) [sha256.Size]byte {
+       for uint(len(hashes)) < RoundUpToPowerOfTwo(uint(len(hashes))) {
+               hashes = append(hashes, padHash)
+       }
+       return Root(hashes)
+}
+
 func CompactLayerToSliceHashes(compactLayer string) (hashes [][sha256.Size]byte, err error) {
        g.MakeSliceWithLength(&hashes, len(compactLayer)/sha256.Size)
        for i := range hashes {
index 0291d653496c6375dafbec8dc119f054ef21026a..dd3f741432a86526e64431120ab60ccf9e95cea2 100644 (file)
@@ -36,12 +36,7 @@ func ValidatePieceLayers(
                }
                var layerHashes [][32]byte
                layerHashes, err = merkle.CompactLayerToSliceHashes(filePieceLayers)
-               padHash := HashForPiecePad(pieceLength)
-               for uint(len(layerHashes)) < merkle.RoundUpToPowerOfTwo(uint(len(layerHashes))) {
-                       layerHashes = append(layerHashes, padHash)
-               }
-               var root [32]byte
-               root = merkle.Root(layerHashes)
+               root := merkle.RootWithPadHash(layerHashes, HashForPiecePad(pieceLength))
                if root != piecesRoot.Value {
                        err = fmt.Errorf("file %q: expected hash %x got %x", path, piecesRoot.Value, root)
                        return
index ea2ee9662d20d603cefaf7ae32f39780877c4956..c55a7d7321e653687356a0a73067814c965f85f1 100644 (file)
@@ -81,7 +81,13 @@ type PeerConn struct {
 
        outstandingHolepunchingRendezvous map[netip.AddrPort]struct{}
 
+       // Hash requests sent to the peer. If there's an issue we probably don't want to reissue these,
+       // because I haven't implemented it smart enough yet.
        sentHashRequests map[hashRequest]struct{}
+       // Hash pieces received from the peer, mapped from pieces root to piece layer hashes. This way
+       // we can verify all the pieces for a file when they're all arrived before submitting them to
+       // the torrent.
+       receivedHashPieces map[[32]byte][][32]byte
 }
 
 func (cn *PeerConn) pexStatus() string {
@@ -738,7 +744,11 @@ func (c *PeerConn) mainReadLoop() (err error) {
                        cl.unlock()
                        defer cl.lock()
                        err = decoder.Decode(&msg)
+                       if err != nil {
+                               err = fmt.Errorf("decoding message: %w", err)
+                       }
                }()
+               // Do this before checking closed.
                if cb := c.callbacks.ReadMessage; cb != nil && err == nil {
                        cb(c, &msg)
                }
@@ -746,6 +756,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                        return nil
                }
                if err != nil {
+                       err = log.WithLevel(log.Info, err)
                        return err
                }
                c.lastMessageReceived = time.Now()
@@ -869,7 +880,9 @@ func (c *PeerConn) mainReadLoop() (err error) {
                        c.updateRequests("PeerConn.mainReadLoop allowed fast")
                case pp.Extended:
                        err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
-               case pp.HashRequest, pp.Hashes, pp.HashReject:
+               case pp.Hashes:
+                       err = c.onReadHashes(&msg)
+               case pp.HashRequest, pp.HashReject:
                        err = log.WithLevel(log.Warning, fmt.Errorf("received unimplemented BitTorrent v2 message: %v", msg.Type))
                default:
                        err = fmt.Errorf("received unknown message type: %#v", msg.Type)
@@ -1220,6 +1233,9 @@ func (pc *PeerConn) WriteExtendedMessage(extName pp.ExtensionName, payload []byt
 }
 
 func (pc *PeerConn) requestMissingHashes() {
+       if pc.peerChoking {
+               return
+       }
        if !pc.t.haveInfo() {
                return
        }
@@ -1230,15 +1246,32 @@ func (pc *PeerConn) requestMissingHashes() {
        baseLayer := pp.Integer(merkle.Log2RoundingUp(merkle.RoundUpToPowerOfTwo(
                uint((pc.t.usualPieceSize() + merkle.BlockSize - 1) / merkle.BlockSize)),
        ))
+       nextFileBeginPiece := 0
+file:
        for _, file := range info.UpvertedFiles() {
-               piecesRoot := file.PiecesRoot.Unwrap()
                fileNumPieces := int((file.Length + info.PieceLength - 1) / info.PieceLength)
-               proofLayers := pp.Integer(0)
+               curFileBeginPiece := nextFileBeginPiece
+               nextFileBeginPiece += fileNumPieces
+               haveAllHashes := true
+               for i := range fileNumPieces {
+                       torrentPieceIndex := curFileBeginPiece + i
+                       if !pc.peerHasPiece(torrentPieceIndex) {
+                               continue file
+                       }
+                       if !pc.t.piece(torrentPieceIndex).hashV2.Ok {
+                               haveAllHashes = false
+                       }
+               }
+               if haveAllHashes {
+                       continue
+               }
                // We would be requesting the leaves, the file must be short enough that we can just do with
                // the pieces root as the piece hash.
                if fileNumPieces <= 1 {
                        continue
                }
+               piecesRoot := file.PiecesRoot.Unwrap()
+               proofLayers := pp.Integer(0)
                for index := 0; index < fileNumPieces; index += 512 {
                        // Minimizing to the number of pieces in a file conflicts with the BEP.
                        length := merkle.RoundUpToPowerOfTwo(uint(min(512, fileNumPieces-index)))
@@ -1247,6 +1280,9 @@ func (pc *PeerConn) requestMissingHashes() {
                                // checks.
                                panic(length)
                        }
+                       if length%2 != 0 {
+                               pc.logger.Levelf(log.Warning, "requesting odd hashes length %d", length)
+                       }
                        msg := pp.Message{
                                Type:        pp.HashRequest,
                                PiecesRoot:  piecesRoot,
@@ -1266,6 +1302,41 @@ func (pc *PeerConn) requestMissingHashes() {
        }
 }
 
+func (pc *PeerConn) onReadHashes(msg *pp.Message) (err error) {
+       file := pc.t.getFileByPiecesRoot(msg.PiecesRoot)
+       filePieceHashes := pc.receivedHashPieces[msg.PiecesRoot]
+       if filePieceHashes == nil {
+               filePieceHashes = make([][32]byte, file.numPieces())
+               generics.MakeMapIfNil(&pc.receivedHashPieces)
+               pc.receivedHashPieces[msg.PiecesRoot] = filePieceHashes
+       }
+       if msg.ProofLayers != 0 {
+               // This isn't handled yet.
+               panic(msg.ProofLayers)
+       }
+       copy(filePieceHashes[msg.Index:], msg.Hashes)
+       root := merkle.RootWithPadHash(
+               filePieceHashes,
+               metainfo.HashForPiecePad(int64(pc.t.usualPieceSize())))
+       expectedPiecesRoot := file.piecesRoot.Unwrap()
+       if root == expectedPiecesRoot {
+               pc.logger.WithNames(v2HashesLogName).Levelf(
+                       log.Info,
+                       "got piece hashes for file %v (num pieces %v)",
+                       file, file.numPieces())
+               for filePieceIndex, peerHash := range filePieceHashes {
+                       torrentPieceIndex := file.BeginPieceIndex() + filePieceIndex
+                       pc.t.piece(torrentPieceIndex).hashV2.Set(peerHash)
+               }
+       } else {
+               pc.logger.WithNames(v2HashesLogName).Levelf(
+                       log.Debug,
+                       "peer file piece hashes root mismatch: %x != %x",
+                       root, expectedPiecesRoot)
+       }
+       return nil
+}
+
 type hashRequest struct {
        piecesRoot                            [32]byte
        baseLayer, index, length, proofLayers pp.Integer
index 8a403e16aba5571fab7d2cd3e22259b69da7d3b2..e7b579b3c0e89a38d4bc5fc72cf533b054c56f82 100644 (file)
@@ -2006,8 +2006,7 @@ func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(),
        go func() {
                defer stop()
                defer close(_done)
-               // Won't this race?
-               err = eg.Wait()
+               eg.Wait()
        }()
        return
 }
@@ -3118,3 +3117,12 @@ func (t *Torrent) eachShortInfohash(each func(short [20]byte)) {
                each(*t.infoHashV2.Value.ToShort())
        }
 }
+
+func (t *Torrent) getFileByPiecesRoot(hash [32]byte) *File {
+       for _, f := range *t.files {
+               if f.piecesRoot.Unwrap() == hash {
+                       return f
+               }
+       }
+       return nil
+}
index 10cbafc73d7b9237c864a6f3c625bf9c74366b21..0525daa066fc78e16ceba2df0f0a1245ed0834bf 100644 (file)
@@ -1 +1,5 @@
 package torrent
+
+const (
+       v2HashesLogName = "v2hashes"
+)