"net"
"github.com/anacrolix/dht/krpc"
+
"github.com/anacrolix/torrent/peer_protocol"
)
"fmt"
"io"
"reflect"
+
+ "github.com/anacrolix/missinggo/expect"
)
//----------------------------------------------------------------------------
return buf.Bytes(), nil
}
+func MustMarshal(v interface{}) []byte {
+ b, err := Marshal(v)
+ expect.Nil(err)
+ return b
+}
+
// Unmarshal the bencode value in the 'data' to a value pointed by the 'v'
// pointer, return a non-nil error if any.
func Unmarshal(data []byte, v interface{}) (err error) {
Ok: true,
Set: func() {
// Assigns the value into the map.
- dict.SetMapIndex(reflect.ValueOf(key), value)
+ dict.SetMapIndex(reflect.ValueOf(key).Convert(dict.Type().Key()), value)
},
}
case reflect.Struct:
})
}
return dictField{
- Value: dict.FieldByIndex(sf.Index),
- Ok: true,
- Set: func() {},
+ Value: dict.FieldByIndex(sf.Index),
+ Ok: true,
+ Set: func() {},
IgnoreUnmarshalTypeError: getTag(sf.Tag).IgnoreUnmarshalTypeError(),
}
default:
Type: pp.Extended,
ExtendedID: pp.HandshakeExtendedID,
ExtendedPayload: func() []byte {
- d := map[string]interface{}{
- "m": func() (ret map[string]int) {
- ret = make(map[string]int, 2)
- ret["ut_metadata"] = metadataExtendedId
- if !cl.config.DisablePEX {
- ret["ut_pex"] = pexExtendedId
- }
- return
- }(),
- "v": cl.config.ExtendedHandshakeClientVersion,
- // No upload queue is implemented yet.
- "reqq": 64,
+ msg := pp.ExtendedHandshakeMessage{
+ M: map[pp.ExtensionName]pp.ExtensionNumber{
+ pp.ExtensionNameMetadata: metadataExtendedId,
+ },
+ V: cl.config.ExtendedHandshakeClientVersion,
+ Reqq: 64, // TODO: Really?
+ YourIp: pp.CompactIp(missinggo.AddrIP(conn.remoteAddr())),
+ Encryption: !cl.config.DisableEncryption,
+ Port: cl.incomingPeerPort(),
+ MetadataSize: torrent.metadataSize(),
}
- if !cl.config.DisableEncryption {
- d["e"] = 1
+ // TODO: We can figured these out specific to the socket
+ // used.
+ copy(msg.Ipv4[:], cl.config.PublicIp4.To4())
+ copy(msg.Ipv6[:], cl.config.PublicIp6.To16())
+ if !cl.config.DisablePEX {
+ msg.M[pp.ExtensionNamePex] = pexExtendedId
}
- if torrent.metadataSizeKnown() {
- d["metadata_size"] = torrent.metadataSize()
- }
- if p := cl.incomingPeerPort(); p != 0 {
- d["p"] = p
- }
- yourip, err := addrCompactIP(conn.remoteAddr())
- if err != nil {
- log.Printf("error calculating yourip field value in extension handshake: %s", err)
- } else {
- d["yourip"] = yourip
- }
- // log.Printf("sending %v", d)
- b, err := bencode.Marshal(d)
- if err != nil {
- panic(err)
- }
- return b
+ return bencode.MustMarshal(msg)
}(),
})
}
import (
"bufio"
"bytes"
- "errors"
"fmt"
"io"
"math"
"github.com/anacrolix/missinggo/bitmap"
"github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/prioritybitmap"
+ "github.com/pkg/errors"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/mse"
peerAllowedFast bitmap.Bitmap
PeerMaxRequests int // Maximum pending requests the peer allows.
- PeerExtensionIDs map[string]byte
+ PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber
PeerClientName string
pieceInclination []int
return cn.conn.LocalAddr()
}
-func (cn *connection) supportsExtension(ext string) bool {
+func (cn *connection) supportsExtension(ext pp.ExtensionName) bool {
_, ok := cn.PeerExtensionIDs[ext]
return ok
}
}
func (cn *connection) requestMetadataPiece(index int) {
- eID := cn.PeerExtensionIDs["ut_metadata"]
+ eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata]
if eID == 0 {
return
}
if c.t.haveInfo() {
return
}
- if c.PeerExtensionIDs["ut_metadata"] == 0 {
+ if c.PeerExtensionIDs[pp.ExtensionNameMetadata] == 0 {
// Peer doesn't support this.
return
}
}
}
-func (c *connection) onReadExtendedMsg(id byte, payload []byte) (err error) {
+func (c *connection) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err error) {
defer func() {
// TODO: Should we still do this?
if err != nil {
cl := t.cl
switch id {
case pp.HandshakeExtendedID:
- // TODO: Create a bencode struct for this.
- var d map[string]interface{}
- err := bencode.Unmarshal(payload, &d)
- if err != nil {
- return fmt.Errorf("error decoding extended message payload: %s", err)
+ var d pp.ExtendedHandshakeMessage
+ if err := bencode.Unmarshal(payload, &d); err != nil {
+ log.Print(err)
+ return errors.Wrap(err, "unmarshalling extended handshake payload")
}
- // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
- if reqq, ok := d["reqq"]; ok {
- if i, ok := reqq.(int64); ok {
- c.PeerMaxRequests = int(i)
- }
+ if d.Reqq != 0 {
+ c.PeerMaxRequests = d.Reqq
}
- if v, ok := d["v"]; ok {
- c.PeerClientName = v.(string)
+ c.PeerClientName = d.V
+ if c.PeerExtensionIDs == nil {
+ c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber, len(d.M))
}
- if m, ok := d["m"]; ok {
- mTyped, ok := m.(map[string]interface{})
- if !ok {
- return errors.New("handshake m value is not dict")
- }
- if c.PeerExtensionIDs == nil {
- c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
- }
- for name, v := range mTyped {
- id, ok := v.(int64)
- if !ok {
- log.Printf("bad handshake m item extension ID type: %T", v)
- continue
- }
- if id == 0 {
- delete(c.PeerExtensionIDs, name)
- } else {
- if c.PeerExtensionIDs[name] == 0 {
- supportedExtensionMessages.Add(name, 1)
- }
- c.PeerExtensionIDs[name] = byte(id)
- }
+ for name, id := range d.M {
+ if _, ok := c.PeerExtensionIDs[name]; !ok {
+ torrent.Add(fmt.Sprintf("peers supporting extension %q", name), 1)
}
+ c.PeerExtensionIDs[name] = id
}
- metadata_sizeUntyped, ok := d["metadata_size"]
- if ok {
- metadata_size, ok := metadata_sizeUntyped.(int64)
- if !ok {
- log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
- } else {
- err = t.setMetadataSize(metadata_size)
- if err != nil {
- return fmt.Errorf("error setting metadata size to %d", metadata_size)
- }
+ log.Print(c.PeerExtensionIDs)
+ if d.MetadataSize != 0 {
+ if err = t.setMetadataSize(d.MetadataSize); err != nil {
+ return errors.Wrapf(err, "setting metadata size to %d", d.MetadataSize)
}
}
- if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
+ if _, ok := c.PeerExtensionIDs[pp.ExtensionNameMetadata]; ok {
c.requestPendingMetadata()
}
return nil
pieceHash = crypto.SHA1
maxRequests = 250 // Maximum pending requests we allow peers to send us.
defaultChunkSize = 0x4000 // 16KiB
+)
- // These are our extended message IDs. Peers will use these values to
- // select which extension a message is intended for.
+// These are our extended message IDs. Peers will use these values to
+// select which extension a message is intended for.
+const (
metadataExtendedId = iota + 1 // 0 is reserved for deleting keys
pexExtendedId
)
peerExtensions = expvar.NewMap("peerExtensions")
completedHandshakeConnectionFlags = expvar.NewMap("completedHandshakeConnectionFlags")
// Count of connections to peer with same client ID.
- connsToSelf = expvar.NewInt("connsToSelf")
- receivedKeepalives = expvar.NewInt("receivedKeepalives")
- supportedExtensionMessages = expvar.NewMap("supportedExtensionMessages")
- postedKeepalives = expvar.NewInt("postedKeepalives")
+ connsToSelf = expvar.NewInt("connsToSelf")
+ receivedKeepalives = expvar.NewInt("receivedKeepalives")
+ postedKeepalives = expvar.NewInt("postedKeepalives")
// Requests received for pieces we don't have.
requestsReceivedForMissingPieces = expvar.NewInt("requestsReceivedForMissingPieces")
requestedChunkLengths = expvar.NewMap("requestedChunkLengths")
return l.netGoodPiecesDirtied() < r.netGoodPiecesDirtied()
}
-// Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes
-// per "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
-func addrCompactIP(addr net.Addr) (string, error) {
- host, _, err := net.SplitHostPort(addr.String())
- if err != nil {
- return "", err
- }
- ip := net.ParseIP(host)
- if v4 := ip.To4(); v4 != nil {
- if len(v4) != 4 {
- panic(v4)
- }
- return string(v4), nil
- }
- return string(ip.To16()), nil
-}
-
func connIsIpv6(nc interface {
LocalAddr() net.Addr
}) bool {
--- /dev/null
+package peer_protocol
+
+import (
+ "net"
+
+ "github.com/anacrolix/torrent/bencode"
+)
+
+// Marshals to the smallest compact byte representation.
+type CompactIp net.IP
+
+var _ bencode.Marshaler = CompactIp{}
+
+func (me CompactIp) MarshalBencode() ([]byte, error) {
+ return bencode.Marshal(func() []byte {
+ if ip4 := net.IP(me).To4(); ip4 != nil {
+ return ip4
+ } else {
+ return me
+ }
+ }())
+}
}
msg.Piece = b
case Extended:
- msg.ExtendedID, err = readByte(r)
+ b, err := readByte(r)
if err != nil {
break
}
+ msg.ExtendedID = ExtensionNumber(b)
msg.ExtendedPayload, err = ioutil.ReadAll(r)
case Port:
err = binary.Read(r, binary.BigEndian, &msg.Port)
--- /dev/null
+package peer_protocol
+
+// http://www.bittorrent.org/beps/bep_0010.html
+type (
+ ExtendedHandshakeMessage struct {
+ M map[ExtensionName]ExtensionNumber `bencode:"m"`
+ V string `bencode:"v,omitempty"`
+ Reqq int `bencode:"reqq,omitempty"`
+ Encryption bool `bencode:"e,omitempty"`
+ // BEP 9
+ MetadataSize int `bencode:"metadata_size,omitempty"`
+ // The local client port. It would be redundant for the receiving side of
+ // a connection to send this.
+ Port int `bencode:"p,omitempty"`
+ YourIp CompactIp `bencode:"yourip,omitempty"`
+ Ipv4 [4]byte `bencode:"ipv4,omitempty"`
+ Ipv6 [16]byte `bencode:"ipv6,omitempty"`
+ }
+
+ ExtensionName string
+ ExtensionNumber int
+)
+
+const (
+ // http://www.bittorrent.org/beps/bep_0011.html
+ ExtensionNamePex ExtensionName = "ut_pex"
+ // http://bittorrent.org/beps/bep_0009.html. Note that there's an
+ // LT_metadata, but I've never implemented it.
+ ExtensionNameMetadata = "ut_metadata"
+)
Index, Begin, Length Integer
Piece []byte
Bitfield []bool
- ExtendedID byte
+ ExtendedID ExtensionNumber
ExtendedPayload []byte
Port uint16
}
panic(n)
}
case Extended:
- err = buf.WriteByte(msg.ExtendedID)
+ err = buf.WriteByte(byte(msg.ExtendedID))
if err != nil {
return
}
}
// TODO: Propagate errors to disconnect peer.
-func (t *Torrent) setMetadataSize(bytes int64) (err error) {
+func (t *Torrent) setMetadataSize(bytes int) (err error) {
if t.haveInfo() {
// We already know the correct metadata size.
return
if data != nil {
d["total_size"] = len(t.metadataBytes)
}
- p, err := bencode.Marshal(d)
- if err != nil {
- panic(err)
- }
+ p := bencode.MustMarshal(d)
return pp.Message{
Type: pp.Extended,
- ExtendedID: c.PeerExtensionIDs["ut_metadata"],
+ ExtendedID: c.PeerExtensionIDs[pp.ExtensionNameMetadata],
ExtendedPayload: append(p, data...),
}
}