client.go | 20 ++++++++++++-------- client_test.go | 20 +++++++++++--------- cmd/tracker-announce/main.go | 2 +- config.go | 72 +++++++++++++++++++++++++++++++++++++++++++++++++++++ global.go | 23 ----------------------- torrent.go | 6 +++--- tracker/http.go | 18 ++---------------- tracker/tracker.go | 9 +++++---- tracker/tracker_test.go | 17 ++++++++++++++++- tracker/udp_test.go | 8 ++++---- tracker_scraper.go | 2 +- diff --git a/client.go b/client.go index 6a8b6216413f9f96e0f2b701785b6af2de9a973b..2a7f9333b7935c5120d95c02c7a7bea200ff66cf 100644 --- a/client.go +++ b/client.go @@ -230,6 +230,10 @@ StartingNodes: dht.GlobalBootstrapAddrs, }, } } + if cfg == nil { + cfg = &Config{} + } + cfg.setDefaults() defer func() { if err != nil { @@ -237,7 +241,7 @@ cl = nil } }() cl = &Client{ - halfOpenLimit: defaultHalfOpenConnsPerTorrent, + halfOpenLimit: cfg.HalfOpenConnsPerTorrent, config: *cfg, dopplegangerAddrs: make(map[string]struct{}), torrents: make(map[metainfo.Hash]*Torrent), @@ -278,7 +282,7 @@ if cfg.PeerID != "" { missinggo.CopyExact(&cl.peerID, cfg.PeerID) } else { - o := copy(cl.peerID[:], bep20) + o := copy(cl.peerID[:], cfg.Bep20) _, err = rand.Read(cl.peerID[o:]) if err != nil { panic("error generating peer id") @@ -484,7 +488,7 @@ unsuccessfulDials.Add(1) } } -func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) { +func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) { ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit) if ret < minDialTimeout { ret = minDialTimeout @@ -602,7 +606,7 @@ func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader, utp bool) (c *connection, err error) { c = cl.newConnection(nc) c.headerEncrypted = encryptHeader c.uTP = utp - ctx, cancel := context.WithTimeout(ctx, handshakesTimeout) + ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout) defer cancel() dl, ok := ctx.Deadline() if !ok { @@ -806,7 +810,7 @@ cl.runHandshookConn(c, t, true) } func (cl *Client) runReceivedConn(c *connection) { - err := c.conn.SetDeadline(time.Now().Add(handshakesTimeout)) + err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout)) if err != nil { panic(err) } @@ -863,7 +867,7 @@ ret["ut_pex"] = pexExtendedId } return }(), - "v": extendedHandshakeClientVersion, + "v": cl.config.ExtendedHandshakeClientVersion, // No upload queue is implemented yet. "reqq": 64, } @@ -1021,13 +1025,13 @@ t = &Torrent{ cl: cl, infoHash: ih, peers: make(map[peersKey]Peer), - conns: make(map[*connection]struct{}, 2*defaultEstablishedConnsPerTorrent), + conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent), halfOpen: make(map[string]Peer), pieceStateChanges: pubsub.NewPubSub(), storageOpener: storageClient, - maxEstablishedConns: defaultEstablishedConnsPerTorrent, + maxEstablishedConns: cl.config.EstablishedConnsPerTorrent, networkingEnabled: true, requestStrategy: 2, diff --git a/client_test.go b/client_test.go index f589186c89dacc8e8ffc9b1426986067b598ae13..e06601f416011baa8beecc5d8bb102751bb6f9a6 100644 --- a/client_test.go +++ b/client_test.go @@ -132,23 +132,25 @@ } } func TestReducedDialTimeout(t *testing.T) { + cfg := &Config{} + cfg.setDefaults() for _, _case := range []struct { Max time.Duration HalfOpenLimit int PendingPeers int ExpectedReduced time.Duration }{ - {nominalDialTimeout, 40, 0, nominalDialTimeout}, - {nominalDialTimeout, 40, 1, nominalDialTimeout}, - {nominalDialTimeout, 40, 39, nominalDialTimeout}, - {nominalDialTimeout, 40, 40, nominalDialTimeout / 2}, - {nominalDialTimeout, 40, 80, nominalDialTimeout / 3}, - {nominalDialTimeout, 40, 4000, nominalDialTimeout / 101}, + {cfg.NominalDialTimeout, 40, 0, cfg.NominalDialTimeout}, + {cfg.NominalDialTimeout, 40, 1, cfg.NominalDialTimeout}, + {cfg.NominalDialTimeout, 40, 39, cfg.NominalDialTimeout}, + {cfg.NominalDialTimeout, 40, 40, cfg.NominalDialTimeout / 2}, + {cfg.NominalDialTimeout, 40, 80, cfg.NominalDialTimeout / 3}, + {cfg.NominalDialTimeout, 40, 4000, cfg.NominalDialTimeout / 101}, } { - reduced := reducedDialTimeout(_case.Max, _case.HalfOpenLimit, _case.PendingPeers) + reduced := reducedDialTimeout(cfg.MinDialTimeout, _case.Max, _case.HalfOpenLimit, _case.PendingPeers) expected := _case.ExpectedReduced - if expected < minDialTimeout { - expected = minDialTimeout + if expected < cfg.MinDialTimeout { + expected = cfg.MinDialTimeout } if reduced != expected { t.Fatalf("expected %s, got %s", _case.ExpectedReduced, reduced) diff --git a/cmd/tracker-announce/main.go b/cmd/tracker-announce/main.go index 1e2f86a8f674bf3ec40c482cb8ee61cb7ee98c49..9c99a3b1a0a3f441f25e7ead23d127e4993aa60a 100644 --- a/cmd/tracker-announce/main.go +++ b/cmd/tracker-announce/main.go @@ -39,7 +39,7 @@ } ar.InfoHash = ts.InfoHash for _, tier := range ts.Trackers { for _, tURI := range tier { - resp, err := tracker.Announce(tURI, &ar) + resp, err := tracker.Announce(torrent.DefaultHTTPClient, tURI, &ar) if err != nil { log.Print(err) continue diff --git a/config.go b/config.go index e6b2cc4c4e2b65c9d3501e6e96419bc4e76726bf..aec08d7cafddf551b2f824aa64065711b180e4fb 100644 --- a/config.go +++ b/config.go @@ -6,7 +6,23 @@ "golang.org/x/time/rate" "github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/storage" + + "crypto/tls" + "net" + "net/http" + "time" ) + +var DefaultHTTPClient = &http.Client{ + Timeout: time.Second * 15, + Transport: &http.Transport{ + Dial: (&net.Dialer{ + Timeout: 15 * time.Second, + }).Dial, + TLSHandshakeTimeout: 15 * time.Second, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, +} // Override Client defaults. type Config struct { @@ -59,6 +75,62 @@ IPBlocklist iplist.Ranger DisableIPv6 bool `long:"disable-ipv6"` // Perform logging and any other behaviour that will help debug. Debug bool `help:"enable debugging"` + + // HTTP client used to query the tracker endpoint. Default is DefaultHTTPClient + HTTP *http.Client + // Updated occasionally to when there's been some changes to client + // behaviour in case other clients are assuming anything of us. See also + // `bep20`. + ExtendedHandshakeClientVersion string // default "go.torrent dev 20150624" + // Peer ID client identifier prefix. We'll update this occasionally to + // reflect changes to client behaviour that other clients may depend on. + // Also see `extendedHandshakeClientVersion`. + Bep20 string // default "-GT0001-" + + NominalDialTimeout time.Duration // default time.Second * 30 + MinDialTimeout time.Duration // default 5 * time.Second + EstablishedConnsPerTorrent int // default 80 + HalfOpenConnsPerTorrent int // default 80 + TorrentPeersHighWater int // default 200 + TorrentPeersLowWater int // default 50 + + // Limit how long handshake can take. This is to reduce the lingering + // impact of a few bad apples. 4s loses 1% of successful handshakes that + // are obtained with 60s timeout, and 5% of unsuccessful handshakes. + HandshakesTimeout time.Duration // default 20 * time.Second +} + +func (cfg *Config) setDefaults() { + if cfg.HTTP == nil { + cfg.HTTP = DefaultHTTPClient + } + if cfg.ExtendedHandshakeClientVersion == "" { + cfg.ExtendedHandshakeClientVersion = "go.torrent dev 20150624" + } + if cfg.Bep20 == "" { + cfg.Bep20 = "-GT0001-" + } + if cfg.NominalDialTimeout == 0 { + cfg.NominalDialTimeout = 30 * time.Second + } + if cfg.MinDialTimeout == 0 { + cfg.MinDialTimeout = 5 * time.Second + } + if cfg.EstablishedConnsPerTorrent == 0 { + cfg.EstablishedConnsPerTorrent = 80 + } + if cfg.HalfOpenConnsPerTorrent == 0 { + cfg.HalfOpenConnsPerTorrent = 80 + } + if cfg.TorrentPeersHighWater == 0 { + cfg.TorrentPeersHighWater = 200 + } + if cfg.TorrentPeersLowWater == 0 { + cfg.TorrentPeersLowWater = 50 + } + if cfg.HandshakesTimeout == 0 { + cfg.HandshakesTimeout = 20 * time.Second + } } type EncryptionPolicy struct { diff --git a/global.go b/global.go index ed06fccc223de234cee03898ae4abe319333d537..ffc30f493348c455c360187964c935d66a0899b8 100644 --- a/global.go +++ b/global.go @@ -3,7 +3,6 @@ import ( "crypto" "expvar" - "time" ) const ( @@ -11,18 +10,6 @@ pieceHash = crypto.SHA1 maxRequests = 250 // Maximum pending requests we allow peers to send us. defaultChunkSize = 0x4000 // 16KiB - // Updated occasionally to when there's been some changes to client - // behaviour in case other clients are assuming anything of us. See also - // `bep20`. - extendedHandshakeClientVersion = "go.torrent dev 20150624" - // Peer ID client identifier prefix. We'll update this occasionally to - // reflect changes to client behaviour that other clients may depend on. - // Also see `extendedHandshakeClientVersion`. - bep20 = "-GT0001-" - - nominalDialTimeout = time.Second * 30 - minDialTimeout = 5 * time.Second - // Justification for set bits follows. // // Extension protocol ([5]|=0x10): @@ -35,16 +22,6 @@ // // DHT ([7]|=1): // http://www.bittorrent.org/beps/bep_0005.html defaultExtensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01" - - defaultEstablishedConnsPerTorrent = 80 - defaultHalfOpenConnsPerTorrent = 80 - torrentPeersHighWater = 200 - torrentPeersLowWater = 50 - - // Limit how long handshake can take. This is to reduce the lingering - // impact of a few bad apples. 4s loses 1% of successful handshakes that - // are obtained with 60s timeout, and 5% of unsuccessful handshakes. - handshakesTimeout = 20 * time.Second // These are our extended message IDs. Peers will use these values to // select which extension a message is intended for. diff --git a/torrent.go b/torrent.go index e0619bfdb483abbfba6a1cd1bacc97d2093564d2..c4abf295f647abea088ed4480eccce1480c62904 100644 --- a/torrent.go +++ b/torrent.go @@ -225,7 +225,7 @@ func (t *Torrent) addPeer(p Peer) { cl := t.cl cl.openNewConns(t) - if len(t.peers) >= torrentPeersHighWater { + if len(t.peers) >= cl.config.TorrentPeersHighWater { return } key := peersKey{string(p.IP), p.Port} @@ -1160,7 +1160,7 @@ func (t *Torrent) wantPeers() bool { if t.closed.IsSet() { return false } - if len(t.peers) > torrentPeersLowWater { + if len(t.peers) > t.cl.config.TorrentPeersLowWater { return false } return t.needData() || t.seeding() @@ -1270,7 +1270,7 @@ cl.mu.Lock() t.addPeers(addPeers) numPeers := len(t.peers) cl.mu.Unlock() - if numPeers >= torrentPeersHighWater { + if numPeers >= cl.config.TorrentPeersHighWater { return } case <-t.closed.LockedChan(&cl.mu): diff --git a/tracker/http.go b/tracker/http.go index 4ca80f8fa81cd599fc4dcaa222ed46a52f2682a5..b0ab3f95bee9f7985842992da703456d05aaaa64 100644 --- a/tracker/http.go +++ b/tracker/http.go @@ -2,15 +2,12 @@ package tracker import ( "bytes" - "crypto/tls" "errors" "fmt" "io" - "net" "net/http" "net/url" "strconv" - "time" "github.com/anacrolix/missinggo/httptoo" @@ -25,17 +22,6 @@ TrackerId string `bencode:"tracker id"` Complete int32 `bencode:"complete"` Incomplete int32 `bencode:"incomplete"` Peers interface{} `bencode:"peers"` -} - -var netClient = &http.Client{ - Timeout: time.Second * 15, - Transport: &http.Transport{ - Dial: (&net.Dialer{ - Timeout: 15 * time.Second, - }).Dial, - TLSHandshakeTimeout: 15 * time.Second, - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - }, } func (r *httpResponse) UnmarshalPeers() (ret []Peer, err error) { @@ -87,12 +73,12 @@ _url.RawQuery = q.Encode() } -func announceHTTP(ar *AnnounceRequest, _url *url.URL, host string) (ret AnnounceResponse, err error) { +func announceHTTP(cl *http.Client, ar *AnnounceRequest, _url *url.URL, host string) (ret AnnounceResponse, err error) { _url = httptoo.CopyURL(_url) setAnnounceParams(_url, ar) req, err := http.NewRequest("GET", _url.String(), nil) req.Host = host - resp, err := netClient.Do(req) + resp, err := cl.Do(req) if err != nil { return } diff --git a/tracker/tracker.go b/tracker/tracker.go index 9f14041e72e236c5eb8af1cfbd2c2992f65d7f05..4ed76f11f4115ccb39965171fc2aecf33c7b2740 100644 --- a/tracker/tracker.go +++ b/tracker/tracker.go @@ -3,6 +3,7 @@ import ( "errors" "net" + "net/http" "net/url" ) @@ -53,18 +54,18 @@ var ( ErrBadScheme = errors.New("unknown scheme") ) -func Announce(urlStr string, req *AnnounceRequest) (res AnnounceResponse, err error) { - return AnnounceHost(urlStr, req, "") +func Announce(cl *http.Client, urlStr string, req *AnnounceRequest) (res AnnounceResponse, err error) { + return AnnounceHost(cl, urlStr, req, "") } -func AnnounceHost(urlStr string, req *AnnounceRequest, host string) (res AnnounceResponse, err error) { +func AnnounceHost(cl *http.Client, urlStr string, req *AnnounceRequest, host string) (res AnnounceResponse, err error) { _url, err := url.Parse(urlStr) if err != nil { return } switch _url.Scheme { case "http", "https": - return announceHTTP(req, _url, host) + return announceHTTP(cl, req, _url, host) case "udp": return announceUDP(req, _url) default: diff --git a/tracker/tracker_test.go b/tracker/tracker_test.go index d095af5a73b5215c242aadbd0bcea81557f1a36b..bf397e9d0af90231594a637b7be08a1e94b9c5a5 100644 --- a/tracker/tracker_test.go +++ b/tracker/tracker_test.go @@ -1,12 +1,27 @@ package tracker import ( + "crypto/tls" + "net" + "net/http" "testing" + "time" ) +var defaultClient = &http.Client{ + Timeout: time.Second * 15, + Transport: &http.Transport{ + Dial: (&net.Dialer{ + Timeout: 15 * time.Second, + }).Dial, + TLSHandshakeTimeout: 15 * time.Second, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, +} + func TestUnsupportedTrackerScheme(t *testing.T) { t.Parallel() - _, err := Announce("lol://tracker.openbittorrent.com:80/announce", nil) + _, err := Announce(defaultClient, "lol://tracker.openbittorrent.com:80/announce", nil) if err != ErrBadScheme { t.Fatal(err) } diff --git a/tracker/udp_test.go b/tracker/udp_test.go index 55946aae220dc1fe2cb7408f339c3e7db74a87a6..78434d328dd73e40d19ffbf4f54242920f3b3da7 100644 --- a/tracker/udp_test.go +++ b/tracker/udp_test.go @@ -114,7 +114,7 @@ copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1}) go func() { require.NoError(t, srv.serveOne()) }() - ar, err := Announce(fmt.Sprintf("udp://%s/announce", srv.pc.LocalAddr().String()), &req) + ar, err := Announce(defaultClient, fmt.Sprintf("udp://%s/announce", srv.pc.LocalAddr().String()), &req) require.NoError(t, err) assert.EqualValues(t, 1, ar.Seeders) assert.EqualValues(t, 2, len(ar.Peers)) @@ -130,7 +130,7 @@ NumWant: -1, } rand.Read(req.PeerId[:]) copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1}) - ar, err := Announce("udp://tracker.openbittorrent.com:80/announce", &req) + ar, err := Announce(defaultClient, "udp://tracker.openbittorrent.com:80/announce", &req) // Skip any net errors as we don't control the server. if _, ok := err.(net.Error); ok { t.Skip(err) @@ -166,7 +166,7 @@ } { wg.Add(1) go func(url string) { defer wg.Done() - resp, err := Announce(url, &req) + resp, err := Announce(defaultClient, url, &req) if err != nil { t.Logf("error announcing to %s: %s", url, err) return @@ -202,7 +202,7 @@ panic(err) } defer conn.Close() go func() { - _, err := Announce((&url.URL{ + _, err := Announce(defaultClient, (&url.URL{ Scheme: "udp", Host: conn.LocalAddr().String(), Path: "/announce", diff --git a/tracker_scraper.go b/tracker_scraper.go index 3463133c931ea6d4982bf1aeb15f5629eeb8119d..a090bc1d7603a7b57b1315d76a5135404a01f84b 100644 --- a/tracker_scraper.go +++ b/tracker_scraper.go @@ -85,7 +85,7 @@ } me.t.cl.mu.Lock() req := me.t.announceRequest() me.t.cl.mu.Unlock() - res, err := tracker.AnnounceHost(urlToUse, &req, host) + res, err := tracker.AnnounceHost(me.t.cl.config.HTTP, urlToUse, &req, host) if err != nil { ret.Err = err return