"github.com/anacrolix/torrent/dht"
"github.com/anacrolix/torrent/internal/pieceordering"
"github.com/anacrolix/torrent/iplist"
- "github.com/anacrolix/torrent/logonce"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/tracker"
- . "github.com/anacrolix/torrent/util"
)
var (
return
}
-type peerExchangeMessage struct {
- Added CompactPeers `bencode:"added"`
- AddedFlags []byte `bencode:"added.f"`
- Dropped CompactPeers `bencode:"dropped"`
-}
-
// Extracts the port as an integer from an address string.
func addrPort(addr net.Addr) int {
return AddrPort(addr)
for trIndex, tr := range tier {
numTrackersTried++
err := cl.announceTorrentSingleTracker(tr, &req, t)
- if err != nil {
- logonce.Stderr.Printf("%s: error announcing to %s: %s", t, tr, err)
+ if err != nil && missinggo.CryHeard() {
+ log.Printf("%s: error announcing to %s: %s", t, tr, err)
continue
}
// Float the successful announce to the top of the tier. If
"github.com/willf/bloom"
"github.com/anacrolix/torrent/logonce"
- "github.com/anacrolix/torrent/util"
)
// Maintains state for an ongoing Announce operation. An Announce is started
// peers that a node has reported as being in the swarm for a queried info
// hash.
type PeersValues struct {
- Peers []util.CompactPeer // Peers given in get_peers response.
- NodeInfo // The node that gave the response.
+ Peers []Peer // Peers given in get_peers response.
+ NodeInfo // The node that gave the response.
}
// Stop the announce.
"math/rand"
"net"
"os"
+ "strconv"
"time"
"github.com/anacrolix/missinggo"
return
}
+type Peer struct {
+ IP net.IP
+ Port int
+}
+
+func (me *Peer) String() string {
+ return net.JoinHostPort(me.IP.String(), strconv.FormatInt(int64(me.Port), 10))
+}
+
// In a get_peers response, the addresses of torrent clients involved with the
// queried info-hash.
-func (m Msg) Values() (vs []util.CompactPeer) {
- r, ok := m["r"]
- if !ok {
- return
- }
- rd, ok := r.(map[string]interface{})
- if !ok {
- return
- }
- v, ok := rd["values"]
- if !ok {
+func (m Msg) Values() (vs []Peer) {
+ v := func() interface{} {
+ defer func() {
+ recover()
+ }()
+ return m["r"].(map[string]interface{})["values"]
+ }()
+ if v == nil {
return
}
vl, ok := v.([]interface{})
}
return
}
- vs = make([]util.CompactPeer, 0, len(vl))
+ vs = make([]Peer, 0, len(vl))
for _, i := range vl {
s, ok := i.(string)
if !ok {
panic(i)
}
+ // Because it's a list of strings, we can let the length of the string
+ // determine the IP version of the compact peer.
var cp util.CompactPeer
err := cp.UnmarshalBinary([]byte(s))
if err != nil {
log.Printf("error decoding values list element: %s", err)
continue
}
- vs = append(vs, cp)
+ vs = append(vs, Peer{cp.IP[:], int(cp.Port)})
}
return
}
--- /dev/null
+package torrent
+
+import "github.com/anacrolix/torrent/util"
+
+type peerExchangeMessage struct {
+ Added util.CompactIPv4Peers `bencode:"added"`
+ AddedFlags []byte `bencode:"added.f"`
+ Dropped util.CompactIPv4Peers `bencode:"dropped"`
+}
--- /dev/null
+package torrent
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/anacrolix/torrent/bencode"
+)
+
+func TestUnmarshalPex(t *testing.T) {
+ var pem peerExchangeMessage
+ err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &pem)
+ require.NoError(t, err)
+ require.EqualValues(t, 2, len(pem.Added))
+ require.EqualValues(t, 1286, pem.Added[0].Port)
+ require.EqualValues(t, 0x100*0xb+0xc, pem.Added[1].Port)
+}
err = fmt.Errorf("unsupported peers value type: %T", r.Peers)
return
}
- cp := make(util.CompactPeers, 0, len(s)/6)
- err = cp.UnmarshalBinary([]byte(s))
+ cp, err := util.UnmarshalIPv4CompactPeers([]byte(s))
if err != nil {
return
}
--- /dev/null
+package tracker
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "math/rand"
+ "net"
+
+ "github.com/anacrolix/torrent/util"
+)
+
+type torrent struct {
+ Leechers int32
+ Seeders int32
+ Peers util.CompactIPv4Peers
+}
+
+type server struct {
+ pc net.PacketConn
+ conns map[int64]struct{}
+ t map[[20]byte]torrent
+}
+
+func marshal(parts ...interface{}) (ret []byte, err error) {
+ var buf bytes.Buffer
+ for _, p := range parts {
+ err = binary.Write(&buf, binary.BigEndian, p)
+ if err != nil {
+ return
+ }
+ }
+ ret = buf.Bytes()
+ return
+}
+
+func (me *server) respond(addr net.Addr, rh ResponseHeader, parts ...interface{}) (err error) {
+ b, err := marshal(append([]interface{}{rh}, parts...)...)
+ if err != nil {
+ return
+ }
+ _, err = me.pc.WriteTo(b, addr)
+ return
+}
+
+func (me *server) newConn() (ret int64) {
+ ret = rand.Int63()
+ if me.conns == nil {
+ me.conns = make(map[int64]struct{})
+ }
+ me.conns[ret] = struct{}{}
+ return
+}
+
+func (me *server) serveOne() (err error) {
+ b := make([]byte, 0x10000)
+ n, addr, err := me.pc.ReadFrom(b)
+ if err != nil {
+ return
+ }
+ r := bytes.NewReader(b[:n])
+ var h RequestHeader
+ err = readBody(r, &h)
+ if err != nil {
+ return
+ }
+ switch h.Action {
+ case Connect:
+ if h.ConnectionId != connectRequestConnectionId {
+ return
+ }
+ connId := me.newConn()
+ err = me.respond(addr, ResponseHeader{
+ Connect,
+ h.TransactionId,
+ }, ConnectionResponse{
+ connId,
+ })
+ return
+ case Announce:
+ if _, ok := me.conns[h.ConnectionId]; !ok {
+ me.respond(addr, ResponseHeader{
+ TransactionId: h.TransactionId,
+ Action: Error,
+ }, []byte("not connected"))
+ return
+ }
+ var ar AnnounceRequest
+ err = readBody(r, &ar)
+ if err != nil {
+ return
+ }
+ t := me.t[ar.InfoHash]
+ b, err = t.Peers.MarshalBinary()
+ if err != nil {
+ panic(err)
+ }
+ err = me.respond(addr, ResponseHeader{
+ TransactionId: h.TransactionId,
+ Action: Announce,
+ }, AnnounceResponseHeader{
+ Interval: 900,
+ Leechers: t.Leechers,
+ Seeders: t.Seeders,
+ }, b)
+ return
+ default:
+ err = fmt.Errorf("unhandled action: %d", h.Action)
+ me.respond(addr, ResponseHeader{
+ TransactionId: h.TransactionId,
+ Action: Error,
+ }, []byte("unhandled action"))
+ return
+ }
+}
"net/url"
"time"
+ "github.com/anacrolix/missinggo"
+
"github.com/anacrolix/torrent/util"
)
Scrape
Error
+ connectRequestConnectionId = 0x41727101980
+
// BEP 41
optionTypeEndOfOptions = 0
optionTypeNOP = 1
res.Interval = h.Interval
res.Leechers = h.Leechers
res.Seeders = h.Seeders
- for {
- var p util.CompactPeer
- err = binary.Read(b, binary.BigEndian, &p)
- switch err {
- case nil:
- case io.EOF:
- err = nil
- fallthrough
- default:
- return
- }
+ cps, err := util.UnmarshalIPv4CompactPeers(b.Bytes())
+ if err != nil {
+ return
+ }
+ for _, cp := range cps {
res.Peers = append(res.Peers, Peer{
- IP: p.IP[:],
- Port: int(p.Port),
+ IP: cp.IP[:],
+ Port: int(cp.Port),
})
}
+ return
}
// body is the binary serializable request body. trailer is optional data
// following it, such as for BEP 41.
func (c *udpClient) write(h *RequestHeader, body interface{}, trailer []byte) (err error) {
- buf := &bytes.Buffer{}
- err = binary.Write(buf, binary.BigEndian, h)
+ var buf bytes.Buffer
+ err = binary.Write(&buf, binary.BigEndian, h)
if err != nil {
panic(err)
}
if body != nil {
- err = binary.Write(buf, binary.BigEndian, body)
+ err = binary.Write(&buf, binary.BigEndian, body)
if err != nil {
panic(err)
}
// args is the binary serializable request body. trailer is optional data
// following it, such as for BEP 41.
-func (c *udpClient) request(action Action, args interface{}, options []byte) (responseBody *bytes.Reader, err error) {
+func (c *udpClient) request(action Action, args interface{}, options []byte) (responseBody *bytes.Buffer, err error) {
tid := newTransactionId()
err = c.write(&RequestHeader{
ConnectionId: c.connectionId,
if h.Action == Error {
err = errors.New(buf.String())
}
- responseBody = bytes.NewReader(buf.Bytes())
+ responseBody = buf
return
}
}
-func readBody(r *bytes.Reader, data ...interface{}) (err error) {
+func readBody(r io.Reader, data ...interface{}) (err error) {
for _, datum := range data {
err = binary.Read(r, binary.BigEndian, datum)
if err != nil {
if c.connected() {
return nil
}
- c.connectionId = 0x41727101980
+ c.connectionId = connectRequestConnectionId
if c.socket == nil {
- c.socket, err = net.Dial("udp", c.url.Host)
+ hmp := missinggo.SplitHostPort(c.url.Host)
+ if hmp.NoPort {
+ hmp.NoPort = false
+ hmp.Port = 80
+ }
+ c.socket, err = net.Dial("udp", hmp.String())
if err != nil {
return
}
"bytes"
"crypto/rand"
"encoding/binary"
+ "fmt"
"io"
"io/ioutil"
- "log"
"net"
"net/url"
+ "strings"
"sync"
"testing"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/anacrolix/torrent/util"
}
func TestMarshalAnnounceResponse(t *testing.T) {
- w := bytes.Buffer{}
- peers := util.CompactPeers{{[]byte{127, 0, 0, 1}, 2}, {[]byte{255, 0, 0, 3}, 4}}
- err := peers.WriteBinary(&w)
- if err != nil {
- t.Fatalf("error writing udp announce response addrs: %s", err)
- }
- if w.String() != "\x7f\x00\x00\x01\x00\x02\xff\x00\x00\x03\x00\x04" {
- t.FailNow()
- }
- if binary.Size(AnnounceResponseHeader{}) != 12 {
- t.FailNow()
+ peers := util.CompactIPv4Peers{
+ {[]byte{127, 0, 0, 1}, 2},
+ {[]byte{255, 0, 0, 3}, 4},
}
+ b, err := peers.MarshalBinary()
+ require.NoError(t, err)
+ require.EqualValues(t,
+ "\x7f\x00\x00\x01\x00\x02\xff\x00\x00\x03\x00\x04",
+ b)
+ require.EqualValues(t, 12, binary.Size(AnnounceResponseHeader{}))
}
// Failure to write an entire packet to UDP is expected to given an error.
}
}
+func TestAnnounceLocalhost(t *testing.T) {
+ srv := server{
+ t: map[[20]byte]torrent{
+ [20]byte{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1}: {
+ Seeders: 1,
+ Leechers: 2,
+ Peers: []util.CompactPeer{
+ {[]byte{1, 2, 3, 4}, 5},
+ {[]byte{6, 7, 8, 9}, 10},
+ },
+ },
+ },
+ }
+ var err error
+ srv.pc, err = net.ListenPacket("udp", ":0")
+ require.NoError(t, err)
+ defer srv.pc.Close()
+ tr, err := New(fmt.Sprintf("udp://%s/announce", srv.pc.LocalAddr().String()))
+ require.NoError(t, err)
+ go func() {
+ require.NoError(t, srv.serveOne())
+ }()
+ err = tr.Connect()
+ require.NoError(t, err)
+ req := AnnounceRequest{
+ NumWant: -1,
+ Event: Started,
+ }
+ rand.Read(req.PeerId[:])
+ copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1})
+ go func() {
+ require.NoError(t, srv.serveOne())
+ }()
+ ar, err := tr.Announce(&req)
+ require.NoError(t, err)
+ assert.EqualValues(t, 1, ar.Seeders)
+ assert.EqualValues(t, 2, len(ar.Peers))
+}
+
func TestUDPTracker(t *testing.T) {
+ tr, err := New("udp://tracker.openbittorrent.com:80/announce")
+ require.NoError(t, err)
if testing.Short() {
t.SkipNow()
}
- tr, err := New("udp://tracker.openbittorrent.com:80/announce")
- if err != nil {
- t.Skip(err)
- }
if err := tr.Connect(); err != nil {
- t.Skip(err)
+ if strings.Contains(err.Error(), "no such host") {
+ t.Skip(err)
+ }
+ if strings.Contains(err.Error(), "i/o timeout") {
+ t.Skip(err)
+ }
+ t.Fatal(err)
}
req := AnnounceRequest{
NumWant: -1,
- Event: Started,
+ // Event: Started,
}
rand.Read(req.PeerId[:])
copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1})
- _, err = tr.Announce(&req)
- if err != nil {
- t.Skip(err)
+ ar, err := tr.Announce(&req)
+ if ne, ok := err.(net.Error); ok {
+ if ne.Timeout() {
+ t.Skip(err)
+ }
}
+ require.NoError(t, err)
+ t.Log(ar)
}
-// TODO: Create a fake UDP tracker to make these requests to.
-func TestAnnounceRandomInfoHash(t *testing.T) {
+func TestAnnounceRandomInfoHashThirdParty(t *testing.T) {
if testing.Short() {
+ // This test involves contacting third party servers that may have
+ // unpreditable results.
t.SkipNow()
}
req := AnnounceRequest{
rand.Read(req.PeerId[:])
rand.Read(req.InfoHash[:])
wg := sync.WaitGroup{}
+ success := make(chan bool)
+ fail := make(chan struct{})
for _, url := range []string{
"udp://tracker.openbittorrent.com:80/announce",
"udp://tracker.publicbt.com:80",
"udp://tracker.istole.it:6969",
"udp://tracker.ccc.de:80",
"udp://tracker.open.demonii.com:1337",
+ "udp://open.demonii.com:1337",
+ "udp://exodus.desync.com:6969",
} {
+ wg.Add(1)
go func(url string) {
defer wg.Done()
tr, err := New(url)
return
}
if resp.Leechers != 0 || resp.Seeders != 0 || len(resp.Peers) != 0 {
+ // The info hash we generated was random in 2^160 space. If we
+ // get a hit, something is weird.
t.Fatal(resp)
}
+ t.Logf("announced to %s", url)
+ // TODO: Can probably get stuck here, but it's just a throwaway
+ // test.
+ success <- true
}(url)
- wg.Add(1)
}
- wg.Wait()
+ go func() {
+ wg.Wait()
+ close(fail)
+ }()
+ // Bail as quickly as we can.
+ select {
+ case <-fail:
+ t.FailNow()
+ case <-success:
+ }
}
// Check that URLPath option is done correctly.
if err != nil {
t.Fatal(err)
}
- log.Print("connected")
_, err = cl.Announce(&AnnounceRequest{})
if err != nil {
t.Fatal(err)
import (
"encoding"
"encoding/binary"
- "fmt"
- "io"
+ "errors"
"net"
- "strconv"
+
+ "github.com/bradfitz/iter"
"github.com/anacrolix/torrent/bencode"
)
-type CompactPeers []CompactPeer
+// Concatenated 6-byte peer addresses.
+type CompactIPv4Peers []CompactPeer
+
+var (
+ // This allows bencode.Unmarshal to do better than a string or []byte.
+ _ bencode.Unmarshaler = &CompactIPv4Peers{}
+ _ encoding.BinaryMarshaler = CompactIPv4Peers{}
+)
-func (me *CompactPeers) UnmarshalBencode(bb []byte) (err error) {
- var b []byte
- err = bencode.Unmarshal(bb, &b)
+// This allows bencode.Unmarshal to do better than a string or []byte.
+func (me *CompactIPv4Peers) UnmarshalBencode(b []byte) (err error) {
+ var bb []byte
+ err = bencode.Unmarshal(b, &bb)
if err != nil {
return
}
- err = me.UnmarshalBinary(b)
+ *me, err = UnmarshalIPv4CompactPeers(bb)
return
}
-func (me *CompactPeers) UnmarshalBinary(b []byte) (err error) {
- for i := 0; i < len(b); i += 6 {
- var p CompactPeer
- err = p.UnmarshalBinary([]byte(b[i : i+6]))
- if err != nil {
- return
- }
- *me = append(*me, p)
- }
- return
-}
-
-func (me CompactPeers) WriteBinary(w io.Writer) (err error) {
- for _, cp := range me {
- cp.Write(w)
- if err != nil {
- return
- }
+func (me CompactIPv4Peers) MarshalBinary() (ret []byte, err error) {
+ ret = make([]byte, len(me)*6)
+ for i, cp := range me {
+ copy(ret[6*i:], cp.IP.To4())
+ binary.BigEndian.PutUint16(ret[6*i+4:], uint16(cp.Port))
}
return
}
+// Represents peer address in either IPv6 or IPv4 form.
type CompactPeer struct {
IP net.IP
- Port uint16
+ Port int
}
-var _ encoding.BinaryUnmarshaler = &CompactPeer{}
-
-func (cp *CompactPeer) UnmarshalBinary(b []byte) (err error) {
+func (me *CompactPeer) UnmarshalBinary(b []byte) error {
switch len(b) {
case 18:
- cp.IP = make([]byte, 16)
+ me.IP = make([]byte, 16)
case 6:
- cp.IP = make([]byte, 4)
+ me.IP = make([]byte, 4)
default:
- err = fmt.Errorf("bad length: %d", len(b))
- return
- }
- if n := copy(cp.IP, b); n != len(cp.IP) {
- panic(n)
+ return errors.New("bad length")
}
- b = b[len(cp.IP):]
- if len(b) != 2 {
- panic(len(b))
- }
- cp.Port = binary.BigEndian.Uint16(b)
- return
+ copy(me.IP, b)
+ b = b[len(me.IP):]
+ me.Port = int(binary.BigEndian.Uint16(b))
+ return nil
}
-func (cp *CompactPeer) Write(w io.Writer) (err error) {
- _, err = w.Write(cp.IP)
- if err != nil {
+func UnmarshalIPv4CompactPeers(b []byte) (ret []CompactPeer, err error) {
+ if len(b)%6 != 0 {
+ err = errors.New("bad length")
return
}
- err = binary.Write(w, binary.BigEndian, cp.Port)
+ num := len(b) / 6
+ ret = make([]CompactPeer, num)
+ for i := range iter.N(num) {
+ off := i * 6
+ err = ret[i].UnmarshalBinary(b[off : off+6])
+ if err != nil {
+ return
+ }
+ }
return
}
-
-func (cp *CompactPeer) String() string {
- return net.JoinHostPort(cp.IP.String(), strconv.FormatUint(uint64(cp.Port), 10))
-}