package torrent
-import "github.com/anacrolix/dht/krpc"
+import (
+ "github.com/anacrolix/dht/krpc"
+ "github.com/anacrolix/torrent/tracker"
+)
type Peers []Peer
*me = append(*me, p)
}
}
+
+func (ret Peers) FromTracker(ps []tracker.Peer) Peers {
+ for _, p := range ps {
+ _p := Peer{
+ IP: p.IP,
+ Port: p.Port,
+ Source: peerSourceTracker,
+ }
+ copy(_p.Id[:], p.ID)
+ ret = append(ret, _p)
+ }
+ return ret
+}
"bytes"
"context"
"crypto/rand"
+ "encoding/binary"
"errors"
"expvar"
"fmt"
"io"
"net"
- "net/url"
"strconv"
"strings"
"time"
fmt.Fprintln(w, "Not listening!")
}
fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
+ fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
if dht := cl.DHT(); dht != nil {
dhtStats := dht.Stats()
cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
}
+func (cl *Client) announceKey() int32 {
+ return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
+}
+
// Creates a new client.
func NewClient(cfg *Config) (cl *Client, err error) {
if cfg == nil {
return cl.ipBlockList.Lookup(ip)
}
+func (cl *Client) ipIsBlocked(ip net.IP) bool {
+ _, blocked := cl.ipBlockRange(ip)
+ return blocked
+}
+
func (cl *Client) waitAccept() {
for {
for _, t := range cl.torrents {
return
}
-func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
- _url, err := url.Parse(announceURL)
- if err != nil {
- return
- }
- hmp := missinggo.SplitHostMaybePort(_url.Host)
- if hmp.Err != nil {
- err = hmp.Err
- return
- }
- addr, err := net.ResolveIPAddr("ip", hmp.Host)
- if err != nil {
- return
- }
- if addr.IP == nil {
- panic(hmp.Host)
- }
- cl.mu.RLock()
- _, blocked = cl.ipBlockRange(addr.IP)
- cl.mu.RUnlock()
- host = _url.Host
- hmp.Host = addr.String()
- _url.Host = hmp.String()
- urlToUse = _url.String()
- return
-}
-
func (cl *Client) allTorrentsCompleted() bool {
for _, t := range cl.torrents {
if !t.haveInfo() {
})
}
-func TestPrepareTrackerAnnounce(t *testing.T) {
- cl := &Client{}
- blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
- require.NoError(t, err)
- assert.False(t, blocked)
- assert.EqualValues(t, "localhost:1234", host)
- assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
-}
-
// Check that when the listen port is 0, all the protocols listened on have
// the same port, and it isn't zero.
func TestClientDynamicListenPortAllProtocols(t *testing.T) {
"math"
"math/rand"
"net"
+ "net/url"
"os"
"strconv"
"sync"
tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
fmt.Fprintf(tw, " URL\tNext announce\tLast announce\n")
for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
- return l.url < r.url
+ return l.u.String() < r.u.String()
}).([]*trackerScraper) {
fmt.Fprintf(tw, " %s\n", ta.statusLine())
}
return true
}
-func (t *Torrent) startScrapingTracker(url string) {
- if url == "" {
+func (t *Torrent) startScrapingTracker(_url string) {
+ if _url == "" {
return
}
- if _, ok := t.trackerAnnouncers[url]; ok {
+ u, _ := url.Parse(_url)
+ if u.Scheme == "udp" {
+ u.Scheme = "udp4"
+ t.startScrapingTracker(u.String())
+ u.Scheme = "udp6"
+ t.startScrapingTracker(u.String())
+ return
+ }
+ if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) {
+ return
+ }
+ if u.Scheme == "udp6" && t.cl.config.DisableIPv6 {
+ return
+ }
+ if _, ok := t.trackerAnnouncers[_url]; ok {
return
}
newAnnouncer := &trackerScraper{
- url: url,
- t: t,
+ u: *u,
+ t: t,
}
if t.trackerAnnouncers == nil {
t.trackerAnnouncers = make(map[string]*trackerScraper)
}
- t.trackerAnnouncers[url] = newAnnouncer
+ t.trackerAnnouncers[_url] = newAnnouncer
go newAnnouncer.Run()
}
PeerId: t.cl.peerID,
InfoHash: t.infoHash,
Left: t.bytesLeftAnnounce(),
+ Key: t.cl.announceKey(),
}
}
}
}
-func setAnnounceParams(_url *url.URL, ar *AnnounceRequest) {
+func setAnnounceParams(_url *url.URL, ar *AnnounceRequest, opts Announce) {
q := _url.Query()
q.Set("info_hash", string(ar.InfoHash[:]))
// According to https://wiki.vuze.com/w/Message_Stream_Encryption. TODO:
// Take EncryptionPolicy or something like it as a parameter.
q.Set("supportcrypto", "1")
-
+ if opts.ClientIp4.IP != nil {
+ q.Set("ipv4", opts.ClientIp4.String())
+ }
+ if opts.ClientIp6.IP != nil {
+ q.Set("ipv6", opts.ClientIp6.String())
+ }
_url.RawQuery = q.Encode()
}
-func announceHTTP(cl *http.Client, userAgent string, ar *AnnounceRequest, _url *url.URL, host string) (ret AnnounceResponse, err error) {
+func announceHTTP(opt Announce, _url *url.URL) (ret AnnounceResponse, err error) {
_url = httptoo.CopyURL(_url)
- setAnnounceParams(_url, ar)
+ setAnnounceParams(_url, &opt.Request, opt)
req, err := http.NewRequest("GET", _url.String(), nil)
- req.Header.Set("User-Agent", userAgent)
- req.Host = host
- resp, err := cl.Do(req)
+ req.Header.Set("User-Agent", opt.UserAgent)
+ req.Host = opt.HostHeader
+ resp, err := opt.HttpClient.Do(req)
if err != nil {
return
}
import (
"net"
+
+ "github.com/anacrolix/dht/krpc"
)
type Peer struct {
p.ID = []byte(d["peer id"].(string))
p.Port = int(d["port"].(int64))
}
+
+func (p Peer) FromNodeAddr(na krpc.NodeAddr) Peer {
+ p.IP = na.IP
+ p.Port = na.Port
+ return p
+}
import (
"bytes"
+ "encoding"
"encoding/binary"
"fmt"
"math/rand"
"net"
"github.com/anacrolix/dht/krpc"
+ "github.com/anacrolix/missinggo"
)
type torrent struct {
Leechers int32
Seeders int32
- Peers krpc.CompactIPv4NodeAddrs
+ Peers []krpc.NodeAddr
}
type server struct {
return
}
t := s.t[ar.InfoHash]
- b, err = t.Peers.MarshalBinary()
+ bm := func() encoding.BinaryMarshaler {
+ ip := missinggo.AddrIP(addr)
+ if ip.To4() != nil {
+ return krpc.CompactIPv4NodeAddrs(t.Peers)
+ }
+ return krpc.CompactIPv6NodeAddrs(t.Peers)
+ }()
+ b, err = bm.MarshalBinary()
if err != nil {
panic(err)
}
"errors"
"net/http"
"net/url"
+
+ "github.com/anacrolix/dht/krpc"
)
// Marshalled as binary by the UDP client, so be careful making changes.
ErrBadScheme = errors.New("unknown scheme")
)
-// TODO: Just split udp/http announcing completely, to support various different options they have.
-
-func Announce(cl *http.Client, userAgent string, urlStr string, req *AnnounceRequest) (res AnnounceResponse, err error) {
- return AnnounceHost(cl, userAgent, urlStr, req, "")
+type Announce struct {
+ TrackerUrl string
+ Request AnnounceRequest
+ HostHeader string
+ UserAgent string
+ HttpClient *http.Client
+ UdpNetwork string
+ ClientIp4 krpc.NodeAddr
+ ClientIp6 krpc.NodeAddr
}
-func AnnounceHost(cl *http.Client, userAgent string, urlStr string, req *AnnounceRequest, host string) (res AnnounceResponse, err error) {
- _url, err := url.Parse(urlStr)
+// In an FP language with currying, what order what you put these params?
+
+func (me Announce) Do() (res AnnounceResponse, err error) {
+ _url, err := url.Parse(me.TrackerUrl)
if err != nil {
return
}
switch _url.Scheme {
case "http", "https":
- return announceHTTP(cl, userAgent, req, _url, host)
- case "udp":
- return announceUDP(req, _url)
+ return announceHTTP(me, _url)
+ case "udp", "udp4", "udp6":
+ return announceUDP(me, _url)
default:
err = ErrBadScheme
return
"net/http"
"testing"
"time"
+
+ "github.com/stretchr/testify/require"
)
var defaultClient = &http.Client{
func TestUnsupportedTrackerScheme(t *testing.T) {
t.Parallel()
- _, err := Announce(defaultClient, defaultHTTPUserAgent, "lol://tracker.openbittorrent.com:80/announce", nil)
- if err != ErrBadScheme {
- t.Fatal(err)
- }
+ _, err := Announce{TrackerUrl: "lol://tracker.openbittorrent.com:80/announce"}.Do()
+ require.Equal(t, ErrBadScheme, err)
}
import (
"bytes"
+ "encoding"
"encoding/binary"
"errors"
"fmt"
connectionId int64
socket net.Conn
url url.URL
+ a *Announce
}
func (c *udpAnnounce) Close() error {
return nil
}
+func (c *udpAnnounce) ipv6() bool {
+ if c.a.UdpNetwork == "udp6" {
+ return true
+ }
+ rip := missinggo.AddrIP(c.socket.RemoteAddr())
+ return rip.To16() != nil && rip.To4() == nil
+}
+
func (c *udpAnnounce) Do(req *AnnounceRequest) (res AnnounceResponse, err error) {
err = c.connect()
if err != nil {
res.Interval = h.Interval
res.Leechers = h.Leechers
res.Seeders = h.Seeders
- var cps krpc.CompactIPv4NodeAddrs
- err = cps.UnmarshalBinary(b.Bytes())
+ nas := func() interface {
+ encoding.BinaryUnmarshaler
+ NodeAddrs() []krpc.NodeAddr
+ } {
+ if c.ipv6() {
+ return &krpc.CompactIPv6NodeAddrs{}
+ } else {
+ return &krpc.CompactIPv4NodeAddrs{}
+ }
+ }()
+ err = nas.UnmarshalBinary(b.Bytes())
if err != nil {
return
}
- for _, cp := range cps {
- res.Peers = append(res.Peers, Peer{
- IP: cp.IP[:],
- Port: int(cp.Port),
- })
+ for _, cp := range nas.NodeAddrs() {
+ res.Peers = append(res.Peers, Peer{}.FromNodeAddr(cp))
}
return
}
return !c.connectionIdReceived.IsZero() && time.Now().Before(c.connectionIdReceived.Add(time.Minute))
}
+func (c *udpAnnounce) dialNetwork() string {
+ if c.a.UdpNetwork != "" {
+ return c.a.UdpNetwork
+ }
+ return "udp"
+}
+
func (c *udpAnnounce) connect() (err error) {
if c.connected() {
return nil
hmp.NoPort = false
hmp.Port = 80
}
- c.socket, err = net.Dial("udp", hmp.String())
+ c.socket, err = net.Dial(c.dialNetwork(), hmp.String())
if err != nil {
return
}
// TODO: Split on IPv6, as BEP 15 says response peer decoding depends on
// network in use.
-func announceUDP(ar *AnnounceRequest, _url *url.URL) (AnnounceResponse, error) {
+func announceUDP(opt Announce, _url *url.URL) (AnnounceResponse, error) {
ua := udpAnnounce{
url: *_url,
+ a: &opt,
}
defer ua.Close()
- return ua.Do(ar)
+ return ua.Do(&opt.Request)
}
go func() {
require.NoError(t, srv.serveOne())
}()
- ar, err := Announce(defaultClient, defaultHTTPUserAgent, fmt.Sprintf("udp://%s/announce", srv.pc.LocalAddr().String()), &req)
+ ar, err := Announce{
+ TrackerUrl: fmt.Sprintf("udp://%s/announce", srv.pc.LocalAddr().String()),
+ Request: req,
+ }.Do()
require.NoError(t, err)
assert.EqualValues(t, 1, ar.Seeders)
assert.EqualValues(t, 2, len(ar.Peers))
}
rand.Read(req.PeerId[:])
copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1})
- ar, err := Announce(defaultClient, defaultHTTPUserAgent, trackers[0], &req)
+ ar, err := Announce{
+ TrackerUrl: trackers[0],
+ Request: req,
+ }.Do()
// Skip any net errors as we don't control the server.
if _, ok := err.(net.Error); ok {
t.Skip(err)
wg.Add(1)
go func(url string) {
defer wg.Done()
- resp, err := Announce(defaultClient, defaultHTTPUserAgent, url, &req)
+ resp, err := Announce{
+ TrackerUrl: url,
+ Request: req,
+ }.Do()
if err != nil {
t.Logf("error announcing to %s: %s", url, err)
return
}
defer conn.Close()
go func() {
- _, err := Announce(defaultClient, defaultHTTPUserAgent, (&url.URL{
- Scheme: "udp",
- Host: conn.LocalAddr().String(),
- Path: "/announce",
- }).String(), &AnnounceRequest{})
+ _, err := Announce{
+ TrackerUrl: (&url.URL{
+ Scheme: "udp",
+ Host: conn.LocalAddr().String(),
+ Path: "/announce",
+ }).String(),
+ }.Do()
if err != nil {
defer conn.Close()
}
"bytes"
"errors"
"fmt"
+ "net"
+ "net/url"
"time"
"github.com/anacrolix/missinggo"
// Announces a torrent to a tracker at regular intervals, when peers are
// required.
type trackerScraper struct {
- url string
+ u url.URL
// Causes the trackerScraper to stop running.
stop missinggo.Event
t *Torrent
func (ts *trackerScraper) statusLine() string {
var w bytes.Buffer
fmt.Fprintf(&w, "%q\t%s\t%s",
- ts.url,
+ ts.u.String(),
func() string {
na := time.Until(ts.lastAnnounce.Completed.Add(ts.lastAnnounce.Interval))
if na > 0 {
return "never"
}
return fmt.Sprintf("%d peers", ts.lastAnnounce.NumPeers)
- }())
+ }(),
+ )
return w.String()
}
Completed time.Time
}
-func trackerToTorrentPeers(ps []tracker.Peer) (ret []Peer) {
- ret = make([]Peer, 0, len(ps))
- for _, p := range ps {
- ret = append(ret, Peer{
- IP: p.IP,
- Port: p.Port,
- Source: peerSourceTracker,
- })
+func (me *trackerScraper) getIp() (ip net.IP, err error) {
+ ips, err := net.LookupIP(me.u.Hostname())
+ if err != nil {
+ return
+ }
+ if len(ips) == 0 {
+ err = errors.New("no ips")
+ return
+ }
+ for _, ip = range ips {
+ if me.t.cl.ipIsBlocked(ip) {
+ continue
+ }
+ switch me.u.Scheme {
+ case "udp4":
+ if ip.To4() == nil {
+ continue
+ }
+ case "udp6":
+ if ip.To4() != nil {
+ continue
+ }
+ }
+ return
}
+ err = errors.New("no acceptable ips")
return
}
+func (me *trackerScraper) trackerUrl(ip net.IP) string {
+ u := me.u
+ if u.Port() != "" {
+ u.Host = net.JoinHostPort(ip.String(), u.Port())
+ }
+ return u.String()
+}
+
// Return how long to wait before trying again. For most errors, we return 5
// minutes, a relatively quick turn around for DNS changes.
func (me *trackerScraper) announce() (ret trackerAnnounceResult) {
ret.Completed = time.Now()
}()
ret.Interval = 5 * time.Minute
- blocked, urlToUse, host, err := me.t.cl.prepareTrackerAnnounceUnlocked(me.url)
+ ip, err := me.getIp()
if err != nil {
- ret.Err = err
- return
- }
- if blocked {
- ret.Err = errors.New("blocked by IP")
+ ret.Err = fmt.Errorf("error getting ip: %s", err)
return
}
me.t.cl.mu.Lock()
req := me.t.announceRequest()
me.t.cl.mu.Unlock()
- res, err := tracker.AnnounceHost(me.t.cl.config.HTTP, me.t.cl.config.HTTPUserAgent, urlToUse, &req, host)
+ res, err := tracker.Announce{
+ HttpClient: me.t.cl.config.HTTP,
+ UserAgent: me.t.cl.config.HTTPUserAgent,
+ TrackerUrl: me.trackerUrl(ip),
+ Request: req,
+ HostHeader: me.u.Host,
+ }.Do()
if err != nil {
- ret.Err = err
+ ret.Err = fmt.Errorf("error announcing: %s", err)
return
}
- me.t.AddPeers(trackerToTorrentPeers(res.Peers))
+ me.t.AddPeers(Peers(nil).FromTracker(res.Peers))
ret.NumPeers = len(res.Peers)
ret.Interval = time.Duration(res.Interval) * time.Second
return