]> Sergey Matveev's repositories - btrtrc.git/blobdiff - peerconn.go
Implement reading piece hashes from peers
[btrtrc.git] / peerconn.go
index ea2ee9662d20d603cefaf7ae32f39780877c4956..c55a7d7321e653687356a0a73067814c965f85f1 100644 (file)
@@ -81,7 +81,13 @@ type PeerConn struct {
 
        outstandingHolepunchingRendezvous map[netip.AddrPort]struct{}
 
+       // Hash requests sent to the peer. If there's an issue we probably don't want to reissue these,
+       // because I haven't implemented it smart enough yet.
        sentHashRequests map[hashRequest]struct{}
+       // Hash pieces received from the peer, mapped from pieces root to piece layer hashes. This way
+       // we can verify all the pieces for a file when they're all arrived before submitting them to
+       // the torrent.
+       receivedHashPieces map[[32]byte][][32]byte
 }
 
 func (cn *PeerConn) pexStatus() string {
@@ -738,7 +744,11 @@ func (c *PeerConn) mainReadLoop() (err error) {
                        cl.unlock()
                        defer cl.lock()
                        err = decoder.Decode(&msg)
+                       if err != nil {
+                               err = fmt.Errorf("decoding message: %w", err)
+                       }
                }()
+               // Do this before checking closed.
                if cb := c.callbacks.ReadMessage; cb != nil && err == nil {
                        cb(c, &msg)
                }
@@ -746,6 +756,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                        return nil
                }
                if err != nil {
+                       err = log.WithLevel(log.Info, err)
                        return err
                }
                c.lastMessageReceived = time.Now()
@@ -869,7 +880,9 @@ func (c *PeerConn) mainReadLoop() (err error) {
                        c.updateRequests("PeerConn.mainReadLoop allowed fast")
                case pp.Extended:
                        err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
-               case pp.HashRequest, pp.Hashes, pp.HashReject:
+               case pp.Hashes:
+                       err = c.onReadHashes(&msg)
+               case pp.HashRequest, pp.HashReject:
                        err = log.WithLevel(log.Warning, fmt.Errorf("received unimplemented BitTorrent v2 message: %v", msg.Type))
                default:
                        err = fmt.Errorf("received unknown message type: %#v", msg.Type)
@@ -1220,6 +1233,9 @@ func (pc *PeerConn) WriteExtendedMessage(extName pp.ExtensionName, payload []byt
 }
 
 func (pc *PeerConn) requestMissingHashes() {
+       if pc.peerChoking {
+               return
+       }
        if !pc.t.haveInfo() {
                return
        }
@@ -1230,15 +1246,32 @@ func (pc *PeerConn) requestMissingHashes() {
        baseLayer := pp.Integer(merkle.Log2RoundingUp(merkle.RoundUpToPowerOfTwo(
                uint((pc.t.usualPieceSize() + merkle.BlockSize - 1) / merkle.BlockSize)),
        ))
+       nextFileBeginPiece := 0
+file:
        for _, file := range info.UpvertedFiles() {
-               piecesRoot := file.PiecesRoot.Unwrap()
                fileNumPieces := int((file.Length + info.PieceLength - 1) / info.PieceLength)
-               proofLayers := pp.Integer(0)
+               curFileBeginPiece := nextFileBeginPiece
+               nextFileBeginPiece += fileNumPieces
+               haveAllHashes := true
+               for i := range fileNumPieces {
+                       torrentPieceIndex := curFileBeginPiece + i
+                       if !pc.peerHasPiece(torrentPieceIndex) {
+                               continue file
+                       }
+                       if !pc.t.piece(torrentPieceIndex).hashV2.Ok {
+                               haveAllHashes = false
+                       }
+               }
+               if haveAllHashes {
+                       continue
+               }
                // We would be requesting the leaves, the file must be short enough that we can just do with
                // the pieces root as the piece hash.
                if fileNumPieces <= 1 {
                        continue
                }
+               piecesRoot := file.PiecesRoot.Unwrap()
+               proofLayers := pp.Integer(0)
                for index := 0; index < fileNumPieces; index += 512 {
                        // Minimizing to the number of pieces in a file conflicts with the BEP.
                        length := merkle.RoundUpToPowerOfTwo(uint(min(512, fileNumPieces-index)))
@@ -1247,6 +1280,9 @@ func (pc *PeerConn) requestMissingHashes() {
                                // checks.
                                panic(length)
                        }
+                       if length%2 != 0 {
+                               pc.logger.Levelf(log.Warning, "requesting odd hashes length %d", length)
+                       }
                        msg := pp.Message{
                                Type:        pp.HashRequest,
                                PiecesRoot:  piecesRoot,
@@ -1266,6 +1302,41 @@ func (pc *PeerConn) requestMissingHashes() {
        }
 }
 
+func (pc *PeerConn) onReadHashes(msg *pp.Message) (err error) {
+       file := pc.t.getFileByPiecesRoot(msg.PiecesRoot)
+       filePieceHashes := pc.receivedHashPieces[msg.PiecesRoot]
+       if filePieceHashes == nil {
+               filePieceHashes = make([][32]byte, file.numPieces())
+               generics.MakeMapIfNil(&pc.receivedHashPieces)
+               pc.receivedHashPieces[msg.PiecesRoot] = filePieceHashes
+       }
+       if msg.ProofLayers != 0 {
+               // This isn't handled yet.
+               panic(msg.ProofLayers)
+       }
+       copy(filePieceHashes[msg.Index:], msg.Hashes)
+       root := merkle.RootWithPadHash(
+               filePieceHashes,
+               metainfo.HashForPiecePad(int64(pc.t.usualPieceSize())))
+       expectedPiecesRoot := file.piecesRoot.Unwrap()
+       if root == expectedPiecesRoot {
+               pc.logger.WithNames(v2HashesLogName).Levelf(
+                       log.Info,
+                       "got piece hashes for file %v (num pieces %v)",
+                       file, file.numPieces())
+               for filePieceIndex, peerHash := range filePieceHashes {
+                       torrentPieceIndex := file.BeginPieceIndex() + filePieceIndex
+                       pc.t.piece(torrentPieceIndex).hashV2.Set(peerHash)
+               }
+       } else {
+               pc.logger.WithNames(v2HashesLogName).Levelf(
+                       log.Debug,
+                       "peer file piece hashes root mismatch: %x != %x",
+                       root, expectedPiecesRoot)
+       }
+       return nil
+}
+
 type hashRequest struct {
        piecesRoot                            [32]byte
        baseLayer, index, length, proofLayers pp.Integer