"crypto/rand"
"encoding/binary"
"errors"
- "expvar"
"fmt"
"io"
"net"
peerID PeerID
defaultStorage *storage.Client
onClose []func()
- tcpListener net.Listener
- utpSock utpSocket
- dHT *dht.Server
+ conns []socket
+ dhtServers []*dht.Server
ipBlockList iplist.Ranger
// Our BitTorrent protocol extension bytes, sent in our BT handshakes.
extensionBytes peerExtensionBytes
- // The net.Addr.String part that should be common to all active listeners.
- listenAddr string
- uploadLimit *rate.Limiter
- downloadLimit *rate.Limiter
+ uploadLimit *rate.Limiter
+ downloadLimit *rate.Limiter
// Set of addresses that have our client ID. This intentionally will
// include ourselves if we end up trying to connect to our own address
return slices.FromMapKeys(cl.badPeerIPs).([]string)
}
-func (cl *Client) IPBlockList() iplist.Ranger {
- cl.mu.Lock()
- defer cl.mu.Unlock()
- return cl.ipBlockList
-}
-
-func (cl *Client) SetIPBlockList(list iplist.Ranger) {
- cl.mu.Lock()
- defer cl.mu.Unlock()
- cl.ipBlockList = list
- if cl.dHT != nil {
- cl.dHT.SetIPBlockList(list)
- }
-}
-
func (cl *Client) PeerID() PeerID {
return cl.peerID
}
func (me torrentAddr) String() string { return string(me) }
-func (cl *Client) ListenAddr() net.Addr {
- if cl.listenAddr == "" {
- return nil
- }
- return torrentAddr(cl.listenAddr)
+func (cl *Client) LocalPort() (port int) {
+ cl.eachListener(func(l socket) bool {
+ _port := missinggo.AddrPort(l.Addr())
+ if _port == 0 {
+ panic(l)
+ }
+ if port == 0 {
+ port = _port
+ } else if port != _port {
+ panic("mismatched ports")
+ }
+ return true
+ })
+ return
+}
+
+func writeDhtServerStatus(w io.Writer, s *dht.Server) {
+ dhtStats := s.Stats()
+ fmt.Fprintf(w, "\tDHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
+ fmt.Fprintf(w, "\tDHT Server ID: %x\n", s.ID())
+ fmt.Fprintf(w, "\tDHT port: %d\n", missinggo.AddrPort(s.Addr()))
+ fmt.Fprintf(w, "\tDHT announces: %d\n", dhtStats.ConfirmedAnnounces)
+ fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
}
// Writes out a human readable status of the client, such as for writing to a
defer cl.mu.Unlock()
w := bufio.NewWriter(_w)
defer w.Flush()
- if addr := cl.ListenAddr(); addr != nil {
- fmt.Fprintf(w, "Listening on %s\n", addr)
- } else {
- fmt.Fprintln(w, "Not listening!")
- }
+ fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
- if dht := cl.DHT(); dht != nil {
- dhtStats := dht.Stats()
- fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
- fmt.Fprintf(w, "DHT Server ID: %x\n", dht.ID())
- fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(dht.Addr()))
- fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
- fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
- }
+ cl.eachDhtServer(func(s *dht.Server) {
+ fmt.Fprintf(w, "%s DHT server:\n", s.Addr().Network())
+ writeDhtServerStatus(w, s)
+ })
fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
fmt.Fprintln(w)
for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
}
}
-func listenUTP(networkSuffix, addr string) (utpSocket, error) {
- return NewUtpSocket("udp"+networkSuffix, addr)
-}
-
-func listenTCP(networkSuffix, addr string) (net.Listener, error) {
- return net.Listen("tcp"+networkSuffix, addr)
-}
-
-func listenBothSameDynamicPort(networkSuffix, host string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
- for {
- tcpL, err = listenTCP(networkSuffix, net.JoinHostPort(host, "0"))
- if err != nil {
- return
- }
- listenedAddr = tcpL.Addr().String()
- utpSock, err = listenUTP(networkSuffix, listenedAddr)
- if err == nil {
- return
- }
- tcpL.Close()
- if !strings.Contains(err.Error(), "address already in use") {
- return
- }
- }
-}
-
-// Listen to enabled protocols, ensuring ports match.
-func listen(tcp, utp bool, networkSuffix, addr string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
- if addr == "" {
- addr = ":50007"
- }
- if tcp && utp {
- var host string
- var port int
- host, port, err = missinggo.ParseHostPort(addr)
- if err != nil {
- return
- }
- if port == 0 {
- // If both protocols are active, they need to have the same port.
- return listenBothSameDynamicPort(networkSuffix, host)
- }
- }
- defer func() {
- if err != nil {
- listenedAddr = ""
- }
- }()
- if tcp {
- tcpL, err = listenTCP(networkSuffix, addr)
- if err != nil {
- return
- }
- defer func() {
- if err != nil {
- tcpL.Close()
- }
- }()
- listenedAddr = tcpL.Addr().String()
- }
- if utp {
- utpSock, err = listenUTP(networkSuffix, addr)
- if err != nil {
- return
- }
- listenedAddr = utpSock.Addr().String()
- }
- return
-}
-
const debugLogValue = "debug"
func (cl *Client) debugLogFilter(m *log.Msg) bool {
return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
}
-// Creates a new client.
func NewClient(cfg *Config) (cl *Client, err error) {
- if cfg == nil {
- cfg = &Config{
- DHTConfig: dht.ServerConfig{
- StartingNodes: dht.GlobalBootstrapAddrs,
- },
- }
- }
if cfg == nil {
cfg = &Config{}
}
}
}
- cl.tcpListener, cl.utpSock, cl.listenAddr, err = listen(
- !cl.config.DisableTCP,
- !cl.config.DisableUTP,
- // We'll listen to IPv4 for TCP even if IPv4 peer connections are
- // disabled because we want to ensure peers don't connect to some
- // other process on that port.
- ipNetworkSuffix(!cl.config.DisableIPv4, !cl.config.DisableIPv6),
- cl.config.ListenAddr)
+ cl.conns, err = listenAll(cl.enabledPeerNetworks(), cl.config.ListenAddr)
if err != nil {
return
}
- go cl.forwardPort()
- if cl.tcpListener != nil {
- go cl.acceptConnections(cl.tcpListener, false)
- }
- if cl.utpSock != nil {
- go cl.acceptConnections(cl.utpSock, true)
+ cl.LocalPort()
+
+ for _, s := range cl.conns {
+ if peerNetworkEnabled(s.Addr().Network(), cl.config) {
+ go cl.acceptConnections(s)
+ }
}
+
+ go cl.forwardPort()
if !cfg.NoDHT {
- dhtCfg := cfg.DHTConfig
- if dhtCfg.IPBlocklist == nil {
- dhtCfg.IPBlocklist = cl.ipBlockList
- }
- if dhtCfg.Conn == nil {
- if cl.utpSock != nil {
- dhtCfg.Conn = cl.utpSock
- } else {
- dhtCfg.Conn, err = net.ListenPacket("udp", firstNonEmptyString(cl.listenAddr, cl.config.ListenAddr))
+ for _, s := range cl.conns {
+ if pc, ok := s.(net.PacketConn); ok {
+ ds, err := cl.newDhtServer(pc)
if err != nil {
- return
+ panic(err)
}
+ cl.dhtServers = append(cl.dhtServers, ds)
}
}
- if dhtCfg.OnAnnouncePeer == nil {
- dhtCfg.OnAnnouncePeer = cl.onDHTAnnouncePeer
- }
- cl.dHT, err = dht.NewServer(&dhtCfg)
- if err != nil {
- return
+ }
+
+ return
+}
+
+func (cl *Client) enabledPeerNetworks() (ns []string) {
+ for _, n := range allPeerNetworks {
+ if peerNetworkEnabled(n, cl.config) {
+ ns = append(ns, n)
}
+ }
+ return
+}
+
+func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
+ cfg := dht.ServerConfig{
+ IPBlocklist: cl.ipBlockList,
+ Conn: conn,
+ OnAnnouncePeer: cl.onDHTAnnouncePeer,
+ PublicIP: func() net.IP {
+ if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
+ return cl.config.PublicIp6
+ }
+ return cl.config.PublicIp4
+ }(),
+ StartingNodes: cl.config.DhtStartingNodes,
+ }
+ s, err = dht.NewServer(&cfg)
+ if err == nil {
go func() {
- if _, err := cl.dHT.Bootstrap(); err != nil {
+ if _, err := s.Bootstrap(); err != nil {
log.Printf("error bootstrapping dht: %s", err)
}
}()
}
-
return
}
return cl.closed.C()
}
+func (cl *Client) eachDhtServer(f func(*dht.Server)) {
+ for _, ds := range cl.dhtServers {
+ f(ds)
+ }
+}
+
+func (cl *Client) closeSockets() {
+ cl.eachListener(func(l socket) bool {
+ l.Close()
+ return true
+ })
+ cl.conns = nil
+}
+
// Stops the client. All connections to peers are closed and all activity will
// come to a halt.
func (cl *Client) Close() {
cl.mu.Lock()
defer cl.mu.Unlock()
cl.closed.Set()
- if cl.dHT != nil {
- cl.dHT.Close()
- }
- if cl.utpSock != nil {
- cl.utpSock.Close()
- }
- if cl.tcpListener != nil {
- cl.tcpListener.Close()
- }
+ cl.eachDhtServer(func(s *dht.Server) { s.Close() })
+ cl.closeSockets()
for _, t := range cl.torrents {
t.close()
}
return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
}
-func (cl *Client) acceptConnections(l net.Listener, utp bool) {
+func (cl *Client) acceptConnections(l net.Listener) {
cl.mu.Lock()
defer cl.mu.Unlock()
for {
log.Fmsg("accepted connection from %s", conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
go torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
go torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
- if utp {
- go torrent.Add("accepted utp connections", 1)
- } else {
- go torrent.Add("accepted tcp connections", 1)
- }
+ go torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
if cl.rejectAccepted(conn) {
go torrent.Add("rejected accepted connections", 1)
conn.Close()
} else {
- go cl.incomingConnection(conn, utp)
+ go cl.incomingConnection(conn)
}
}
}
-func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
+func (cl *Client) incomingConnection(nc net.Conn) {
defer nc.Close()
if tc, ok := nc.(*net.TCPConn); ok {
tc.SetLinger(0)
}
c := cl.newConnection(nc)
c.Discovery = peerSourceIncoming
- c.uTP = utp
cl.runReceivedConn(c)
}
type dialResult struct {
Conn net.Conn
- UTP bool
}
func countDialResult(err error) {
return
}
-func (cl *Client) utpDialNetwork() string {
- // We want to restrict the addr resolve inside the utp library to the
- // correct network, since the utp Socket may be listening to a broader
- // network for DHT purposes or otherwise.
- if !cl.config.DisableIPv4Peers {
- return ""
- }
- n := cl.utpSock.Addr().Network()
- switch n {
- case "udp", "udp4", "udp6":
- return "udp6"
- default:
- panic(n)
- }
-}
-
func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
switch {
case allowIpv4 && allowIpv6:
}
}
-func (cl *Client) dialUTP(ctx context.Context, addr string) (c net.Conn, err error) {
- c, err = cl.utpSock.DialContext(ctx, cl.utpDialNetwork(), addr)
- countDialResult(err)
- return
+func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err error) {
+ return sock.DialContext(ctx, "", addr)
}
-var (
- dialledFirstUtp = expvar.NewInt("dialledFirstUtp")
- dialledFirstNotUtp = expvar.NewInt("dialledFirstNotUtp")
-)
+var allPeerNetworks = []string{"tcp4", "tcp6", "udp4", "udp6"}
+
+func peerNetworkEnabled(network string, cfg Config) bool {
+ c := func(s string) bool {
+ return strings.Contains(network, s)
+ }
+ if cfg.DisableUTP {
+ if c("udp") || c("utp") {
+ return false
+ }
+ }
+ if cfg.DisableTCP && c("tcp") {
+ return false
+ }
+ return true
+}
// Returns a connection over UTP or TCP, whichever is first to connect.
-func (cl *Client) dialFirst(ctx context.Context, addr string) (conn net.Conn, utp bool) {
+func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
ctx, cancel := context.WithCancel(ctx)
// As soon as we return one connection, cancel the others.
defer cancel()
left := 0
resCh := make(chan dialResult, left)
- if !cl.config.DisableUTP {
- left++
- go func() {
- c, _ := cl.dialUTP(ctx, addr)
- resCh <- dialResult{c, true}
- }()
- }
- if !cl.config.DisableTCP {
+ dial := func(f func(_ context.Context, addr string) (net.Conn, error)) {
left++
go func() {
- c, _ := cl.dialTCP(ctx, addr)
- resCh <- dialResult{c, false}
+ c, err := f(ctx, addr)
+ countDialResult(err)
+ resCh <- dialResult{c}
}()
}
+ func() {
+ cl.mu.Lock()
+ defer cl.mu.Unlock()
+ cl.eachListener(func(s socket) bool {
+ if peerNetworkEnabled(s.Addr().Network(), cl.config) {
+ dial(s.dial)
+ }
+ return true
+ })
+ }()
var res dialResult
// Wait for a successful connection.
for ; left > 0 && res.Conn == nil; left-- {
res = <-resCh
}
- if left > 0 {
- // There are still incompleted dials.
- go func() {
- for ; left > 0; left-- {
- conn := (<-resCh).Conn
- if conn != nil {
- conn.Close()
- }
+ // There are still incompleted dials.
+ go func() {
+ for ; left > 0; left-- {
+ conn := (<-resCh).Conn
+ if conn != nil {
+ conn.Close()
}
- }()
- }
- conn = res.Conn
- utp = res.UTP
- if conn != nil {
- if utp {
- dialledFirstUtp.Add(1)
- } else {
- dialledFirstNotUtp.Add(1)
}
+ }()
+ if res.Conn != nil {
+ go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
}
- return
+ return res.Conn
}
func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
// Performs initiator handshakes and returns a connection. Returns nil
// *connection if no connection for valid reasons.
-func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader, utp bool) (c *connection, err error) {
+func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
c = cl.newConnection(nc)
c.headerEncrypted = encryptHeader
- c.uTP = utp
ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
defer cancel()
dl, ok := ctx.Deadline()
return
}
-var (
- initiatedConnWithPreferredHeaderEncryption = expvar.NewInt("initiatedConnWithPreferredHeaderEncryption")
- initiatedConnWithFallbackHeaderEncryption = expvar.NewInt("initiatedConnWithFallbackHeaderEncryption")
-)
+// Returns nil connection and nil error if no connection could be established
+// for valid reasons.
+func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
+ nc := cl.dialFirst(ctx, addr)
+ if nc == nil {
+ return
+ }
+ defer func() {
+ if c == nil || err != nil {
+ nc.Close()
+ }
+ }()
+ return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader)
+}
// Returns nil connection and nil error if no connection could be established
// for valid reasons.
func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
- nc, utp := cl.dialFirst(ctx, addr)
- if nc == nil {
- return
- }
obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
- c, err = cl.handshakesConnection(ctx, nc, t, obfuscatedHeaderFirst, utp)
+ c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst)
if err != nil {
- // log.Printf("error initiating connection handshakes: %s", err)
- nc.Close()
return
- } else if c != nil {
- initiatedConnWithPreferredHeaderEncryption.Add(1)
+ }
+ if c != nil {
+ go torrent.Add("initiated conn with preferred header obfuscation", 1)
return
}
- nc.Close()
if cl.config.ForceEncryption {
// We should have just tried with an obfuscated header. A plaintext
// header can't result in an encrypted connection, so we're done.
}
return
}
- // Try again with encryption if we didn't earlier, or without if we did,
- // using whichever protocol type worked last time.
- if utp {
- nc, err = cl.dialUTP(ctx, addr)
- } else {
- nc, err = cl.dialTCP(ctx, addr)
- }
- if err != nil {
- err = fmt.Errorf("error dialing for header encryption fallback: %s", err)
- return
- }
- c, err = cl.handshakesConnection(ctx, nc, t, !obfuscatedHeaderFirst, utp)
- if err != nil || c == nil {
- nc.Close()
- }
- if err == nil && c != nil {
- initiatedConnWithFallbackHeaderEncryption.Add(1)
+ // Try again with encryption if we didn't earlier, or without if we did.
+ c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
+ if c != nil {
+ go torrent.Add("initiated conn with fallback header obfuscation", 1)
}
return
}
// The port number for incoming peer connections. 0 if the client isn't
// listening.
func (cl *Client) incomingPeerPort() int {
- if cl.listenAddr == "" {
- return 0
- }
- _, port, err := missinggo.ParseHostPort(cl.listenAddr)
- if err != nil {
- panic(err)
- }
- return port
+ return cl.LocalPort()
}
func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
}(),
})
}
- if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
+ if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
conn.Post(pp.Message{
Type: pp.Port,
- Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
+ Port: cl.dhtPort(),
})
}
}
+func (cl *Client) dhtPort() (ret uint16) {
+ cl.eachDhtServer(func(s *dht.Server) {
+ ret = uint16(missinggo.AddrPort(s.Addr()))
+ })
+ return
+}
+
+func (cl *Client) haveDhtServer() (ret bool) {
+ cl.eachDhtServer(func(_ *dht.Server) {
+ ret = true
+ })
+ return
+}
+
// Process incoming ut_metadata message.
func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
var d map[string]int
}
new = true
t = cl.newTorrent(infoHash, specStorage)
- if cl.dHT != nil {
- go t.dhtAnnouncer()
- }
+ cl.eachDhtServer(func(s *dht.Server) {
+ go t.dhtAnnouncer(s)
+ })
cl.torrents[infoHash] = t
t.updateWantPeersEvent()
// Tickle Client.waitAccept, new torrent may want conns.
return cl.AddTorrent(mi)
}
-func (cl *Client) DHT() *dht.Server {
- return cl.dHT
+func (cl *Client) DhtServers() []*dht.Server {
+ return cl.dhtServers
}
func (cl *Client) AddDHTNodes(nodes []string) {
- if cl.DHT() == nil {
- return
- }
for _, n := range nodes {
hmp := missinggo.SplitHostMaybePort(n)
ip := net.ParseIP(hmp.Host)
Port: hmp.Port,
},
}
- cl.DHT().AddNode(ni)
+ cl.eachDhtServer(func(s *dht.Server) {
+ s.AddNode(ni)
+ })
}
}
return nil
}
+func (cl *Client) eachListener(f func(socket) bool) {
+ for _, s := range cl.conns {
+ if !f(s) {
+ break
+ }
+ }
+}
+
+func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
+ cl.eachListener(func(l socket) bool {
+ ret = l
+ return !f(l)
+ })
+ return
+}
+
func (cl *Client) publicIp(peer net.IP) net.IP {
// TODO: Use BEP 10 to determine how peers are seeing us.
if peer.To4() != nil {
- return firstNotNil(cl.config.PublicIp4, missinggo.AddrIP(cl.ListenAddr()).To4())
+ return firstNotNil(
+ cl.config.PublicIp4,
+ cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
+ )
} else {
- return firstNotNil(cl.config.PublicIp6, missinggo.AddrIP(cl.ListenAddr()).To16())
+ return firstNotNil(
+ cl.config.PublicIp6,
+ cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
+ )
}
}
+func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
+ return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
+ return f(missinggo.AddrIP(l.Addr()))
+ }).Addr())
+}
+
// Our IP as a peer should see it.
func (cl *Client) publicAddr(peer net.IP) ipPort {
return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
}
+
+func (cl *Client) ListenAddrs() (ret []net.Addr) {
+ cl.eachListener(func(l socket) bool {
+ ret = append(ret, l.Addr())
+ return true
+ })
+ return
+}
"testing"
"time"
+ "github.com/anacrolix/dht"
_ "github.com/anacrolix/envpprof"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/filecache"
}
}
-func TestTwoClientsArbitraryPorts(t *testing.T) {
- for i := 0; i < 2; i++ {
- cl, err := NewClient(TestingConfig())
- if err != nil {
- t.Fatal(err)
- }
- defer cl.Close()
- }
-}
-
func TestAddDropManyTorrents(t *testing.T) {
cl, err := NewClient(TestingConfig())
require.NoError(t, err)
require.NoError(t, err)
assert.True(t, new)
// Now do some things with leecher and seeder.
- addClientPeer(leecherTorrent, seeder)
+ leecherTorrent.AddClientPeer(seeder)
// The Torrent should not be interested in obtaining peers, so the one we
// just added should be the only one.
assert.False(t, leecherTorrent.Seeding())
require.NoError(t, err)
defer os.RemoveAll(cfg.DataDir)
leecherLeecher, _ := NewClient(cfg)
+ require.NoError(t, err)
defer leecherLeecher.Close()
testutil.ExportStatusWriter(leecherLeecher, "ll")
leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
require.NoError(t, err)
assert.EqualValues(t, testutil.GreetingFileContents, b)
}()
- addClientPeer(leecherGreeting, seeder)
- addClientPeer(leecherGreeting, leecherLeecher)
+ leecherGreeting.AddClientPeer(seeder)
+ leecherGreeting.AddClientPeer(leecherLeecher)
wg.Add(1)
go func() {
defer wg.Done()
ret.ChunkSize = 2
return
}())
- addClientPeer(leecherTorrent, seeder)
+ leecherTorrent.AddClientPeer(seeder)
reader := leecherTorrent.NewReader()
defer reader.Close()
reader.SetReadahead(0)
ret.ChunkSize = 2
return
}())
- addClientPeer(leecherTorrent, seeder)
+ leecherTorrent.AddClientPeer(seeder)
reader := leecherTorrent.NewReader()
defer reader.Close()
reader.SetReadahead(0)
cl, err := NewClient(cfg)
require.NoError(t, err)
defer cl.Close()
- require.Equal(t, ipl, cl.DHT().IPBlocklist())
+ numServers := 0
+ cl.eachDhtServer(func(s *dht.Server) {
+ assert.Equal(t, ipl, s.IPBlocklist())
+ numServers++
+ })
+ assert.EqualValues(t, 2, numServers)
}
// Check that stuff is merged in subsequent AddTorrentSpec for the same
cfg := TestingConfig()
cfg.ListenAddr = ":0"
cfg.NoDHT = false
+ cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
// For now, we want to just jam the nodes into the table, without
// verifying them first. Also the DHT code doesn't support mixing secure
// and insecure nodes if security is enabled (yet).
- cfg.DHTConfig.NoSecurity = true
+ // cfg.DHTConfig.NoSecurity = true
cl, err := NewClient(cfg)
require.NoError(t, err)
defer cl.Close()
- assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
+ sum := func() (ret int) {
+ cl.eachDhtServer(func(s *dht.Server) {
+ ret += s.NumNodes()
+ ret += s.Stats().OutstandingTransactions
+ })
+ return
+ }
+ assert.EqualValues(t, 0, sum())
tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
require.NoError(t, err)
// Nodes are not added or exposed in Torrent's metainfo. We just randomly
// check if the announce-list is here instead. TODO: Add nodes.
assert.Len(t, tt.metainfo.AnnounceList, 5)
// There are 6 nodes in the torrent file.
- assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
+ assert.EqualValues(t, 6*len(cl.dhtServers), sum())
}
type testDownloadCancelParams struct {
}
leecherGreeting.cl.mu.Unlock()
- addClientPeer(leecherGreeting, seeder)
+ leecherGreeting.AddClientPeer(seeder)
completes := make(map[int]bool, 3)
values:
for {
cl, err := NewClient(TestingConfig())
require.NoError(t, err)
defer cl.Close()
- assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
- assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
+ port := cl.LocalPort()
+ assert.NotEqual(t, 0, port)
+ cl.eachListener(func(s socket) bool {
+ assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
+ return true
+ })
}
func TestClientDynamicListenTCPOnly(t *testing.T) {
cl, err := NewClient(cfg)
require.NoError(t, err)
defer cl.Close()
- assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
- assert.Nil(t, cl.utpSock)
+ assert.NotEqual(t, 0, cl.LocalPort())
+ cl.eachListener(func(s socket) bool {
+ assert.True(t, isTcpNetwork(s.Addr().Network()))
+ return true
+ })
}
func TestClientDynamicListenUTPOnly(t *testing.T) {
cl, err := NewClient(cfg)
require.NoError(t, err)
defer cl.Close()
- assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
- assert.Nil(t, cl.tcpListener)
+ assert.NotEqual(t, 0, cl.LocalPort())
+ cl.eachListener(func(s socket) bool {
+ assert.True(t, isUtpNetwork(s.Addr().Network()))
+ return true
+ })
}
func TestClientDynamicListenPortNoProtocols(t *testing.T) {
cl, err := NewClient(cfg)
require.NoError(t, err)
defer cl.Close()
- assert.Nil(t, cl.ListenAddr())
-}
-
-func addClientPeer(t *Torrent, cl *Client) {
- t.AddPeers([]Peer{
- {
- IP: missinggo.AddrIP(cl.ListenAddr()),
- Port: missinggo.AddrPort(cl.ListenAddr()),
- },
- })
+ assert.Equal(t, 0, cl.LocalPort())
}
func totalConns(tts []*Torrent) (ret int) {
for _, tt := range tts {
for _, _tt := range tts {
// if tt != _tt {
- addClientPeer(tt, _tt.cl)
+ tt.AddClientPeer(_tt.cl)
// }
}
}
testutil.ExportStatusWriter(client, "c")
tr, err := client.AddMagnet(magnet1)
require.NoError(t, err)
- tr.AddPeers([]Peer{{
- IP: missinggo.AddrIP(server.ListenAddr()),
- Port: missinggo.AddrPort(server.ListenAddr()),
- }})
+ tr.AddClientPeer(server)
<-tr.GotInfo()
tr.DownloadAll()
client.WaitAll()