]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Redo the compact peer types
authorMatt Joiner <anacrolix@gmail.com>
Mon, 17 Aug 2015 09:52:47 +0000 (19:52 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 17 Aug 2015 09:52:47 +0000 (19:52 +1000)
client.go
dht/announce.go
dht/dht.go
pex.go [new file with mode: 0644]
pex_test.go [new file with mode: 0644]
tracker/http.go
tracker/server.go [new file with mode: 0644]
tracker/udp.go
tracker/udp_test.go
util/types.go

index 4d12f6751181b7bd977e59de4b2926cc7b457340..72d89e48b31b2064f4aca189a98f71bbca884fbb 100644 (file)
--- a/client.go
+++ b/client.go
@@ -35,12 +35,10 @@ import (
        "github.com/anacrolix/torrent/dht"
        "github.com/anacrolix/torrent/internal/pieceordering"
        "github.com/anacrolix/torrent/iplist"
-       "github.com/anacrolix/torrent/logonce"
        "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/mse"
        pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/tracker"
-       . "github.com/anacrolix/torrent/util"
 )
 
 var (
@@ -1403,12 +1401,6 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *torrent, c *connect
        return
 }
 
-type peerExchangeMessage struct {
-       Added      CompactPeers `bencode:"added"`
-       AddedFlags []byte       `bencode:"added.f"`
-       Dropped    CompactPeers `bencode:"dropped"`
-}
-
 // Extracts the port as an integer from an address string.
 func addrPort(addr net.Addr) int {
        return AddrPort(addr)
@@ -2434,8 +2426,8 @@ newAnnounce:
                        for trIndex, tr := range tier {
                                numTrackersTried++
                                err := cl.announceTorrentSingleTracker(tr, &req, t)
-                               if err != nil {
-                                       logonce.Stderr.Printf("%s: error announcing to %s: %s", t, tr, err)
+                               if err != nil && missinggo.CryHeard() {
+                                       log.Printf("%s: error announcing to %s: %s", t, tr, err)
                                        continue
                                }
                                // Float the successful announce to the top of the tier. If
index 3b9dc32f3e0fad94eaedf3d5900f5acf4d6b16d1..84b3c7bc958b052327efb622b4254771e63303ac 100644 (file)
@@ -11,7 +11,6 @@ import (
        "github.com/willf/bloom"
 
        "github.com/anacrolix/torrent/logonce"
-       "github.com/anacrolix/torrent/util"
 )
 
 // Maintains state for an ongoing Announce operation. An Announce is started
@@ -204,8 +203,8 @@ func (me *Announce) getPeers(addr dHTAddr) error {
 // peers that a node has reported as being in the swarm for a queried info
 // hash.
 type PeersValues struct {
-       Peers    []util.CompactPeer // Peers given in get_peers response.
-       NodeInfo                    // The node that gave the response.
+       Peers    []Peer // Peers given in get_peers response.
+       NodeInfo        // The node that gave the response.
 }
 
 // Stop the announce.
index dc3fe0cd1721a130aacc7e796bcec699b5b09e48..0b99d4ce76fb7f9bc0870f4cb0ac08365deb40f5 100644 (file)
@@ -18,6 +18,7 @@ import (
        "math/rand"
        "net"
        "os"
+       "strconv"
        "time"
 
        "github.com/anacrolix/missinggo"
@@ -1028,19 +1029,25 @@ func (s *Server) findNode(addr dHTAddr, targetID string) (t *Transaction, err er
        return
 }
 
+type Peer struct {
+       IP   net.IP
+       Port int
+}
+
+func (me *Peer) String() string {
+       return net.JoinHostPort(me.IP.String(), strconv.FormatInt(int64(me.Port), 10))
+}
+
 // In a get_peers response, the addresses of torrent clients involved with the
 // queried info-hash.
-func (m Msg) Values() (vs []util.CompactPeer) {
-       r, ok := m["r"]
-       if !ok {
-               return
-       }
-       rd, ok := r.(map[string]interface{})
-       if !ok {
-               return
-       }
-       v, ok := rd["values"]
-       if !ok {
+func (m Msg) Values() (vs []Peer) {
+       v := func() interface{} {
+               defer func() {
+                       recover()
+               }()
+               return m["r"].(map[string]interface{})["values"]
+       }()
+       if v == nil {
                return
        }
        vl, ok := v.([]interface{})
@@ -1050,19 +1057,21 @@ func (m Msg) Values() (vs []util.CompactPeer) {
                }
                return
        }
-       vs = make([]util.CompactPeer, 0, len(vl))
+       vs = make([]Peer, 0, len(vl))
        for _, i := range vl {
                s, ok := i.(string)
                if !ok {
                        panic(i)
                }
+               // Because it's a list of strings, we can let the length of the string
+               // determine the IP version of the compact peer.
                var cp util.CompactPeer
                err := cp.UnmarshalBinary([]byte(s))
                if err != nil {
                        log.Printf("error decoding values list element: %s", err)
                        continue
                }
-               vs = append(vs, cp)
+               vs = append(vs, Peer{cp.IP[:], int(cp.Port)})
        }
        return
 }
diff --git a/pex.go b/pex.go
new file mode 100644 (file)
index 0000000..674074f
--- /dev/null
+++ b/pex.go
@@ -0,0 +1,9 @@
+package torrent
+
+import "github.com/anacrolix/torrent/util"
+
+type peerExchangeMessage struct {
+       Added      util.CompactIPv4Peers `bencode:"added"`
+       AddedFlags []byte                `bencode:"added.f"`
+       Dropped    util.CompactIPv4Peers `bencode:"dropped"`
+}
diff --git a/pex_test.go b/pex_test.go
new file mode 100644 (file)
index 0000000..f73425f
--- /dev/null
@@ -0,0 +1,18 @@
+package torrent
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/require"
+
+       "github.com/anacrolix/torrent/bencode"
+)
+
+func TestUnmarshalPex(t *testing.T) {
+       var pem peerExchangeMessage
+       err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &pem)
+       require.NoError(t, err)
+       require.EqualValues(t, 2, len(pem.Added))
+       require.EqualValues(t, 1286, pem.Added[0].Port)
+       require.EqualValues(t, 0x100*0xb+0xc, pem.Added[1].Port)
+}
index 7d196482db4785f866976d13d879e80316696c27..15916ee9a50b76821ebe3d152d41598a5ff5a01a 100644 (file)
@@ -43,8 +43,7 @@ func (r *response) UnmarshalPeers() (ret []Peer, err error) {
                err = fmt.Errorf("unsupported peers value type: %T", r.Peers)
                return
        }
-       cp := make(util.CompactPeers, 0, len(s)/6)
-       err = cp.UnmarshalBinary([]byte(s))
+       cp, err := util.UnmarshalIPv4CompactPeers([]byte(s))
        if err != nil {
                return
        }
diff --git a/tracker/server.go b/tracker/server.go
new file mode 100644 (file)
index 0000000..e8613b0
--- /dev/null
@@ -0,0 +1,115 @@
+package tracker
+
+import (
+       "bytes"
+       "encoding/binary"
+       "fmt"
+       "math/rand"
+       "net"
+
+       "github.com/anacrolix/torrent/util"
+)
+
+type torrent struct {
+       Leechers int32
+       Seeders  int32
+       Peers    util.CompactIPv4Peers
+}
+
+type server struct {
+       pc    net.PacketConn
+       conns map[int64]struct{}
+       t     map[[20]byte]torrent
+}
+
+func marshal(parts ...interface{}) (ret []byte, err error) {
+       var buf bytes.Buffer
+       for _, p := range parts {
+               err = binary.Write(&buf, binary.BigEndian, p)
+               if err != nil {
+                       return
+               }
+       }
+       ret = buf.Bytes()
+       return
+}
+
+func (me *server) respond(addr net.Addr, rh ResponseHeader, parts ...interface{}) (err error) {
+       b, err := marshal(append([]interface{}{rh}, parts...)...)
+       if err != nil {
+               return
+       }
+       _, err = me.pc.WriteTo(b, addr)
+       return
+}
+
+func (me *server) newConn() (ret int64) {
+       ret = rand.Int63()
+       if me.conns == nil {
+               me.conns = make(map[int64]struct{})
+       }
+       me.conns[ret] = struct{}{}
+       return
+}
+
+func (me *server) serveOne() (err error) {
+       b := make([]byte, 0x10000)
+       n, addr, err := me.pc.ReadFrom(b)
+       if err != nil {
+               return
+       }
+       r := bytes.NewReader(b[:n])
+       var h RequestHeader
+       err = readBody(r, &h)
+       if err != nil {
+               return
+       }
+       switch h.Action {
+       case Connect:
+               if h.ConnectionId != connectRequestConnectionId {
+                       return
+               }
+               connId := me.newConn()
+               err = me.respond(addr, ResponseHeader{
+                       Connect,
+                       h.TransactionId,
+               }, ConnectionResponse{
+                       connId,
+               })
+               return
+       case Announce:
+               if _, ok := me.conns[h.ConnectionId]; !ok {
+                       me.respond(addr, ResponseHeader{
+                               TransactionId: h.TransactionId,
+                               Action:        Error,
+                       }, []byte("not connected"))
+                       return
+               }
+               var ar AnnounceRequest
+               err = readBody(r, &ar)
+               if err != nil {
+                       return
+               }
+               t := me.t[ar.InfoHash]
+               b, err = t.Peers.MarshalBinary()
+               if err != nil {
+                       panic(err)
+               }
+               err = me.respond(addr, ResponseHeader{
+                       TransactionId: h.TransactionId,
+                       Action:        Announce,
+               }, AnnounceResponseHeader{
+                       Interval: 900,
+                       Leechers: t.Leechers,
+                       Seeders:  t.Seeders,
+               }, b)
+               return
+       default:
+               err = fmt.Errorf("unhandled action: %d", h.Action)
+               me.respond(addr, ResponseHeader{
+                       TransactionId: h.TransactionId,
+                       Action:        Error,
+               }, []byte("unhandled action"))
+               return
+       }
+}
index 7021e776d6988978735ee07c62259231a6227fdf..3864bee2b1e3c76e53efa066ea93d224ac934389 100644 (file)
@@ -11,6 +11,8 @@ import (
        "net/url"
        "time"
 
+       "github.com/anacrolix/missinggo"
+
        "github.com/anacrolix/torrent/util"
 )
 
@@ -22,6 +24,8 @@ const (
        Scrape
        Error
 
+       connectRequestConnectionId = 0x41727101980
+
        // BEP 41
        optionTypeEndOfOptions = 0
        optionTypeNOP          = 1
@@ -121,34 +125,29 @@ func (c *udpClient) Announce(req *AnnounceRequest) (res AnnounceResponse, err er
        res.Interval = h.Interval
        res.Leechers = h.Leechers
        res.Seeders = h.Seeders
-       for {
-               var p util.CompactPeer
-               err = binary.Read(b, binary.BigEndian, &p)
-               switch err {
-               case nil:
-               case io.EOF:
-                       err = nil
-                       fallthrough
-               default:
-                       return
-               }
+       cps, err := util.UnmarshalIPv4CompactPeers(b.Bytes())
+       if err != nil {
+               return
+       }
+       for _, cp := range cps {
                res.Peers = append(res.Peers, Peer{
-                       IP:   p.IP[:],
-                       Port: int(p.Port),
+                       IP:   cp.IP[:],
+                       Port: int(cp.Port),
                })
        }
+       return
 }
 
 // body is the binary serializable request body. trailer is optional data
 // following it, such as for BEP 41.
 func (c *udpClient) write(h *RequestHeader, body interface{}, trailer []byte) (err error) {
-       buf := &bytes.Buffer{}
-       err = binary.Write(buf, binary.BigEndian, h)
+       var buf bytes.Buffer
+       err = binary.Write(&buf, binary.BigEndian, h)
        if err != nil {
                panic(err)
        }
        if body != nil {
-               err = binary.Write(buf, binary.BigEndian, body)
+               err = binary.Write(&buf, binary.BigEndian, body)
                if err != nil {
                        panic(err)
                }
@@ -177,7 +176,7 @@ func write(w io.Writer, data interface{}) error {
 
 // args is the binary serializable request body. trailer is optional data
 // following it, such as for BEP 41.
-func (c *udpClient) request(action Action, args interface{}, options []byte) (responseBody *bytes.Reader, err error) {
+func (c *udpClient) request(action Action, args interface{}, options []byte) (responseBody *bytes.Buffer, err error) {
        tid := newTransactionId()
        err = c.write(&RequestHeader{
                ConnectionId:  c.connectionId,
@@ -218,12 +217,12 @@ func (c *udpClient) request(action Action, args interface{}, options []byte) (re
                if h.Action == Error {
                        err = errors.New(buf.String())
                }
-               responseBody = bytes.NewReader(buf.Bytes())
+               responseBody = buf
                return
        }
 }
 
-func readBody(r *bytes.Reader, data ...interface{}) (err error) {
+func readBody(r io.Reader, data ...interface{}) (err error) {
        for _, datum := range data {
                err = binary.Read(r, binary.BigEndian, datum)
                if err != nil {
@@ -241,9 +240,14 @@ func (c *udpClient) Connect() (err error) {
        if c.connected() {
                return nil
        }
-       c.connectionId = 0x41727101980
+       c.connectionId = connectRequestConnectionId
        if c.socket == nil {
-               c.socket, err = net.Dial("udp", c.url.Host)
+               hmp := missinggo.SplitHostPort(c.url.Host)
+               if hmp.NoPort {
+                       hmp.NoPort = false
+                       hmp.Port = 80
+               }
+               c.socket, err = net.Dial("udp", hmp.String())
                if err != nil {
                        return
                }
index fd845f1b5ad2f4fec74373555d194e4235dd5f2c..54a40eb44431384f6310b5ecffd483e46fad4e2b 100644 (file)
@@ -4,14 +4,16 @@ import (
        "bytes"
        "crypto/rand"
        "encoding/binary"
+       "fmt"
        "io"
        "io/ioutil"
-       "log"
        "net"
        "net/url"
+       "strings"
        "sync"
        "testing"
 
+       "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 
        "github.com/anacrolix/torrent/util"
@@ -30,18 +32,16 @@ func TestNetIPv4Bytes(t *testing.T) {
 }
 
 func TestMarshalAnnounceResponse(t *testing.T) {
-       w := bytes.Buffer{}
-       peers := util.CompactPeers{{[]byte{127, 0, 0, 1}, 2}, {[]byte{255, 0, 0, 3}, 4}}
-       err := peers.WriteBinary(&w)
-       if err != nil {
-               t.Fatalf("error writing udp announce response addrs: %s", err)
-       }
-       if w.String() != "\x7f\x00\x00\x01\x00\x02\xff\x00\x00\x03\x00\x04" {
-               t.FailNow()
-       }
-       if binary.Size(AnnounceResponseHeader{}) != 12 {
-               t.FailNow()
+       peers := util.CompactIPv4Peers{
+               {[]byte{127, 0, 0, 1}, 2},
+               {[]byte{255, 0, 0, 3}, 4},
        }
+       b, err := peers.MarshalBinary()
+       require.NoError(t, err)
+       require.EqualValues(t,
+               "\x7f\x00\x00\x01\x00\x02\xff\x00\x00\x03\x00\x04",
+               b)
+       require.EqualValues(t, 12, binary.Size(AnnounceResponseHeader{}))
 }
 
 // Failure to write an entire packet to UDP is expected to given an error.
@@ -83,32 +83,80 @@ func TestConvertInt16ToInt(t *testing.T) {
        }
 }
 
+func TestAnnounceLocalhost(t *testing.T) {
+       srv := server{
+               t: map[[20]byte]torrent{
+                       [20]byte{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1}: {
+                               Seeders:  1,
+                               Leechers: 2,
+                               Peers: []util.CompactPeer{
+                                       {[]byte{1, 2, 3, 4}, 5},
+                                       {[]byte{6, 7, 8, 9}, 10},
+                               },
+                       },
+               },
+       }
+       var err error
+       srv.pc, err = net.ListenPacket("udp", ":0")
+       require.NoError(t, err)
+       defer srv.pc.Close()
+       tr, err := New(fmt.Sprintf("udp://%s/announce", srv.pc.LocalAddr().String()))
+       require.NoError(t, err)
+       go func() {
+               require.NoError(t, srv.serveOne())
+       }()
+       err = tr.Connect()
+       require.NoError(t, err)
+       req := AnnounceRequest{
+               NumWant: -1,
+               Event:   Started,
+       }
+       rand.Read(req.PeerId[:])
+       copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1})
+       go func() {
+               require.NoError(t, srv.serveOne())
+       }()
+       ar, err := tr.Announce(&req)
+       require.NoError(t, err)
+       assert.EqualValues(t, 1, ar.Seeders)
+       assert.EqualValues(t, 2, len(ar.Peers))
+}
+
 func TestUDPTracker(t *testing.T) {
+       tr, err := New("udp://tracker.openbittorrent.com:80/announce")
+       require.NoError(t, err)
        if testing.Short() {
                t.SkipNow()
        }
-       tr, err := New("udp://tracker.openbittorrent.com:80/announce")
-       if err != nil {
-               t.Skip(err)
-       }
        if err := tr.Connect(); err != nil {
-               t.Skip(err)
+               if strings.Contains(err.Error(), "no such host") {
+                       t.Skip(err)
+               }
+               if strings.Contains(err.Error(), "i/o timeout") {
+                       t.Skip(err)
+               }
+               t.Fatal(err)
        }
        req := AnnounceRequest{
                NumWant: -1,
-               Event:   Started,
+               // Event:   Started,
        }
        rand.Read(req.PeerId[:])
        copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1})
-       _, err = tr.Announce(&req)
-       if err != nil {
-               t.Skip(err)
+       ar, err := tr.Announce(&req)
+       if ne, ok := err.(net.Error); ok {
+               if ne.Timeout() {
+                       t.Skip(err)
+               }
        }
+       require.NoError(t, err)
+       t.Log(ar)
 }
 
-// TODO: Create a fake UDP tracker to make these requests to.
-func TestAnnounceRandomInfoHash(t *testing.T) {
+func TestAnnounceRandomInfoHashThirdParty(t *testing.T) {
        if testing.Short() {
+               // This test involves contacting third party servers that may have
+               // unpreditable results.
                t.SkipNow()
        }
        req := AnnounceRequest{
@@ -117,13 +165,18 @@ func TestAnnounceRandomInfoHash(t *testing.T) {
        rand.Read(req.PeerId[:])
        rand.Read(req.InfoHash[:])
        wg := sync.WaitGroup{}
+       success := make(chan bool)
+       fail := make(chan struct{})
        for _, url := range []string{
                "udp://tracker.openbittorrent.com:80/announce",
                "udp://tracker.publicbt.com:80",
                "udp://tracker.istole.it:6969",
                "udp://tracker.ccc.de:80",
                "udp://tracker.open.demonii.com:1337",
+               "udp://open.demonii.com:1337",
+               "udp://exodus.desync.com:6969",
        } {
+               wg.Add(1)
                go func(url string) {
                        defer wg.Done()
                        tr, err := New(url)
@@ -140,12 +193,26 @@ func TestAnnounceRandomInfoHash(t *testing.T) {
                                return
                        }
                        if resp.Leechers != 0 || resp.Seeders != 0 || len(resp.Peers) != 0 {
+                               // The info hash we generated was random in 2^160 space. If we
+                               // get a hit, something is weird.
                                t.Fatal(resp)
                        }
+                       t.Logf("announced to %s", url)
+                       // TODO: Can probably get stuck here, but it's just a throwaway
+                       // test.
+                       success <- true
                }(url)
-               wg.Add(1)
        }
-       wg.Wait()
+       go func() {
+               wg.Wait()
+               close(fail)
+       }()
+       // Bail as quickly as we can.
+       select {
+       case <-fail:
+               t.FailNow()
+       case <-success:
+       }
 }
 
 // Check that URLPath option is done correctly.
@@ -164,7 +231,6 @@ func TestURLPathOption(t *testing.T) {
                if err != nil {
                        t.Fatal(err)
                }
-               log.Print("connected")
                _, err = cl.Announce(&AnnounceRequest{})
                if err != nil {
                        t.Fatal(err)
index 68135f8573091d6d874507e55016752eac813f0f..942b9d73d07d750076b9da5654d0ecc47bc66a63 100644 (file)
@@ -3,85 +3,77 @@ package util
 import (
        "encoding"
        "encoding/binary"
-       "fmt"
-       "io"
+       "errors"
        "net"
-       "strconv"
+
+       "github.com/bradfitz/iter"
 
        "github.com/anacrolix/torrent/bencode"
 )
 
-type CompactPeers []CompactPeer
+// Concatenated 6-byte peer addresses.
+type CompactIPv4Peers []CompactPeer
+
+var (
+       // This allows bencode.Unmarshal to do better than a string or []byte.
+       _ bencode.Unmarshaler      = &CompactIPv4Peers{}
+       _ encoding.BinaryMarshaler = CompactIPv4Peers{}
+)
 
-func (me *CompactPeers) UnmarshalBencode(bb []byte) (err error) {
-       var b []byte
-       err = bencode.Unmarshal(bb, &b)
+// This allows bencode.Unmarshal to do better than a string or []byte.
+func (me *CompactIPv4Peers) UnmarshalBencode(b []byte) (err error) {
+       var bb []byte
+       err = bencode.Unmarshal(b, &bb)
        if err != nil {
                return
        }
-       err = me.UnmarshalBinary(b)
+       *me, err = UnmarshalIPv4CompactPeers(bb)
        return
 }
 
-func (me *CompactPeers) UnmarshalBinary(b []byte) (err error) {
-       for i := 0; i < len(b); i += 6 {
-               var p CompactPeer
-               err = p.UnmarshalBinary([]byte(b[i : i+6]))
-               if err != nil {
-                       return
-               }
-               *me = append(*me, p)
-       }
-       return
-}
-
-func (me CompactPeers) WriteBinary(w io.Writer) (err error) {
-       for _, cp := range me {
-               cp.Write(w)
-               if err != nil {
-                       return
-               }
+func (me CompactIPv4Peers) MarshalBinary() (ret []byte, err error) {
+       ret = make([]byte, len(me)*6)
+       for i, cp := range me {
+               copy(ret[6*i:], cp.IP.To4())
+               binary.BigEndian.PutUint16(ret[6*i+4:], uint16(cp.Port))
        }
        return
 }
 
+// Represents peer address in either IPv6 or IPv4 form.
 type CompactPeer struct {
        IP   net.IP
-       Port uint16
+       Port int
 }
 
-var _ encoding.BinaryUnmarshaler = &CompactPeer{}
-
-func (cp *CompactPeer) UnmarshalBinary(b []byte) (err error) {
+func (me *CompactPeer) UnmarshalBinary(b []byte) error {
        switch len(b) {
        case 18:
-               cp.IP = make([]byte, 16)
+               me.IP = make([]byte, 16)
        case 6:
-               cp.IP = make([]byte, 4)
+               me.IP = make([]byte, 4)
        default:
-               err = fmt.Errorf("bad length: %d", len(b))
-               return
-       }
-       if n := copy(cp.IP, b); n != len(cp.IP) {
-               panic(n)
+               return errors.New("bad length")
        }
-       b = b[len(cp.IP):]
-       if len(b) != 2 {
-               panic(len(b))
-       }
-       cp.Port = binary.BigEndian.Uint16(b)
-       return
+       copy(me.IP, b)
+       b = b[len(me.IP):]
+       me.Port = int(binary.BigEndian.Uint16(b))
+       return nil
 }
 
-func (cp *CompactPeer) Write(w io.Writer) (err error) {
-       _, err = w.Write(cp.IP)
-       if err != nil {
+func UnmarshalIPv4CompactPeers(b []byte) (ret []CompactPeer, err error) {
+       if len(b)%6 != 0 {
+               err = errors.New("bad length")
                return
        }
-       err = binary.Write(w, binary.BigEndian, cp.Port)
+       num := len(b) / 6
+       ret = make([]CompactPeer, num)
+       for i := range iter.N(num) {
+               off := i * 6
+               err = ret[i].UnmarshalBinary(b[off : off+6])
+               if err != nil {
+                       return
+               }
+       }
        return
 }
-
-func (cp *CompactPeer) String() string {
-       return net.JoinHostPort(cp.IP.String(), strconv.FormatUint(uint64(cp.Port), 10))
-}