go.mod | 2 +- merkle/hash.go | 4 +--- merkle/merkle.go | 7 +++++++ metainfo/bep52.go | 7 +------ peerconn.go | 77 ++++++++++++++++++++++++++++++++++++++++++++++++++--- torrent.go | 12 ++++++++++-- v2hashes.go | 4 ++++ diff --git a/go.mod b/go.mod index 637f205a2d18e0e7df82840ccc8c01d8a67d50ef..1af03a6edd73e57be5e46847b734f93afd61927e 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.8.0 go.opentelemetry.io/otel/sdk v1.8.0 go.opentelemetry.io/otel/trace v1.8.0 golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df + golang.org/x/sync v0.3.0 golang.org/x/sys v0.15.0 golang.org/x/time v0.0.0-20220609170525-579cf78fd858 ) @@ -111,7 +112,6 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.8.0 // indirect go.opentelemetry.io/proto/otlp v0.18.0 // indirect golang.org/x/crypto v0.17.0 // indirect golang.org/x/net v0.10.0 // indirect - golang.org/x/sync v0.3.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect diff --git a/merkle/hash.go b/merkle/hash.go index 18ecee78e0e623736f8dceeb95f217836991c7c4..1a8f67a92288a243a772892bf88813620e3bb261 100644 --- a/merkle/hash.go +++ b/merkle/hash.go @@ -50,9 +50,7 @@ blocks := h.blocks if h.written != 0 { blocks = append(blocks, h.nextBlockSum()) } - n := int(RoundUpToPowerOfTwo(uint(len(blocks)))) - blocks = append(blocks, make([][32]byte, n-len(blocks))...) - sum := Root(blocks) + sum := RootWithPadHash(blocks, [32]byte{}) return append(b, sum[:]...) } diff --git a/merkle/merkle.go b/merkle/merkle.go index 171555510dccc5778f3579eca69ce0dbd3fabc61..582c08074b54f37d483371e1f34c6ba35a3f6d01 100644 --- a/merkle/merkle.go +++ b/merkle/merkle.go @@ -32,6 +32,13 @@ } return Root(next) } +func RootWithPadHash(hashes [][sha256.Size]byte, padHash [sha256.Size]byte) [sha256.Size]byte { + for uint(len(hashes)) < RoundUpToPowerOfTwo(uint(len(hashes))) { + hashes = append(hashes, padHash) + } + return Root(hashes) +} + func CompactLayerToSliceHashes(compactLayer string) (hashes [][sha256.Size]byte, err error) { g.MakeSliceWithLength(&hashes, len(compactLayer)/sha256.Size) for i := range hashes { diff --git a/metainfo/bep52.go b/metainfo/bep52.go index 0291d653496c6375dafbec8dc119f054ef21026a..dd3f741432a86526e64431120ab60ccf9e95cea2 100644 --- a/metainfo/bep52.go +++ b/metainfo/bep52.go @@ -36,12 +36,7 @@ return } var layerHashes [][32]byte layerHashes, err = merkle.CompactLayerToSliceHashes(filePieceLayers) - padHash := HashForPiecePad(pieceLength) - for uint(len(layerHashes)) < merkle.RoundUpToPowerOfTwo(uint(len(layerHashes))) { - layerHashes = append(layerHashes, padHash) - } - var root [32]byte - root = merkle.Root(layerHashes) + root := merkle.RootWithPadHash(layerHashes, HashForPiecePad(pieceLength)) if root != piecesRoot.Value { err = fmt.Errorf("file %q: expected hash %x got %x", path, piecesRoot.Value, root) return diff --git a/peerconn.go b/peerconn.go index ea2ee9662d20d603cefaf7ae32f39780877c4956..c55a7d7321e653687356a0a73067814c965f85f1 100644 --- a/peerconn.go +++ b/peerconn.go @@ -81,7 +81,13 @@ peerRequestDataAllocLimiter alloclim.Limiter outstandingHolepunchingRendezvous map[netip.AddrPort]struct{} + // Hash requests sent to the peer. If there's an issue we probably don't want to reissue these, + // because I haven't implemented it smart enough yet. sentHashRequests map[hashRequest]struct{} + // Hash pieces received from the peer, mapped from pieces root to piece layer hashes. This way + // we can verify all the pieces for a file when they're all arrived before submitting them to + // the torrent. + receivedHashPieces map[[32]byte][][32]byte } func (cn *PeerConn) pexStatus() string { @@ -738,7 +744,11 @@ func() { cl.unlock() defer cl.lock() err = decoder.Decode(&msg) + if err != nil { + err = fmt.Errorf("decoding message: %w", err) + } }() + // Do this before checking closed. if cb := c.callbacks.ReadMessage; cb != nil && err == nil { cb(c, &msg) } @@ -746,6 +756,7 @@ if t.closed.IsSet() || c.closed.IsSet() { return nil } if err != nil { + err = log.WithLevel(log.Info, err) return err } c.lastMessageReceived = time.Now() @@ -869,7 +880,9 @@ log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).LogLevel(log.Debug, c.t.logger) c.updateRequests("PeerConn.mainReadLoop allowed fast") case pp.Extended: err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload) - case pp.HashRequest, pp.Hashes, pp.HashReject: + case pp.Hashes: + err = c.onReadHashes(&msg) + case pp.HashRequest, pp.HashReject: err = log.WithLevel(log.Warning, fmt.Errorf("received unimplemented BitTorrent v2 message: %v", msg.Type)) default: err = fmt.Errorf("received unknown message type: %#v", msg.Type) @@ -1220,6 +1233,9 @@ return nil } func (pc *PeerConn) requestMissingHashes() { + if pc.peerChoking { + return + } if !pc.t.haveInfo() { return } @@ -1230,15 +1246,32 @@ } baseLayer := pp.Integer(merkle.Log2RoundingUp(merkle.RoundUpToPowerOfTwo( uint((pc.t.usualPieceSize() + merkle.BlockSize - 1) / merkle.BlockSize)), )) + nextFileBeginPiece := 0 +file: for _, file := range info.UpvertedFiles() { - piecesRoot := file.PiecesRoot.Unwrap() fileNumPieces := int((file.Length + info.PieceLength - 1) / info.PieceLength) - proofLayers := pp.Integer(0) + curFileBeginPiece := nextFileBeginPiece + nextFileBeginPiece += fileNumPieces + haveAllHashes := true + for i := range fileNumPieces { + torrentPieceIndex := curFileBeginPiece + i + if !pc.peerHasPiece(torrentPieceIndex) { + continue file + } + if !pc.t.piece(torrentPieceIndex).hashV2.Ok { + haveAllHashes = false + } + } + if haveAllHashes { + continue + } // We would be requesting the leaves, the file must be short enough that we can just do with // the pieces root as the piece hash. if fileNumPieces <= 1 { continue } + piecesRoot := file.PiecesRoot.Unwrap() + proofLayers := pp.Integer(0) for index := 0; index < fileNumPieces; index += 512 { // Minimizing to the number of pieces in a file conflicts with the BEP. length := merkle.RoundUpToPowerOfTwo(uint(min(512, fileNumPieces-index))) @@ -1247,6 +1280,9 @@ // This should have been filtered out by baseLayer and pieces root as piece hash // checks. panic(length) } + if length%2 != 0 { + pc.logger.Levelf(log.Warning, "requesting odd hashes length %d", length) + } msg := pp.Message{ Type: pp.HashRequest, PiecesRoot: piecesRoot, @@ -1264,6 +1300,41 @@ generics.MakeMapIfNil(&pc.sentHashRequests) pc.sentHashRequests[hr] = struct{}{} } } +} + +func (pc *PeerConn) onReadHashes(msg *pp.Message) (err error) { + file := pc.t.getFileByPiecesRoot(msg.PiecesRoot) + filePieceHashes := pc.receivedHashPieces[msg.PiecesRoot] + if filePieceHashes == nil { + filePieceHashes = make([][32]byte, file.numPieces()) + generics.MakeMapIfNil(&pc.receivedHashPieces) + pc.receivedHashPieces[msg.PiecesRoot] = filePieceHashes + } + if msg.ProofLayers != 0 { + // This isn't handled yet. + panic(msg.ProofLayers) + } + copy(filePieceHashes[msg.Index:], msg.Hashes) + root := merkle.RootWithPadHash( + filePieceHashes, + metainfo.HashForPiecePad(int64(pc.t.usualPieceSize()))) + expectedPiecesRoot := file.piecesRoot.Unwrap() + if root == expectedPiecesRoot { + pc.logger.WithNames(v2HashesLogName).Levelf( + log.Info, + "got piece hashes for file %v (num pieces %v)", + file, file.numPieces()) + for filePieceIndex, peerHash := range filePieceHashes { + torrentPieceIndex := file.BeginPieceIndex() + filePieceIndex + pc.t.piece(torrentPieceIndex).hashV2.Set(peerHash) + } + } else { + pc.logger.WithNames(v2HashesLogName).Levelf( + log.Debug, + "peer file piece hashes root mismatch: %x != %x", + root, expectedPiecesRoot) + } + return nil } type hashRequest struct { diff --git a/torrent.go b/torrent.go index 8a403e16aba5571fab7d2cd3e22259b69da7d3b2..e7b579b3c0e89a38d4bc5fc72cf533b054c56f82 100644 --- a/torrent.go +++ b/torrent.go @@ -2006,8 +2006,7 @@ done = _done go func() { defer stop() defer close(_done) - // Won't this race? - err = eg.Wait() + eg.Wait() }() return } @@ -3118,3 +3117,12 @@ if t.infoHashV2.Ok { each(*t.infoHashV2.Value.ToShort()) } } + +func (t *Torrent) getFileByPiecesRoot(hash [32]byte) *File { + for _, f := range *t.files { + if f.piecesRoot.Unwrap() == hash { + return f + } + } + return nil +} diff --git a/v2hashes.go b/v2hashes.go index 10cbafc73d7b9237c864a6f3c625bf9c74366b21..0525daa066fc78e16ceba2df0f0a1245ed0834bf 100644 --- a/v2hashes.go +++ b/v2hashes.go @@ -1 +1,5 @@ package torrent + +const ( + v2HashesLogName = "v2hashes" +)