From: Matt Joiner Date: Mon, 17 Aug 2015 09:52:47 +0000 (+1000) Subject: Redo the compact peer types X-Git-Tag: v1.0.0~1066 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=1db00a54f000b9527ad3b97681b3c67764944156;p=btrtrc.git Redo the compact peer types --- diff --git a/client.go b/client.go index 4d12f675..72d89e48 100644 --- a/client.go +++ b/client.go @@ -35,12 +35,10 @@ import ( "github.com/anacrolix/torrent/dht" "github.com/anacrolix/torrent/internal/pieceordering" "github.com/anacrolix/torrent/iplist" - "github.com/anacrolix/torrent/logonce" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/mse" pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/tracker" - . "github.com/anacrolix/torrent/util" ) var ( @@ -1403,12 +1401,6 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *torrent, c *connect return } -type peerExchangeMessage struct { - Added CompactPeers `bencode:"added"` - AddedFlags []byte `bencode:"added.f"` - Dropped CompactPeers `bencode:"dropped"` -} - // Extracts the port as an integer from an address string. func addrPort(addr net.Addr) int { return AddrPort(addr) @@ -2434,8 +2426,8 @@ newAnnounce: for trIndex, tr := range tier { numTrackersTried++ err := cl.announceTorrentSingleTracker(tr, &req, t) - if err != nil { - logonce.Stderr.Printf("%s: error announcing to %s: %s", t, tr, err) + if err != nil && missinggo.CryHeard() { + log.Printf("%s: error announcing to %s: %s", t, tr, err) continue } // Float the successful announce to the top of the tier. If diff --git a/dht/announce.go b/dht/announce.go index 3b9dc32f..84b3c7bc 100644 --- a/dht/announce.go +++ b/dht/announce.go @@ -11,7 +11,6 @@ import ( "github.com/willf/bloom" "github.com/anacrolix/torrent/logonce" - "github.com/anacrolix/torrent/util" ) // Maintains state for an ongoing Announce operation. An Announce is started @@ -204,8 +203,8 @@ func (me *Announce) getPeers(addr dHTAddr) error { // peers that a node has reported as being in the swarm for a queried info // hash. type PeersValues struct { - Peers []util.CompactPeer // Peers given in get_peers response. - NodeInfo // The node that gave the response. + Peers []Peer // Peers given in get_peers response. + NodeInfo // The node that gave the response. } // Stop the announce. diff --git a/dht/dht.go b/dht/dht.go index dc3fe0cd..0b99d4ce 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -18,6 +18,7 @@ import ( "math/rand" "net" "os" + "strconv" "time" "github.com/anacrolix/missinggo" @@ -1028,19 +1029,25 @@ func (s *Server) findNode(addr dHTAddr, targetID string) (t *Transaction, err er return } +type Peer struct { + IP net.IP + Port int +} + +func (me *Peer) String() string { + return net.JoinHostPort(me.IP.String(), strconv.FormatInt(int64(me.Port), 10)) +} + // In a get_peers response, the addresses of torrent clients involved with the // queried info-hash. -func (m Msg) Values() (vs []util.CompactPeer) { - r, ok := m["r"] - if !ok { - return - } - rd, ok := r.(map[string]interface{}) - if !ok { - return - } - v, ok := rd["values"] - if !ok { +func (m Msg) Values() (vs []Peer) { + v := func() interface{} { + defer func() { + recover() + }() + return m["r"].(map[string]interface{})["values"] + }() + if v == nil { return } vl, ok := v.([]interface{}) @@ -1050,19 +1057,21 @@ func (m Msg) Values() (vs []util.CompactPeer) { } return } - vs = make([]util.CompactPeer, 0, len(vl)) + vs = make([]Peer, 0, len(vl)) for _, i := range vl { s, ok := i.(string) if !ok { panic(i) } + // Because it's a list of strings, we can let the length of the string + // determine the IP version of the compact peer. var cp util.CompactPeer err := cp.UnmarshalBinary([]byte(s)) if err != nil { log.Printf("error decoding values list element: %s", err) continue } - vs = append(vs, cp) + vs = append(vs, Peer{cp.IP[:], int(cp.Port)}) } return } diff --git a/pex.go b/pex.go new file mode 100644 index 00000000..674074fe --- /dev/null +++ b/pex.go @@ -0,0 +1,9 @@ +package torrent + +import "github.com/anacrolix/torrent/util" + +type peerExchangeMessage struct { + Added util.CompactIPv4Peers `bencode:"added"` + AddedFlags []byte `bencode:"added.f"` + Dropped util.CompactIPv4Peers `bencode:"dropped"` +} diff --git a/pex_test.go b/pex_test.go new file mode 100644 index 00000000..f73425fc --- /dev/null +++ b/pex_test.go @@ -0,0 +1,18 @@ +package torrent + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/anacrolix/torrent/bencode" +) + +func TestUnmarshalPex(t *testing.T) { + var pem peerExchangeMessage + err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &pem) + require.NoError(t, err) + require.EqualValues(t, 2, len(pem.Added)) + require.EqualValues(t, 1286, pem.Added[0].Port) + require.EqualValues(t, 0x100*0xb+0xc, pem.Added[1].Port) +} diff --git a/tracker/http.go b/tracker/http.go index 7d196482..15916ee9 100644 --- a/tracker/http.go +++ b/tracker/http.go @@ -43,8 +43,7 @@ func (r *response) UnmarshalPeers() (ret []Peer, err error) { err = fmt.Errorf("unsupported peers value type: %T", r.Peers) return } - cp := make(util.CompactPeers, 0, len(s)/6) - err = cp.UnmarshalBinary([]byte(s)) + cp, err := util.UnmarshalIPv4CompactPeers([]byte(s)) if err != nil { return } diff --git a/tracker/server.go b/tracker/server.go new file mode 100644 index 00000000..e8613b08 --- /dev/null +++ b/tracker/server.go @@ -0,0 +1,115 @@ +package tracker + +import ( + "bytes" + "encoding/binary" + "fmt" + "math/rand" + "net" + + "github.com/anacrolix/torrent/util" +) + +type torrent struct { + Leechers int32 + Seeders int32 + Peers util.CompactIPv4Peers +} + +type server struct { + pc net.PacketConn + conns map[int64]struct{} + t map[[20]byte]torrent +} + +func marshal(parts ...interface{}) (ret []byte, err error) { + var buf bytes.Buffer + for _, p := range parts { + err = binary.Write(&buf, binary.BigEndian, p) + if err != nil { + return + } + } + ret = buf.Bytes() + return +} + +func (me *server) respond(addr net.Addr, rh ResponseHeader, parts ...interface{}) (err error) { + b, err := marshal(append([]interface{}{rh}, parts...)...) + if err != nil { + return + } + _, err = me.pc.WriteTo(b, addr) + return +} + +func (me *server) newConn() (ret int64) { + ret = rand.Int63() + if me.conns == nil { + me.conns = make(map[int64]struct{}) + } + me.conns[ret] = struct{}{} + return +} + +func (me *server) serveOne() (err error) { + b := make([]byte, 0x10000) + n, addr, err := me.pc.ReadFrom(b) + if err != nil { + return + } + r := bytes.NewReader(b[:n]) + var h RequestHeader + err = readBody(r, &h) + if err != nil { + return + } + switch h.Action { + case Connect: + if h.ConnectionId != connectRequestConnectionId { + return + } + connId := me.newConn() + err = me.respond(addr, ResponseHeader{ + Connect, + h.TransactionId, + }, ConnectionResponse{ + connId, + }) + return + case Announce: + if _, ok := me.conns[h.ConnectionId]; !ok { + me.respond(addr, ResponseHeader{ + TransactionId: h.TransactionId, + Action: Error, + }, []byte("not connected")) + return + } + var ar AnnounceRequest + err = readBody(r, &ar) + if err != nil { + return + } + t := me.t[ar.InfoHash] + b, err = t.Peers.MarshalBinary() + if err != nil { + panic(err) + } + err = me.respond(addr, ResponseHeader{ + TransactionId: h.TransactionId, + Action: Announce, + }, AnnounceResponseHeader{ + Interval: 900, + Leechers: t.Leechers, + Seeders: t.Seeders, + }, b) + return + default: + err = fmt.Errorf("unhandled action: %d", h.Action) + me.respond(addr, ResponseHeader{ + TransactionId: h.TransactionId, + Action: Error, + }, []byte("unhandled action")) + return + } +} diff --git a/tracker/udp.go b/tracker/udp.go index 7021e776..3864bee2 100644 --- a/tracker/udp.go +++ b/tracker/udp.go @@ -11,6 +11,8 @@ import ( "net/url" "time" + "github.com/anacrolix/missinggo" + "github.com/anacrolix/torrent/util" ) @@ -22,6 +24,8 @@ const ( Scrape Error + connectRequestConnectionId = 0x41727101980 + // BEP 41 optionTypeEndOfOptions = 0 optionTypeNOP = 1 @@ -121,34 +125,29 @@ func (c *udpClient) Announce(req *AnnounceRequest) (res AnnounceResponse, err er res.Interval = h.Interval res.Leechers = h.Leechers res.Seeders = h.Seeders - for { - var p util.CompactPeer - err = binary.Read(b, binary.BigEndian, &p) - switch err { - case nil: - case io.EOF: - err = nil - fallthrough - default: - return - } + cps, err := util.UnmarshalIPv4CompactPeers(b.Bytes()) + if err != nil { + return + } + for _, cp := range cps { res.Peers = append(res.Peers, Peer{ - IP: p.IP[:], - Port: int(p.Port), + IP: cp.IP[:], + Port: int(cp.Port), }) } + return } // body is the binary serializable request body. trailer is optional data // following it, such as for BEP 41. func (c *udpClient) write(h *RequestHeader, body interface{}, trailer []byte) (err error) { - buf := &bytes.Buffer{} - err = binary.Write(buf, binary.BigEndian, h) + var buf bytes.Buffer + err = binary.Write(&buf, binary.BigEndian, h) if err != nil { panic(err) } if body != nil { - err = binary.Write(buf, binary.BigEndian, body) + err = binary.Write(&buf, binary.BigEndian, body) if err != nil { panic(err) } @@ -177,7 +176,7 @@ func write(w io.Writer, data interface{}) error { // args is the binary serializable request body. trailer is optional data // following it, such as for BEP 41. -func (c *udpClient) request(action Action, args interface{}, options []byte) (responseBody *bytes.Reader, err error) { +func (c *udpClient) request(action Action, args interface{}, options []byte) (responseBody *bytes.Buffer, err error) { tid := newTransactionId() err = c.write(&RequestHeader{ ConnectionId: c.connectionId, @@ -218,12 +217,12 @@ func (c *udpClient) request(action Action, args interface{}, options []byte) (re if h.Action == Error { err = errors.New(buf.String()) } - responseBody = bytes.NewReader(buf.Bytes()) + responseBody = buf return } } -func readBody(r *bytes.Reader, data ...interface{}) (err error) { +func readBody(r io.Reader, data ...interface{}) (err error) { for _, datum := range data { err = binary.Read(r, binary.BigEndian, datum) if err != nil { @@ -241,9 +240,14 @@ func (c *udpClient) Connect() (err error) { if c.connected() { return nil } - c.connectionId = 0x41727101980 + c.connectionId = connectRequestConnectionId if c.socket == nil { - c.socket, err = net.Dial("udp", c.url.Host) + hmp := missinggo.SplitHostPort(c.url.Host) + if hmp.NoPort { + hmp.NoPort = false + hmp.Port = 80 + } + c.socket, err = net.Dial("udp", hmp.String()) if err != nil { return } diff --git a/tracker/udp_test.go b/tracker/udp_test.go index fd845f1b..54a40eb4 100644 --- a/tracker/udp_test.go +++ b/tracker/udp_test.go @@ -4,14 +4,16 @@ import ( "bytes" "crypto/rand" "encoding/binary" + "fmt" "io" "io/ioutil" - "log" "net" "net/url" + "strings" "sync" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/anacrolix/torrent/util" @@ -30,18 +32,16 @@ func TestNetIPv4Bytes(t *testing.T) { } func TestMarshalAnnounceResponse(t *testing.T) { - w := bytes.Buffer{} - peers := util.CompactPeers{{[]byte{127, 0, 0, 1}, 2}, {[]byte{255, 0, 0, 3}, 4}} - err := peers.WriteBinary(&w) - if err != nil { - t.Fatalf("error writing udp announce response addrs: %s", err) - } - if w.String() != "\x7f\x00\x00\x01\x00\x02\xff\x00\x00\x03\x00\x04" { - t.FailNow() - } - if binary.Size(AnnounceResponseHeader{}) != 12 { - t.FailNow() + peers := util.CompactIPv4Peers{ + {[]byte{127, 0, 0, 1}, 2}, + {[]byte{255, 0, 0, 3}, 4}, } + b, err := peers.MarshalBinary() + require.NoError(t, err) + require.EqualValues(t, + "\x7f\x00\x00\x01\x00\x02\xff\x00\x00\x03\x00\x04", + b) + require.EqualValues(t, 12, binary.Size(AnnounceResponseHeader{})) } // Failure to write an entire packet to UDP is expected to given an error. @@ -83,32 +83,80 @@ func TestConvertInt16ToInt(t *testing.T) { } } +func TestAnnounceLocalhost(t *testing.T) { + srv := server{ + t: map[[20]byte]torrent{ + [20]byte{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1}: { + Seeders: 1, + Leechers: 2, + Peers: []util.CompactPeer{ + {[]byte{1, 2, 3, 4}, 5}, + {[]byte{6, 7, 8, 9}, 10}, + }, + }, + }, + } + var err error + srv.pc, err = net.ListenPacket("udp", ":0") + require.NoError(t, err) + defer srv.pc.Close() + tr, err := New(fmt.Sprintf("udp://%s/announce", srv.pc.LocalAddr().String())) + require.NoError(t, err) + go func() { + require.NoError(t, srv.serveOne()) + }() + err = tr.Connect() + require.NoError(t, err) + req := AnnounceRequest{ + NumWant: -1, + Event: Started, + } + rand.Read(req.PeerId[:]) + copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1}) + go func() { + require.NoError(t, srv.serveOne()) + }() + ar, err := tr.Announce(&req) + require.NoError(t, err) + assert.EqualValues(t, 1, ar.Seeders) + assert.EqualValues(t, 2, len(ar.Peers)) +} + func TestUDPTracker(t *testing.T) { + tr, err := New("udp://tracker.openbittorrent.com:80/announce") + require.NoError(t, err) if testing.Short() { t.SkipNow() } - tr, err := New("udp://tracker.openbittorrent.com:80/announce") - if err != nil { - t.Skip(err) - } if err := tr.Connect(); err != nil { - t.Skip(err) + if strings.Contains(err.Error(), "no such host") { + t.Skip(err) + } + if strings.Contains(err.Error(), "i/o timeout") { + t.Skip(err) + } + t.Fatal(err) } req := AnnounceRequest{ NumWant: -1, - Event: Started, + // Event: Started, } rand.Read(req.PeerId[:]) copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1}) - _, err = tr.Announce(&req) - if err != nil { - t.Skip(err) + ar, err := tr.Announce(&req) + if ne, ok := err.(net.Error); ok { + if ne.Timeout() { + t.Skip(err) + } } + require.NoError(t, err) + t.Log(ar) } -// TODO: Create a fake UDP tracker to make these requests to. -func TestAnnounceRandomInfoHash(t *testing.T) { +func TestAnnounceRandomInfoHashThirdParty(t *testing.T) { if testing.Short() { + // This test involves contacting third party servers that may have + // unpreditable results. t.SkipNow() } req := AnnounceRequest{ @@ -117,13 +165,18 @@ func TestAnnounceRandomInfoHash(t *testing.T) { rand.Read(req.PeerId[:]) rand.Read(req.InfoHash[:]) wg := sync.WaitGroup{} + success := make(chan bool) + fail := make(chan struct{}) for _, url := range []string{ "udp://tracker.openbittorrent.com:80/announce", "udp://tracker.publicbt.com:80", "udp://tracker.istole.it:6969", "udp://tracker.ccc.de:80", "udp://tracker.open.demonii.com:1337", + "udp://open.demonii.com:1337", + "udp://exodus.desync.com:6969", } { + wg.Add(1) go func(url string) { defer wg.Done() tr, err := New(url) @@ -140,12 +193,26 @@ func TestAnnounceRandomInfoHash(t *testing.T) { return } if resp.Leechers != 0 || resp.Seeders != 0 || len(resp.Peers) != 0 { + // The info hash we generated was random in 2^160 space. If we + // get a hit, something is weird. t.Fatal(resp) } + t.Logf("announced to %s", url) + // TODO: Can probably get stuck here, but it's just a throwaway + // test. + success <- true }(url) - wg.Add(1) } - wg.Wait() + go func() { + wg.Wait() + close(fail) + }() + // Bail as quickly as we can. + select { + case <-fail: + t.FailNow() + case <-success: + } } // Check that URLPath option is done correctly. @@ -164,7 +231,6 @@ func TestURLPathOption(t *testing.T) { if err != nil { t.Fatal(err) } - log.Print("connected") _, err = cl.Announce(&AnnounceRequest{}) if err != nil { t.Fatal(err) diff --git a/util/types.go b/util/types.go index 68135f85..942b9d73 100644 --- a/util/types.go +++ b/util/types.go @@ -3,85 +3,77 @@ package util import ( "encoding" "encoding/binary" - "fmt" - "io" + "errors" "net" - "strconv" + + "github.com/bradfitz/iter" "github.com/anacrolix/torrent/bencode" ) -type CompactPeers []CompactPeer +// Concatenated 6-byte peer addresses. +type CompactIPv4Peers []CompactPeer + +var ( + // This allows bencode.Unmarshal to do better than a string or []byte. + _ bencode.Unmarshaler = &CompactIPv4Peers{} + _ encoding.BinaryMarshaler = CompactIPv4Peers{} +) -func (me *CompactPeers) UnmarshalBencode(bb []byte) (err error) { - var b []byte - err = bencode.Unmarshal(bb, &b) +// This allows bencode.Unmarshal to do better than a string or []byte. +func (me *CompactIPv4Peers) UnmarshalBencode(b []byte) (err error) { + var bb []byte + err = bencode.Unmarshal(b, &bb) if err != nil { return } - err = me.UnmarshalBinary(b) + *me, err = UnmarshalIPv4CompactPeers(bb) return } -func (me *CompactPeers) UnmarshalBinary(b []byte) (err error) { - for i := 0; i < len(b); i += 6 { - var p CompactPeer - err = p.UnmarshalBinary([]byte(b[i : i+6])) - if err != nil { - return - } - *me = append(*me, p) - } - return -} - -func (me CompactPeers) WriteBinary(w io.Writer) (err error) { - for _, cp := range me { - cp.Write(w) - if err != nil { - return - } +func (me CompactIPv4Peers) MarshalBinary() (ret []byte, err error) { + ret = make([]byte, len(me)*6) + for i, cp := range me { + copy(ret[6*i:], cp.IP.To4()) + binary.BigEndian.PutUint16(ret[6*i+4:], uint16(cp.Port)) } return } +// Represents peer address in either IPv6 or IPv4 form. type CompactPeer struct { IP net.IP - Port uint16 + Port int } -var _ encoding.BinaryUnmarshaler = &CompactPeer{} - -func (cp *CompactPeer) UnmarshalBinary(b []byte) (err error) { +func (me *CompactPeer) UnmarshalBinary(b []byte) error { switch len(b) { case 18: - cp.IP = make([]byte, 16) + me.IP = make([]byte, 16) case 6: - cp.IP = make([]byte, 4) + me.IP = make([]byte, 4) default: - err = fmt.Errorf("bad length: %d", len(b)) - return - } - if n := copy(cp.IP, b); n != len(cp.IP) { - panic(n) + return errors.New("bad length") } - b = b[len(cp.IP):] - if len(b) != 2 { - panic(len(b)) - } - cp.Port = binary.BigEndian.Uint16(b) - return + copy(me.IP, b) + b = b[len(me.IP):] + me.Port = int(binary.BigEndian.Uint16(b)) + return nil } -func (cp *CompactPeer) Write(w io.Writer) (err error) { - _, err = w.Write(cp.IP) - if err != nil { +func UnmarshalIPv4CompactPeers(b []byte) (ret []CompactPeer, err error) { + if len(b)%6 != 0 { + err = errors.New("bad length") return } - err = binary.Write(w, binary.BigEndian, cp.Port) + num := len(b) / 6 + ret = make([]CompactPeer, num) + for i := range iter.N(num) { + off := i * 6 + err = ret[i].UnmarshalBinary(b[off : off+6]) + if err != nil { + return + } + } return } - -func (cp *CompactPeer) String() string { - return net.JoinHostPort(cp.IP.String(), strconv.FormatUint(uint64(cp.Port), 10)) -}