From 1f6ba948824afdda3bbdf12224c56dd78da482b5 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 10 Jul 2018 12:20:36 +1000 Subject: [PATCH] Make extended handshake a struct, and move a bunch of extended stuff into peer_protocol --- Peer.go | 1 + bencode/api.go | 8 ++++ bencode/decode.go | 8 ++-- client.go | 49 +++++++++---------------- connection.go | 75 ++++++++++++-------------------------- global.go | 13 ++++--- misc.go | 17 --------- peer_protocol/compactip.go | 22 +++++++++++ peer_protocol/decoder.go | 3 +- peer_protocol/extended.go | 30 +++++++++++++++ peer_protocol/msg.go | 4 +- torrent.go | 9 ++--- 12 files changed, 120 insertions(+), 119 deletions(-) create mode 100644 peer_protocol/compactip.go create mode 100644 peer_protocol/extended.go diff --git a/Peer.go b/Peer.go index e4a7a2fb..a51d86c0 100644 --- a/Peer.go +++ b/Peer.go @@ -4,6 +4,7 @@ import ( "net" "github.com/anacrolix/dht/krpc" + "github.com/anacrolix/torrent/peer_protocol" ) diff --git a/bencode/api.go b/bencode/api.go index 12511997..6c311e11 100644 --- a/bencode/api.go +++ b/bencode/api.go @@ -6,6 +6,8 @@ import ( "fmt" "io" "reflect" + + "github.com/anacrolix/missinggo/expect" ) //---------------------------------------------------------------------------- @@ -121,6 +123,12 @@ func Marshal(v interface{}) ([]byte, error) { return buf.Bytes(), nil } +func MustMarshal(v interface{}) []byte { + b, err := Marshal(v) + expect.Nil(err) + return b +} + // Unmarshal the bencode value in the 'data' to a value pointed by the 'v' // pointer, return a non-nil error if any. func Unmarshal(data []byte, v interface{}) (err error) { diff --git a/bencode/decode.go b/bencode/decode.go index 657cebb0..136c9282 100644 --- a/bencode/decode.go +++ b/bencode/decode.go @@ -211,7 +211,7 @@ func getDictField(dict reflect.Value, key string) dictField { Ok: true, Set: func() { // Assigns the value into the map. - dict.SetMapIndex(reflect.ValueOf(key), value) + dict.SetMapIndex(reflect.ValueOf(key).Convert(dict.Type().Key()), value) }, } case reflect.Struct: @@ -227,9 +227,9 @@ func getDictField(dict reflect.Value, key string) dictField { }) } return dictField{ - Value: dict.FieldByIndex(sf.Index), - Ok: true, - Set: func() {}, + Value: dict.FieldByIndex(sf.Index), + Ok: true, + Set: func() {}, IgnoreUnmarshalTypeError: getTag(sf.Tag).IgnoreUnmarshalTypeError(), } default: diff --git a/client.go b/client.go index 1f6119f5..3c8cc28a 100644 --- a/client.go +++ b/client.go @@ -831,40 +831,25 @@ func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) { Type: pp.Extended, ExtendedID: pp.HandshakeExtendedID, ExtendedPayload: func() []byte { - d := map[string]interface{}{ - "m": func() (ret map[string]int) { - ret = make(map[string]int, 2) - ret["ut_metadata"] = metadataExtendedId - if !cl.config.DisablePEX { - ret["ut_pex"] = pexExtendedId - } - return - }(), - "v": cl.config.ExtendedHandshakeClientVersion, - // No upload queue is implemented yet. - "reqq": 64, + msg := pp.ExtendedHandshakeMessage{ + M: map[pp.ExtensionName]pp.ExtensionNumber{ + pp.ExtensionNameMetadata: metadataExtendedId, + }, + V: cl.config.ExtendedHandshakeClientVersion, + Reqq: 64, // TODO: Really? + YourIp: pp.CompactIp(missinggo.AddrIP(conn.remoteAddr())), + Encryption: !cl.config.DisableEncryption, + Port: cl.incomingPeerPort(), + MetadataSize: torrent.metadataSize(), } - if !cl.config.DisableEncryption { - d["e"] = 1 + // TODO: We can figured these out specific to the socket + // used. + copy(msg.Ipv4[:], cl.config.PublicIp4.To4()) + copy(msg.Ipv6[:], cl.config.PublicIp6.To16()) + if !cl.config.DisablePEX { + msg.M[pp.ExtensionNamePex] = pexExtendedId } - if torrent.metadataSizeKnown() { - d["metadata_size"] = torrent.metadataSize() - } - if p := cl.incomingPeerPort(); p != 0 { - d["p"] = p - } - yourip, err := addrCompactIP(conn.remoteAddr()) - if err != nil { - log.Printf("error calculating yourip field value in extension handshake: %s", err) - } else { - d["yourip"] = yourip - } - // log.Printf("sending %v", d) - b, err := bencode.Marshal(d) - if err != nil { - panic(err) - } - return b + return bencode.MustMarshal(msg) }(), }) } diff --git a/connection.go b/connection.go index 5f550ae2..03d4455f 100644 --- a/connection.go +++ b/connection.go @@ -3,7 +3,6 @@ package torrent import ( "bufio" "bytes" - "errors" "fmt" "io" "math" @@ -20,6 +19,7 @@ import ( "github.com/anacrolix/missinggo/bitmap" "github.com/anacrolix/missinggo/iter" "github.com/anacrolix/missinggo/prioritybitmap" + "github.com/pkg/errors" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/mse" @@ -104,7 +104,7 @@ type connection struct { peerAllowedFast bitmap.Bitmap PeerMaxRequests int // Maximum pending requests the peer allows. - PeerExtensionIDs map[string]byte + PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber PeerClientName string pieceInclination []int @@ -188,7 +188,7 @@ func (cn *connection) localAddr() net.Addr { return cn.conn.LocalAddr() } -func (cn *connection) supportsExtension(ext string) bool { +func (cn *connection) supportsExtension(ext pp.ExtensionName) bool { _, ok := cn.PeerExtensionIDs[ext] return ok } @@ -343,7 +343,7 @@ func (cn *connection) Post(msg pp.Message) { } func (cn *connection) requestMetadataPiece(index int) { - eID := cn.PeerExtensionIDs["ut_metadata"] + eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata] if eID == 0 { return } @@ -919,7 +919,7 @@ func (c *connection) requestPendingMetadata() { if c.t.haveInfo() { return } - if c.PeerExtensionIDs["ut_metadata"] == 0 { + if c.PeerExtensionIDs[pp.ExtensionNameMetadata] == 0 { // Peer doesn't support this. return } @@ -1166,7 +1166,7 @@ func (c *connection) mainReadLoop() (err error) { } } -func (c *connection) onReadExtendedMsg(id byte, payload []byte) (err error) { +func (c *connection) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err error) { defer func() { // TODO: Should we still do this? if err != nil { @@ -1181,58 +1181,31 @@ func (c *connection) onReadExtendedMsg(id byte, payload []byte) (err error) { cl := t.cl switch id { case pp.HandshakeExtendedID: - // TODO: Create a bencode struct for this. - var d map[string]interface{} - err := bencode.Unmarshal(payload, &d) - if err != nil { - return fmt.Errorf("error decoding extended message payload: %s", err) + var d pp.ExtendedHandshakeMessage + if err := bencode.Unmarshal(payload, &d); err != nil { + log.Print(err) + return errors.Wrap(err, "unmarshalling extended handshake payload") } - // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d) - if reqq, ok := d["reqq"]; ok { - if i, ok := reqq.(int64); ok { - c.PeerMaxRequests = int(i) - } + if d.Reqq != 0 { + c.PeerMaxRequests = d.Reqq } - if v, ok := d["v"]; ok { - c.PeerClientName = v.(string) + c.PeerClientName = d.V + if c.PeerExtensionIDs == nil { + c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber, len(d.M)) } - if m, ok := d["m"]; ok { - mTyped, ok := m.(map[string]interface{}) - if !ok { - return errors.New("handshake m value is not dict") - } - if c.PeerExtensionIDs == nil { - c.PeerExtensionIDs = make(map[string]byte, len(mTyped)) - } - for name, v := range mTyped { - id, ok := v.(int64) - if !ok { - log.Printf("bad handshake m item extension ID type: %T", v) - continue - } - if id == 0 { - delete(c.PeerExtensionIDs, name) - } else { - if c.PeerExtensionIDs[name] == 0 { - supportedExtensionMessages.Add(name, 1) - } - c.PeerExtensionIDs[name] = byte(id) - } + for name, id := range d.M { + if _, ok := c.PeerExtensionIDs[name]; !ok { + torrent.Add(fmt.Sprintf("peers supporting extension %q", name), 1) } + c.PeerExtensionIDs[name] = id } - metadata_sizeUntyped, ok := d["metadata_size"] - if ok { - metadata_size, ok := metadata_sizeUntyped.(int64) - if !ok { - log.Printf("bad metadata_size type: %T", metadata_sizeUntyped) - } else { - err = t.setMetadataSize(metadata_size) - if err != nil { - return fmt.Errorf("error setting metadata size to %d", metadata_size) - } + log.Print(c.PeerExtensionIDs) + if d.MetadataSize != 0 { + if err = t.setMetadataSize(d.MetadataSize); err != nil { + return errors.Wrapf(err, "setting metadata size to %d", d.MetadataSize) } } - if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok { + if _, ok := c.PeerExtensionIDs[pp.ExtensionNameMetadata]; ok { c.requestPendingMetadata() } return nil diff --git a/global.go b/global.go index 99e3df2e..7418cd90 100644 --- a/global.go +++ b/global.go @@ -11,9 +11,11 @@ const ( pieceHash = crypto.SHA1 maxRequests = 250 // Maximum pending requests we allow peers to send us. defaultChunkSize = 0x4000 // 16KiB +) - // These are our extended message IDs. Peers will use these values to - // select which extension a message is intended for. +// These are our extended message IDs. Peers will use these values to +// select which extension a message is intended for. +const ( metadataExtendedId = iota + 1 // 0 is reserved for deleting keys pexExtendedId ) @@ -35,10 +37,9 @@ var ( peerExtensions = expvar.NewMap("peerExtensions") completedHandshakeConnectionFlags = expvar.NewMap("completedHandshakeConnectionFlags") // Count of connections to peer with same client ID. - connsToSelf = expvar.NewInt("connsToSelf") - receivedKeepalives = expvar.NewInt("receivedKeepalives") - supportedExtensionMessages = expvar.NewMap("supportedExtensionMessages") - postedKeepalives = expvar.NewInt("postedKeepalives") + connsToSelf = expvar.NewInt("connsToSelf") + receivedKeepalives = expvar.NewInt("receivedKeepalives") + postedKeepalives = expvar.NewInt("postedKeepalives") // Requests received for pieces we don't have. requestsReceivedForMissingPieces = expvar.NewInt("requestsReceivedForMissingPieces") requestedChunkLengths = expvar.NewMap("requestedChunkLengths") diff --git a/misc.go b/misc.go index d1b92c4f..915ed34d 100644 --- a/misc.go +++ b/misc.go @@ -110,23 +110,6 @@ func connLessTrusted(l, r *connection) bool { return l.netGoodPiecesDirtied() < r.netGoodPiecesDirtied() } -// Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes -// per "yourip" field of http://www.bittorrent.org/beps/bep_0010.html. -func addrCompactIP(addr net.Addr) (string, error) { - host, _, err := net.SplitHostPort(addr.String()) - if err != nil { - return "", err - } - ip := net.ParseIP(host) - if v4 := ip.To4(); v4 != nil { - if len(v4) != 4 { - panic(v4) - } - return string(v4), nil - } - return string(ip.To16()), nil -} - func connIsIpv6(nc interface { LocalAddr() net.Addr }) bool { diff --git a/peer_protocol/compactip.go b/peer_protocol/compactip.go new file mode 100644 index 00000000..7dddc53d --- /dev/null +++ b/peer_protocol/compactip.go @@ -0,0 +1,22 @@ +package peer_protocol + +import ( + "net" + + "github.com/anacrolix/torrent/bencode" +) + +// Marshals to the smallest compact byte representation. +type CompactIp net.IP + +var _ bencode.Marshaler = CompactIp{} + +func (me CompactIp) MarshalBencode() ([]byte, error) { + return bencode.Marshal(func() []byte { + if ip4 := net.IP(me).To4(); ip4 != nil { + return ip4 + } else { + return me + } + }()) +} diff --git a/peer_protocol/decoder.go b/peer_protocol/decoder.go index e4aeab08..f07ed14b 100644 --- a/peer_protocol/decoder.go +++ b/peer_protocol/decoder.go @@ -88,10 +88,11 @@ func (d *Decoder) Decode(msg *Message) (err error) { } msg.Piece = b case Extended: - msg.ExtendedID, err = readByte(r) + b, err := readByte(r) if err != nil { break } + msg.ExtendedID = ExtensionNumber(b) msg.ExtendedPayload, err = ioutil.ReadAll(r) case Port: err = binary.Read(r, binary.BigEndian, &msg.Port) diff --git a/peer_protocol/extended.go b/peer_protocol/extended.go new file mode 100644 index 00000000..a02a96f6 --- /dev/null +++ b/peer_protocol/extended.go @@ -0,0 +1,30 @@ +package peer_protocol + +// http://www.bittorrent.org/beps/bep_0010.html +type ( + ExtendedHandshakeMessage struct { + M map[ExtensionName]ExtensionNumber `bencode:"m"` + V string `bencode:"v,omitempty"` + Reqq int `bencode:"reqq,omitempty"` + Encryption bool `bencode:"e,omitempty"` + // BEP 9 + MetadataSize int `bencode:"metadata_size,omitempty"` + // The local client port. It would be redundant for the receiving side of + // a connection to send this. + Port int `bencode:"p,omitempty"` + YourIp CompactIp `bencode:"yourip,omitempty"` + Ipv4 [4]byte `bencode:"ipv4,omitempty"` + Ipv6 [16]byte `bencode:"ipv6,omitempty"` + } + + ExtensionName string + ExtensionNumber int +) + +const ( + // http://www.bittorrent.org/beps/bep_0011.html + ExtensionNamePex ExtensionName = "ut_pex" + // http://bittorrent.org/beps/bep_0009.html. Note that there's an + // LT_metadata, but I've never implemented it. + ExtensionNameMetadata = "ut_metadata" +) diff --git a/peer_protocol/msg.go b/peer_protocol/msg.go index d5c316c9..476a54c0 100644 --- a/peer_protocol/msg.go +++ b/peer_protocol/msg.go @@ -12,7 +12,7 @@ type Message struct { Index, Begin, Length Integer Piece []byte Bitfield []bool - ExtendedID byte + ExtendedID ExtensionNumber ExtendedPayload []byte Port uint16 } @@ -69,7 +69,7 @@ func (msg Message) MarshalBinary() (data []byte, err error) { panic(n) } case Extended: - err = buf.WriteByte(msg.ExtendedID) + err = buf.WriteByte(byte(msg.ExtendedID)) if err != nil { return } diff --git a/torrent.go b/torrent.go index d7746586..29363058 100644 --- a/torrent.go +++ b/torrent.go @@ -444,7 +444,7 @@ func (t *Torrent) haveAllMetadataPieces() bool { } // TODO: Propagate errors to disconnect peer. -func (t *Torrent) setMetadataSize(bytes int64) (err error) { +func (t *Torrent) setMetadataSize(bytes int) (err error) { if t.haveInfo() { // We already know the correct metadata size. return @@ -498,13 +498,10 @@ func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece if data != nil { d["total_size"] = len(t.metadataBytes) } - p, err := bencode.Marshal(d) - if err != nil { - panic(err) - } + p := bencode.MustMarshal(d) return pp.Message{ Type: pp.Extended, - ExtendedID: c.PeerExtensionIDs["ut_metadata"], + ExtendedID: c.PeerExtensionIDs[pp.ExtensionNameMetadata], ExtendedPayload: append(p, data...), } } -- 2.48.1