From 3b290438614b65820280a0854f9de01598ee02e5 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 29 Jun 2014 19:07:43 +1000 Subject: [PATCH] Add peers sent over PEX --- client.go | 53 ++++++++++++++++++++++++++++++++++++++++++++++ client_test.go | 25 ++++++++++++++++++++++ tracker/tracker.go | 15 +++++++++++++ 3 files changed, 93 insertions(+) diff --git a/client.go b/client.go index fe701146..eb352290 100644 --- a/client.go +++ b/client.go @@ -379,6 +379,7 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent) (err error) { d := map[string]interface{}{ "m": map[string]int{ "ut_metadata": 1, + "ut_pex": 2, }, "v": "go.torrent dev", } @@ -530,6 +531,31 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *torrent, c *connect return } +type peerExchangeMessage struct { + Added compactPeers `bencode:"added"` + AddedFlags []byte `bencode:"added.f"` + Dropped []tracker.Peer `bencode:"dropped"` +} + +type compactPeers []tracker.CompactPeer + +func (me *compactPeers) UnmarshalBencode(bb []byte) (err error) { + var b []byte + err = bencode.Unmarshal(bb, &b) + if err != nil { + return + } + for i := 0; i < len(b); i += 6 { + var p tracker.CompactPeer + err = p.UnmarshalBinary([]byte(b[i : i+6])) + if err != nil { + return + } + *me = append(*me, p) + } + return +} + func (me *Client) connectionLoop(t *torrent, c *connection) error { decoder := pp.Decoder{ R: bufio.NewReader(c.Socket), @@ -673,6 +699,33 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error { } case 1: err = me.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c) + case 2: + var pexMsg peerExchangeMessage + err := bencode.Unmarshal(msg.ExtendedPayload, &pexMsg) + if err != nil { + err = fmt.Errorf("error unmarshalling PEX message: %s", err) + break + } + go func() { + err := me.AddPeers(t.InfoHash, func() (ret []Peer) { + for _, cp := range pexMsg.Added { + p := Peer{ + IP: make([]byte, 4), + Port: int(cp.Port), + } + if n := copy(p.IP, cp.IP[:]); n != 4 { + panic(n) + } + ret = append(ret, p) + } + return + }()) + if err != nil { + log.Printf("error adding PEX peers: %s", err) + return + } + log.Printf("added %d peers from PEX", len(pexMsg.Added)) + }() default: err = fmt.Errorf("unexpected extended message ID: %s", msg.ExtendedID) } diff --git a/client_test.go b/client_test.go index 04863fb6..2b45cab9 100644 --- a/client_test.go +++ b/client_test.go @@ -1,6 +1,7 @@ package torrent import ( + "github.com/anacrolix/libtorgo/bencode" "os" "bitbucket.org/anacrolix/go.torrent/testutil" @@ -50,3 +51,27 @@ func TestTorrentInitialState(t *testing.T) { t.Fatal("pending chunk spec is incorrect") } } + +func TestUnmarshalCompactPeer(t *testing.T) { + var p Peer + err := bencode.Unmarshal([]byte("6:\x01\x02\x03\x04\x05\x06"), &p) + if err != nil { + t.Fatal(err) + } + if p.IP.String() != "1.2.3.4" { + t.FailNow() + } +} + +func TestUnmarshalPEXMsg(t *testing.T) { + var m peerExchangeMessage + if err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &m); err != nil { + t.Fatal(err) + } + if len(m.Added) != 2 { + t.FailNow() + } + if m.Added[0].Port != 0x506 { + t.FailNow() + } +} diff --git a/tracker/tracker.go b/tracker/tracker.go index ffae4341..1914eae1 100644 --- a/tracker/tracker.go +++ b/tracker/tracker.go @@ -1,6 +1,9 @@ package tracker import ( + "bytes" + "encoding" + "encoding/binary" "errors" "net" "net/url" @@ -33,6 +36,18 @@ type Peer struct { Port int } +type CompactPeer struct { + IP [4]byte + Port uint16 +} + +var _ encoding.BinaryUnmarshaler = &CompactPeer{} + +func (cp *CompactPeer) UnmarshalBinary(b []byte) (err error) { + err = binary.Read(bytes.NewReader(b), binary.BigEndian, cp) + return +} + const ( None AnnounceEvent = iota Completed // The local peer just completed the torrent. -- 2.48.1