From: Matt Joiner Date: Thu, 12 Apr 2018 01:41:07 +0000 (+1000) Subject: Rework conns to/and allow multiple DHT servers X-Git-Tag: v1.0.0~152 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=319e57d1c6e5ddd95b8dcd5439baf13828e16741;p=btrtrc.git Rework conns to/and allow multiple DHT servers This will help with #229, and IPv6 support. --- diff --git a/bencode/decode.go b/bencode/decode.go index 51a7a2d5..dc985aaf 100644 --- a/bencode/decode.go +++ b/bencode/decode.go @@ -226,9 +226,9 @@ func getDictField(dict reflect.Value, key string) dictField { }) } return dictField{ - Value: dict.FieldByIndex(sf.Index), - Ok: true, - Set: func() {}, + Value: dict.FieldByIndex(sf.Index), + Ok: true, + Set: func() {}, IgnoreUnmarshalTypeError: getTag(sf.Tag).IgnoreUnmarshalTypeError(), } default: diff --git a/client.go b/client.go index d932cd7f..2b984b57 100644 --- a/client.go +++ b/client.go @@ -7,7 +7,6 @@ import ( "crypto/rand" "encoding/binary" "errors" - "expvar" "fmt" "io" "net" @@ -49,16 +48,13 @@ type Client struct { peerID PeerID defaultStorage *storage.Client onClose []func() - tcpListener net.Listener - utpSock utpSocket - dHT *dht.Server + conns []socket + dhtServers []*dht.Server ipBlockList iplist.Ranger // Our BitTorrent protocol extension bytes, sent in our BT handshakes. extensionBytes peerExtensionBytes - // The net.Addr.String part that should be common to all active listeners. - listenAddr string - uploadLimit *rate.Limiter - downloadLimit *rate.Limiter + uploadLimit *rate.Limiter + downloadLimit *rate.Limiter // Set of addresses that have our client ID. This intentionally will // include ourselves if we end up trying to connect to our own address @@ -78,21 +74,6 @@ func (cl *Client) badPeerIPsLocked() []string { return slices.FromMapKeys(cl.badPeerIPs).([]string) } -func (cl *Client) IPBlockList() iplist.Ranger { - cl.mu.Lock() - defer cl.mu.Unlock() - return cl.ipBlockList -} - -func (cl *Client) SetIPBlockList(list iplist.Ranger) { - cl.mu.Lock() - defer cl.mu.Unlock() - cl.ipBlockList = list - if cl.dHT != nil { - cl.dHT.SetIPBlockList(list) - } -} - func (cl *Client) PeerID() PeerID { return cl.peerID } @@ -103,11 +84,29 @@ func (torrentAddr) Network() string { return "" } func (me torrentAddr) String() string { return string(me) } -func (cl *Client) ListenAddr() net.Addr { - if cl.listenAddr == "" { - return nil - } - return torrentAddr(cl.listenAddr) +func (cl *Client) LocalPort() (port int) { + cl.eachListener(func(l socket) bool { + _port := missinggo.AddrPort(l.Addr()) + if _port == 0 { + panic(l) + } + if port == 0 { + port = _port + } else if port != _port { + panic("mismatched ports") + } + return true + }) + return +} + +func writeDhtServerStatus(w io.Writer, s *dht.Server) { + dhtStats := s.Stats() + fmt.Fprintf(w, "\tDHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes) + fmt.Fprintf(w, "\tDHT Server ID: %x\n", s.ID()) + fmt.Fprintf(w, "\tDHT port: %d\n", missinggo.AddrPort(s.Addr())) + fmt.Fprintf(w, "\tDHT announces: %d\n", dhtStats.ConfirmedAnnounces) + fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions) } // Writes out a human readable status of the client, such as for writing to a @@ -117,22 +116,14 @@ func (cl *Client) WriteStatus(_w io.Writer) { defer cl.mu.Unlock() w := bufio.NewWriter(_w) defer w.Flush() - if addr := cl.ListenAddr(); addr != nil { - fmt.Fprintf(w, "Listening on %s\n", addr) - } else { - fmt.Fprintln(w, "Not listening!") - } + fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort()) fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID()) fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey()) fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked())) - if dht := cl.DHT(); dht != nil { - dhtStats := dht.Stats() - fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes) - fmt.Fprintf(w, "DHT Server ID: %x\n", dht.ID()) - fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(dht.Addr())) - fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces) - fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions) - } + cl.eachDhtServer(func(s *dht.Server) { + fmt.Fprintf(w, "%s DHT server:\n", s.Addr().Network()) + writeDhtServerStatus(w, s) + }) fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice())) fmt.Fprintln(w) for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool { @@ -155,76 +146,6 @@ func (cl *Client) WriteStatus(_w io.Writer) { } } -func listenUTP(networkSuffix, addr string) (utpSocket, error) { - return NewUtpSocket("udp"+networkSuffix, addr) -} - -func listenTCP(networkSuffix, addr string) (net.Listener, error) { - return net.Listen("tcp"+networkSuffix, addr) -} - -func listenBothSameDynamicPort(networkSuffix, host string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) { - for { - tcpL, err = listenTCP(networkSuffix, net.JoinHostPort(host, "0")) - if err != nil { - return - } - listenedAddr = tcpL.Addr().String() - utpSock, err = listenUTP(networkSuffix, listenedAddr) - if err == nil { - return - } - tcpL.Close() - if !strings.Contains(err.Error(), "address already in use") { - return - } - } -} - -// Listen to enabled protocols, ensuring ports match. -func listen(tcp, utp bool, networkSuffix, addr string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) { - if addr == "" { - addr = ":50007" - } - if tcp && utp { - var host string - var port int - host, port, err = missinggo.ParseHostPort(addr) - if err != nil { - return - } - if port == 0 { - // If both protocols are active, they need to have the same port. - return listenBothSameDynamicPort(networkSuffix, host) - } - } - defer func() { - if err != nil { - listenedAddr = "" - } - }() - if tcp { - tcpL, err = listenTCP(networkSuffix, addr) - if err != nil { - return - } - defer func() { - if err != nil { - tcpL.Close() - } - }() - listenedAddr = tcpL.Addr().String() - } - if utp { - utpSock, err = listenUTP(networkSuffix, addr) - if err != nil { - return - } - listenedAddr = utpSock.Addr().String() - } - return -} - const debugLogValue = "debug" func (cl *Client) debugLogFilter(m *log.Msg) bool { @@ -243,15 +164,7 @@ func (cl *Client) announceKey() int32 { return int32(binary.BigEndian.Uint32(cl.peerID[16:20])) } -// Creates a new client. func NewClient(cfg *Config) (cl *Client, err error) { - if cfg == nil { - cfg = &Config{ - DHTConfig: dht.ServerConfig{ - StartingNodes: dht.GlobalBootstrapAddrs, - }, - } - } if cfg == nil { cfg = &Config{} } @@ -312,53 +225,64 @@ func NewClient(cfg *Config) (cl *Client, err error) { } } - cl.tcpListener, cl.utpSock, cl.listenAddr, err = listen( - !cl.config.DisableTCP, - !cl.config.DisableUTP, - // We'll listen to IPv4 for TCP even if IPv4 peer connections are - // disabled because we want to ensure peers don't connect to some - // other process on that port. - ipNetworkSuffix(!cl.config.DisableIPv4, !cl.config.DisableIPv6), - cl.config.ListenAddr) + cl.conns, err = listenAll(cl.enabledPeerNetworks(), cl.config.ListenAddr) if err != nil { return } - go cl.forwardPort() - if cl.tcpListener != nil { - go cl.acceptConnections(cl.tcpListener, false) - } - if cl.utpSock != nil { - go cl.acceptConnections(cl.utpSock, true) + cl.LocalPort() + + for _, s := range cl.conns { + if peerNetworkEnabled(s.Addr().Network(), cl.config) { + go cl.acceptConnections(s) + } } + + go cl.forwardPort() if !cfg.NoDHT { - dhtCfg := cfg.DHTConfig - if dhtCfg.IPBlocklist == nil { - dhtCfg.IPBlocklist = cl.ipBlockList - } - if dhtCfg.Conn == nil { - if cl.utpSock != nil { - dhtCfg.Conn = cl.utpSock - } else { - dhtCfg.Conn, err = net.ListenPacket("udp", firstNonEmptyString(cl.listenAddr, cl.config.ListenAddr)) + for _, s := range cl.conns { + if pc, ok := s.(net.PacketConn); ok { + ds, err := cl.newDhtServer(pc) if err != nil { - return + panic(err) } + cl.dhtServers = append(cl.dhtServers, ds) } } - if dhtCfg.OnAnnouncePeer == nil { - dhtCfg.OnAnnouncePeer = cl.onDHTAnnouncePeer - } - cl.dHT, err = dht.NewServer(&dhtCfg) - if err != nil { - return + } + + return +} + +func (cl *Client) enabledPeerNetworks() (ns []string) { + for _, n := range allPeerNetworks { + if peerNetworkEnabled(n, cl.config) { + ns = append(ns, n) } + } + return +} + +func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) { + cfg := dht.ServerConfig{ + IPBlocklist: cl.ipBlockList, + Conn: conn, + OnAnnouncePeer: cl.onDHTAnnouncePeer, + PublicIP: func() net.IP { + if connIsIpv6(conn) && cl.config.PublicIp6 != nil { + return cl.config.PublicIp6 + } + return cl.config.PublicIp4 + }(), + StartingNodes: cl.config.DhtStartingNodes, + } + s, err = dht.NewServer(&cfg) + if err == nil { go func() { - if _, err := cl.dHT.Bootstrap(); err != nil { + if _, err := s.Bootstrap(); err != nil { log.Printf("error bootstrapping dht: %s", err) } }() } - return } @@ -377,21 +301,28 @@ func (cl *Client) Closed() <-chan struct{} { return cl.closed.C() } +func (cl *Client) eachDhtServer(f func(*dht.Server)) { + for _, ds := range cl.dhtServers { + f(ds) + } +} + +func (cl *Client) closeSockets() { + cl.eachListener(func(l socket) bool { + l.Close() + return true + }) + cl.conns = nil +} + // Stops the client. All connections to peers are closed and all activity will // come to a halt. func (cl *Client) Close() { cl.mu.Lock() defer cl.mu.Unlock() cl.closed.Set() - if cl.dHT != nil { - cl.dHT.Close() - } - if cl.utpSock != nil { - cl.utpSock.Close() - } - if cl.tcpListener != nil { - cl.tcpListener.Close() - } + cl.eachDhtServer(func(s *dht.Server) { s.Close() }) + cl.closeSockets() for _, t := range cl.torrents { t.close() } @@ -442,7 +373,7 @@ func (cl *Client) rejectAccepted(conn net.Conn) bool { return cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) } -func (cl *Client) acceptConnections(l net.Listener, utp bool) { +func (cl *Client) acceptConnections(l net.Listener) { cl.mu.Lock() defer cl.mu.Unlock() for { @@ -466,28 +397,23 @@ func (cl *Client) acceptConnections(l net.Listener, utp bool) { log.Fmsg("accepted connection from %s", conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger) go torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1) go torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1) - if utp { - go torrent.Add("accepted utp connections", 1) - } else { - go torrent.Add("accepted tcp connections", 1) - } + go torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1) if cl.rejectAccepted(conn) { go torrent.Add("rejected accepted connections", 1) conn.Close() } else { - go cl.incomingConnection(conn, utp) + go cl.incomingConnection(conn) } } } -func (cl *Client) incomingConnection(nc net.Conn, utp bool) { +func (cl *Client) incomingConnection(nc net.Conn) { defer nc.Close() if tc, ok := nc.(*net.TCPConn); ok { tc.SetLinger(0) } c := cl.newConnection(nc) c.Discovery = peerSourceIncoming - c.uTP = utp cl.runReceivedConn(c) } @@ -505,7 +431,6 @@ func (cl *Client) torrent(ih metainfo.Hash) *Torrent { type dialResult struct { Conn net.Conn - UTP bool } func countDialResult(err error) { @@ -546,22 +471,6 @@ func (cl *Client) dialTCP(ctx context.Context, addr string) (c net.Conn, err err return } -func (cl *Client) utpDialNetwork() string { - // We want to restrict the addr resolve inside the utp library to the - // correct network, since the utp Socket may be listening to a broader - // network for DHT purposes or otherwise. - if !cl.config.DisableIPv4Peers { - return "" - } - n := cl.utpSock.Addr().Network() - switch n { - case "udp", "udp4", "udp6": - return "udp6" - default: - panic(n) - } -} - func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string { switch { case allowIpv4 && allowIpv6: @@ -575,64 +484,70 @@ func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string { } } -func (cl *Client) dialUTP(ctx context.Context, addr string) (c net.Conn, err error) { - c, err = cl.utpSock.DialContext(ctx, cl.utpDialNetwork(), addr) - countDialResult(err) - return +func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err error) { + return sock.DialContext(ctx, "", addr) } -var ( - dialledFirstUtp = expvar.NewInt("dialledFirstUtp") - dialledFirstNotUtp = expvar.NewInt("dialledFirstNotUtp") -) +var allPeerNetworks = []string{"tcp4", "tcp6", "udp4", "udp6"} + +func peerNetworkEnabled(network string, cfg Config) bool { + c := func(s string) bool { + return strings.Contains(network, s) + } + if cfg.DisableUTP { + if c("udp") || c("utp") { + return false + } + } + if cfg.DisableTCP && c("tcp") { + return false + } + return true +} // Returns a connection over UTP or TCP, whichever is first to connect. -func (cl *Client) dialFirst(ctx context.Context, addr string) (conn net.Conn, utp bool) { +func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn { ctx, cancel := context.WithCancel(ctx) // As soon as we return one connection, cancel the others. defer cancel() left := 0 resCh := make(chan dialResult, left) - if !cl.config.DisableUTP { - left++ - go func() { - c, _ := cl.dialUTP(ctx, addr) - resCh <- dialResult{c, true} - }() - } - if !cl.config.DisableTCP { + dial := func(f func(_ context.Context, addr string) (net.Conn, error)) { left++ go func() { - c, _ := cl.dialTCP(ctx, addr) - resCh <- dialResult{c, false} + c, err := f(ctx, addr) + countDialResult(err) + resCh <- dialResult{c} }() } + func() { + cl.mu.Lock() + defer cl.mu.Unlock() + cl.eachListener(func(s socket) bool { + if peerNetworkEnabled(s.Addr().Network(), cl.config) { + dial(s.dial) + } + return true + }) + }() var res dialResult // Wait for a successful connection. for ; left > 0 && res.Conn == nil; left-- { res = <-resCh } - if left > 0 { - // There are still incompleted dials. - go func() { - for ; left > 0; left-- { - conn := (<-resCh).Conn - if conn != nil { - conn.Close() - } + // There are still incompleted dials. + go func() { + for ; left > 0; left-- { + conn := (<-resCh).Conn + if conn != nil { + conn.Close() } - }() - } - conn = res.Conn - utp = res.UTP - if conn != nil { - if utp { - dialledFirstUtp.Add(1) - } else { - dialledFirstNotUtp.Add(1) } + }() + if res.Conn != nil { + go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1) } - return + return res.Conn } func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) { @@ -645,10 +560,9 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) { // Performs initiator handshakes and returns a connection. Returns nil // *connection if no connection for valid reasons. -func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader, utp bool) (c *connection, err error) { +func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) { c = cl.newConnection(nc) c.headerEncrypted = encryptHeader - c.uTP = utp ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout) defer cancel() dl, ok := ctx.Deadline() @@ -666,31 +580,35 @@ func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torr return } -var ( - initiatedConnWithPreferredHeaderEncryption = expvar.NewInt("initiatedConnWithPreferredHeaderEncryption") - initiatedConnWithFallbackHeaderEncryption = expvar.NewInt("initiatedConnWithFallbackHeaderEncryption") -) +// Returns nil connection and nil error if no connection could be established +// for valid reasons. +func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) { + nc := cl.dialFirst(ctx, addr) + if nc == nil { + return + } + defer func() { + if c == nil || err != nil { + nc.Close() + } + }() + return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader) +} // Returns nil connection and nil error if no connection could be established // for valid reasons. func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - nc, utp := cl.dialFirst(ctx, addr) - if nc == nil { - return - } obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption - c, err = cl.handshakesConnection(ctx, nc, t, obfuscatedHeaderFirst, utp) + c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst) if err != nil { - // log.Printf("error initiating connection handshakes: %s", err) - nc.Close() return - } else if c != nil { - initiatedConnWithPreferredHeaderEncryption.Add(1) + } + if c != nil { + go torrent.Add("initiated conn with preferred header obfuscation", 1) return } - nc.Close() if cl.config.ForceEncryption { // We should have just tried with an obfuscated header. A plaintext // header can't result in an encrypted connection, so we're done. @@ -699,23 +617,10 @@ func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, } return } - // Try again with encryption if we didn't earlier, or without if we did, - // using whichever protocol type worked last time. - if utp { - nc, err = cl.dialUTP(ctx, addr) - } else { - nc, err = cl.dialTCP(ctx, addr) - } - if err != nil { - err = fmt.Errorf("error dialing for header encryption fallback: %s", err) - return - } - c, err = cl.handshakesConnection(ctx, nc, t, !obfuscatedHeaderFirst, utp) - if err != nil || c == nil { - nc.Close() - } - if err == nil && c != nil { - initiatedConnWithFallbackHeaderEncryption.Add(1) + // Try again with encryption if we didn't earlier, or without if we did. + c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst) + if c != nil { + go torrent.Add("initiated conn with fallback header obfuscation", 1) } return } @@ -746,14 +651,7 @@ func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) { // The port number for incoming peer connections. 0 if the client isn't // listening. func (cl *Client) incomingPeerPort() int { - if cl.listenAddr == "" { - return 0 - } - _, port, err := missinggo.ParseHostPort(cl.listenAddr) - if err != nil { - panic(err) - } - return port + return cl.LocalPort() } func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) { @@ -952,14 +850,28 @@ func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) { }(), }) } - if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil { + if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() { conn.Post(pp.Message{ Type: pp.Port, - Port: uint16(missinggo.AddrPort(cl.dHT.Addr())), + Port: cl.dhtPort(), }) } } +func (cl *Client) dhtPort() (ret uint16) { + cl.eachDhtServer(func(s *dht.Server) { + ret = uint16(missinggo.AddrPort(s.Addr())) + }) + return +} + +func (cl *Client) haveDhtServer() (ret bool) { + cl.eachDhtServer(func(_ *dht.Server) { + ret = true + }) + return +} + // Process incoming ut_metadata message. func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error { var d map[string]int @@ -1079,9 +991,9 @@ func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStor } new = true t = cl.newTorrent(infoHash, specStorage) - if cl.dHT != nil { - go t.dhtAnnouncer() - } + cl.eachDhtServer(func(s *dht.Server) { + go t.dhtAnnouncer(s) + }) cl.torrents[infoHash] = t t.updateWantPeersEvent() // Tickle Client.waitAccept, new torrent may want conns. @@ -1195,14 +1107,11 @@ func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) { return cl.AddTorrent(mi) } -func (cl *Client) DHT() *dht.Server { - return cl.dHT +func (cl *Client) DhtServers() []*dht.Server { + return cl.dhtServers } func (cl *Client) AddDHTNodes(nodes []string) { - if cl.DHT() == nil { - return - } for _, n := range nodes { hmp := missinggo.SplitHostMaybePort(n) ip := net.ParseIP(hmp.Host) @@ -1216,7 +1125,9 @@ func (cl *Client) AddDHTNodes(nodes []string) { Port: hmp.Port, }, } - cl.DHT().AddNode(ni) + cl.eachDhtServer(func(s *dht.Server) { + s.AddNode(ni) + }) } } @@ -1267,16 +1178,52 @@ func firstNotNil(ips ...net.IP) net.IP { return nil } +func (cl *Client) eachListener(f func(socket) bool) { + for _, s := range cl.conns { + if !f(s) { + break + } + } +} + +func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) { + cl.eachListener(func(l socket) bool { + ret = l + return !f(l) + }) + return +} + func (cl *Client) publicIp(peer net.IP) net.IP { // TODO: Use BEP 10 to determine how peers are seeing us. if peer.To4() != nil { - return firstNotNil(cl.config.PublicIp4, missinggo.AddrIP(cl.ListenAddr()).To4()) + return firstNotNil( + cl.config.PublicIp4, + cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }), + ) } else { - return firstNotNil(cl.config.PublicIp6, missinggo.AddrIP(cl.ListenAddr()).To16()) + return firstNotNil( + cl.config.PublicIp6, + cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }), + ) } } +func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP { + return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool { + return f(missinggo.AddrIP(l.Addr())) + }).Addr()) +} + // Our IP as a peer should see it. func (cl *Client) publicAddr(peer net.IP) ipPort { return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())} } + +func (cl *Client) ListenAddrs() (ret []net.Addr) { + cl.eachListener(func(l socket) bool { + ret = append(ret, l.Addr()) + return true + }) + return +} diff --git a/client_test.go b/client_test.go index c19129f9..abeb953a 100644 --- a/client_test.go +++ b/client_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/anacrolix/dht" _ "github.com/anacrolix/envpprof" "github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo/filecache" @@ -214,16 +215,6 @@ func TestUTPRawConn(t *testing.T) { } } -func TestTwoClientsArbitraryPorts(t *testing.T) { - for i := 0; i < 2; i++ { - cl, err := NewClient(TestingConfig()) - if err != nil { - t.Fatal(err) - } - defer cl.Close() - } -} - func TestAddDropManyTorrents(t *testing.T) { cl, err := NewClient(TestingConfig()) require.NoError(t, err) @@ -403,7 +394,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) { require.NoError(t, err) assert.True(t, new) // Now do some things with leecher and seeder. - addClientPeer(leecherTorrent, seeder) + leecherTorrent.AddClientPeer(seeder) // The Torrent should not be interested in obtaining peers, so the one we // just added should be the only one. assert.False(t, leecherTorrent.Seeding()) @@ -461,6 +452,7 @@ func TestSeedAfterDownloading(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(cfg.DataDir) leecherLeecher, _ := NewClient(cfg) + require.NoError(t, err) defer leecherLeecher.Close() testutil.ExportStatusWriter(leecherLeecher, "ll") leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { @@ -486,8 +478,8 @@ func TestSeedAfterDownloading(t *testing.T) { require.NoError(t, err) assert.EqualValues(t, testutil.GreetingFileContents, b) }() - addClientPeer(leecherGreeting, seeder) - addClientPeer(leecherGreeting, leecherLeecher) + leecherGreeting.AddClientPeer(seeder) + leecherGreeting.AddClientPeer(leecherLeecher) wg.Add(1) go func() { defer wg.Done() @@ -585,7 +577,7 @@ func TestResponsive(t *testing.T) { ret.ChunkSize = 2 return }()) - addClientPeer(leecherTorrent, seeder) + leecherTorrent.AddClientPeer(seeder) reader := leecherTorrent.NewReader() defer reader.Close() reader.SetReadahead(0) @@ -628,7 +620,7 @@ func TestTorrentDroppedDuringResponsiveRead(t *testing.T) { ret.ChunkSize = 2 return }()) - addClientPeer(leecherTorrent, seeder) + leecherTorrent.AddClientPeer(seeder) reader := leecherTorrent.NewReader() defer reader.Close() reader.SetReadahead(0) @@ -656,7 +648,12 @@ func TestDHTInheritBlocklist(t *testing.T) { cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() - require.Equal(t, ipl, cl.DHT().IPBlocklist()) + numServers := 0 + cl.eachDhtServer(func(s *dht.Server) { + assert.Equal(t, ipl, s.IPBlocklist()) + numServers++ + }) + assert.EqualValues(t, 2, numServers) } // Check that stuff is merged in subsequent AddTorrentSpec for the same @@ -761,21 +758,29 @@ func TestAddMetainfoWithNodes(t *testing.T) { cfg := TestingConfig() cfg.ListenAddr = ":0" cfg.NoDHT = false + cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil } // For now, we want to just jam the nodes into the table, without // verifying them first. Also the DHT code doesn't support mixing secure // and insecure nodes if security is enabled (yet). - cfg.DHTConfig.NoSecurity = true + // cfg.DHTConfig.NoSecurity = true cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() - assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions) + sum := func() (ret int) { + cl.eachDhtServer(func(s *dht.Server) { + ret += s.NumNodes() + ret += s.Stats().OutstandingTransactions + }) + return + } + assert.EqualValues(t, 0, sum()) tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent") require.NoError(t, err) // Nodes are not added or exposed in Torrent's metainfo. We just randomly // check if the announce-list is here instead. TODO: Add nodes. assert.Len(t, tt.metainfo.AnnounceList, 5) // There are 6 nodes in the torrent file. - assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions) + assert.EqualValues(t, 6*len(cl.dhtServers), sum()) } type testDownloadCancelParams struct { @@ -831,7 +836,7 @@ func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) { } leecherGreeting.cl.mu.Unlock() - addClientPeer(leecherGreeting, seeder) + leecherGreeting.AddClientPeer(seeder) completes := make(map[int]bool, 3) values: for { @@ -908,8 +913,12 @@ func TestClientDynamicListenPortAllProtocols(t *testing.T) { cl, err := NewClient(TestingConfig()) require.NoError(t, err) defer cl.Close() - assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr())) - assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr())) + port := cl.LocalPort() + assert.NotEqual(t, 0, port) + cl.eachListener(func(s socket) bool { + assert.Equal(t, port, missinggo.AddrPort(s.Addr())) + return true + }) } func TestClientDynamicListenTCPOnly(t *testing.T) { @@ -918,8 +927,11 @@ func TestClientDynamicListenTCPOnly(t *testing.T) { cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() - assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr())) - assert.Nil(t, cl.utpSock) + assert.NotEqual(t, 0, cl.LocalPort()) + cl.eachListener(func(s socket) bool { + assert.True(t, isTcpNetwork(s.Addr().Network())) + return true + }) } func TestClientDynamicListenUTPOnly(t *testing.T) { @@ -928,8 +940,11 @@ func TestClientDynamicListenUTPOnly(t *testing.T) { cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() - assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr())) - assert.Nil(t, cl.tcpListener) + assert.NotEqual(t, 0, cl.LocalPort()) + cl.eachListener(func(s socket) bool { + assert.True(t, isUtpNetwork(s.Addr().Network())) + return true + }) } func TestClientDynamicListenPortNoProtocols(t *testing.T) { @@ -939,16 +954,7 @@ func TestClientDynamicListenPortNoProtocols(t *testing.T) { cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() - assert.Nil(t, cl.ListenAddr()) -} - -func addClientPeer(t *Torrent, cl *Client) { - t.AddPeers([]Peer{ - { - IP: missinggo.AddrIP(cl.ListenAddr()), - Port: missinggo.AddrPort(cl.ListenAddr()), - }, - }) + assert.Equal(t, 0, cl.LocalPort()) } func totalConns(tts []*Torrent) (ret int) { @@ -978,7 +984,7 @@ func TestSetMaxEstablishedConn(t *testing.T) { for _, tt := range tts { for _, _tt := range tts { // if tt != _tt { - addClientPeer(tt, _tt.cl) + tt.AddClientPeer(_tt.cl) // } } } @@ -1048,10 +1054,7 @@ func TestMultipleTorrentsWithEncryption(t *testing.T) { testutil.ExportStatusWriter(client, "c") tr, err := client.AddMagnet(magnet1) require.NoError(t, err) - tr.AddPeers([]Peer{{ - IP: missinggo.AddrIP(server.ListenAddr()), - Port: missinggo.AddrPort(server.ListenAddr()), - }}) + tr.AddClientPeer(server) <-tr.GotInfo() tr.DownloadAll() client.WaitAll() diff --git a/cmd/magnet-metainfo/main.go b/cmd/magnet-metainfo/main.go index c943bea8..536f7abf 100644 --- a/cmd/magnet-metainfo/main.go +++ b/cmd/magnet-metainfo/main.go @@ -28,7 +28,9 @@ func main() { cl.WriteStatus(w) }) http.HandleFunc("/dht", func(w http.ResponseWriter, r *http.Request) { - cl.DHT().WriteStatus(w) + for _, ds := range cl.DhtServers() { + ds.WriteStatus(w) + } }) wg := sync.WaitGroup{} for _, arg := range args.Magnet { diff --git a/cmd/torrent/main.go b/cmd/torrent/main.go index d4dba223..52547590 100644 --- a/cmd/torrent/main.go +++ b/cmd/torrent/main.go @@ -13,7 +13,6 @@ import ( "github.com/anacrolix/torrent/iplist" - "github.com/anacrolix/dht" "github.com/anacrolix/envpprof" "github.com/anacrolix/tagflag" "github.com/dustin/go-humanize" @@ -148,9 +147,6 @@ func main() { log.SetFlags(log.LstdFlags | log.Lshortfile) tagflag.Parse(&flags) clientConfig := torrent.Config{ - DHTConfig: dht.ServerConfig{ - StartingNodes: dht.GlobalBootstrapAddrs, - }, Debug: flags.Debug, Seed: flags.Seed, } diff --git a/cmd/torrentfs/main.go b/cmd/torrentfs/main.go index ed3d94bf..4d9119da 100644 --- a/cmd/torrentfs/main.go +++ b/cmd/torrentfs/main.go @@ -15,7 +15,6 @@ import ( "bazil.org/fuse" fusefs "bazil.org/fuse/fs" - "github.com/anacrolix/dht" _ "github.com/anacrolix/envpprof" "github.com/anacrolix/tagflag" @@ -92,9 +91,6 @@ func mainExitCode() int { DisableTrackers: args.DisableTrackers, ListenAddr: args.ListenAddr.String(), NoUpload: true, // Ensure that downloads are responsive. - DHTConfig: dht.ServerConfig{ - StartingNodes: dht.GlobalBootstrapAddrs, - }, }) if err != nil { log.Print(err) diff --git a/config.go b/config.go index d0f09669..c2aec9e3 100644 --- a/config.go +++ b/config.go @@ -6,9 +6,9 @@ import ( "net/http" "time" - "github.com/anacrolix/dht" "golang.org/x/time/rate" + "github.com/anacrolix/dht" "github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/storage" ) @@ -38,11 +38,10 @@ type Config struct { // Don't announce to trackers. This only leaves DHT to discover peers. DisableTrackers bool `long:"disable-trackers"` DisablePEX bool `long:"disable-pex"` - // Don't create a DHT. - NoDHT bool `long:"disable-dht"` - // Overrides the default DHT configuration. - DHTConfig dht.ServerConfig + // Don't create a DHT. + NoDHT bool `long:"disable-dht"` + DhtStartingNodes dht.StartingNodesGetter // Never send chunks to peers. NoUpload bool `long:"no-upload"` // Disable uploading even when it isn't fair. @@ -148,6 +147,9 @@ func (cfg *Config) setDefaults() { if cfg.HandshakesTimeout == 0 { cfg.HandshakesTimeout = 20 * time.Second } + if cfg.DhtStartingNodes == nil { + cfg.DhtStartingNodes = dht.GlobalBootstrapAddrs + } } type EncryptionPolicy struct { diff --git a/connection.go b/connection.go index 6f63945d..73c4710f 100644 --- a/connection.go +++ b/connection.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/anacrolix/dht" "github.com/anacrolix/log" "github.com/anacrolix/missinggo" @@ -48,7 +49,6 @@ type connection struct { headerEncrypted bool cryptoMethod mse.CryptoMethod Discovery peerSource - uTP bool closed missinggo.Event stats ConnStats @@ -179,12 +179,16 @@ func (cn *connection) connectionFlags() (ret string) { c('e') } ret += string(cn.Discovery) - if cn.uTP { + if cn.utp() { c('U') } return } +func (cn *connection) utp() bool { + return strings.Contains(cn.remoteAddr().Network(), "utp") +} + // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText func (cn *connection) statusFlags() (ret string) { c := func(b byte) { @@ -1019,9 +1023,6 @@ func (c *connection) mainReadLoop() (err error) { case pp.Extended: err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload) case pp.Port: - if cl.dHT == nil { - break - } pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String()) if err != nil { panic(err) @@ -1029,7 +1030,9 @@ func (c *connection) mainReadLoop() (err error) { if msg.Port != 0 { pingAddr.Port = int(msg.Port) } - go cl.dHT.Ping(pingAddr, nil) + cl.eachDhtServer(func(s *dht.Server) { + go s.Ping(pingAddr, nil) + }) case pp.AllowedFast: torrent.Add("allowed fasts received", 1) log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c, debugLogValue).Log(c.t.logger) diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go index 06f4c888..164a8fec 100644 --- a/fs/torrentfs_test.go +++ b/fs/torrentfs_test.go @@ -197,13 +197,9 @@ func TestDownloadOnDemand(t *testing.T) { require.NoError(t, err) testutil.ExportStatusWriter(leecher, "l") defer leecher.Close() - leecherTorrent, _ := leecher.AddTorrent(layout.Metainfo) - leecherTorrent.AddPeers([]torrent.Peer{ - { - IP: missinggo.AddrIP(seeder.ListenAddr()), - Port: missinggo.AddrPort(seeder.ListenAddr()), - }, - }) + leecherTorrent, err := leecher.AddTorrent(layout.Metainfo) + require.NoError(t, err) + leecherTorrent.AddClientPeer(seeder) fs := New(leecher) defer fs.Destroy() root, _ := fs.Root() diff --git a/listen.go b/listen.go new file mode 100644 index 00000000..68c4b768 --- /dev/null +++ b/listen.go @@ -0,0 +1,17 @@ +package torrent + +type peerNetworks struct { + tcp4, tcp6 bool + utp4, utp6 bool +} + +func handleErr(h func(), fs ...func() error) error { + for _, f := range fs { + err := f() + if err != nil { + h() + return err + } + } + return nil +} diff --git a/misc.go b/misc.go index c23910ea..6fdb77c0 100644 --- a/misc.go +++ b/misc.go @@ -125,8 +125,10 @@ func addrCompactIP(addr net.Addr) (string, error) { return string(ip.To16()), nil } -func connIsIpv6(nc net.Conn) bool { - ra := nc.RemoteAddr() +func connIsIpv6(nc interface { + LocalAddr() net.Addr +}) bool { + ra := nc.LocalAddr() rip := missinggo.AddrIP(ra) return rip.To4() == nil && rip.To16() != nil } diff --git a/socket.go b/socket.go new file mode 100644 index 00000000..7c14e3fd --- /dev/null +++ b/socket.go @@ -0,0 +1,123 @@ +package torrent + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + + "github.com/anacrolix/missinggo" +) + +type dialer interface { + dial(_ context.Context, addr string) (net.Conn, error) +} + +type socket interface { + net.Listener + dialer +} + +func listen(network, addr string) (socket, error) { + if isTcpNetwork(network) { + return listenTcp(network, addr) + } else if isUtpNetwork(network) { + return listenUtp(network, addr) + } else { + panic(fmt.Sprintf("unknown network %q", network)) + } +} + +func isTcpNetwork(s string) bool { + return strings.Contains(s, "tcp") +} + +func isUtpNetwork(s string) bool { + return strings.Contains(s, "utp") || strings.Contains(s, "udp") +} + +func listenTcp(network, address string) (s socket, err error) { + l, err := net.Listen(network, address) + if err != nil { + return + } + return tcpSocket{l}, nil +} + +type tcpSocket struct { + net.Listener +} + +func (me tcpSocket) dial(ctx context.Context, addr string) (net.Conn, error) { + return net.Dial(me.Addr().Network(), addr) +} + +func setPort(addr string, port int) string { + host, _, err := net.SplitHostPort(addr) + if err != nil { + panic(err) + } + return net.JoinHostPort(host, strconv.FormatInt(int64(port), 10)) +} + +func listenAll(networks []string, addr string) ([]socket, error) { + if len(networks) == 0 { + return nil, nil + } + for { + ss, retry, err := listenAllRetry(networks, addr) + if !retry { + return ss, err + } + } +} + +func listenAllRetry(networks []string, addr string) (ss []socket, retry bool, err error) { + _, port, err := missinggo.ParseHostPort(addr) + if err != nil { + err = fmt.Errorf("error parsing addr: %s", err) + return + } + ss = make([]socket, 1, len(networks)) + ss[0], err = listen(networks[0], addr) + if err != nil { + return nil, false, fmt.Errorf("first listen: %s", err) + } + defer func() { + if err != nil || retry { + for _, s := range ss { + s.Close() + } + ss = nil + } + }() + restAddr := setPort(addr, missinggo.AddrPort(ss[0].Addr())) + for _, n := range networks[1:] { + s, err := listen(n, restAddr) + if err != nil { + return ss, + missinggo.IsAddrInUse(err) && port == 0, + fmt.Errorf("subsequent listen: %s", err) + } + ss = append(ss, s) + } + return +} + +func listenUtp(network, addr string) (s socket, err error) { + us, err := NewUtpSocket(network, addr) + if err != nil { + return + } + return utpSocketSocket{us, network}, nil +} + +type utpSocketSocket struct { + utpSocket + network string +} + +func (me utpSocketSocket) dial(ctx context.Context, addr string) (net.Conn, error) { + return me.utpSocket.DialContext(ctx, me.network, addr) +} diff --git a/torrent.go b/torrent.go index 2387efb5..83b6546b 100644 --- a/torrent.go +++ b/torrent.go @@ -1378,9 +1378,9 @@ func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) { } } -func (t *Torrent) announceDHT(impliedPort bool) (err error) { +func (t *Torrent) announceDHT(impliedPort bool, s *dht.Server) (err error) { cl := t.cl - ps, err := cl.dHT.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort) + ps, err := s.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort) if err != nil { return } @@ -1389,7 +1389,7 @@ func (t *Torrent) announceDHT(impliedPort bool) (err error) { return } -func (t *Torrent) dhtAnnouncer() { +func (t *Torrent) dhtAnnouncer(s *dht.Server) { cl := t.cl for { select { @@ -1397,7 +1397,7 @@ func (t *Torrent) dhtAnnouncer() { case <-t.closed.LockedChan(&cl.mu): return } - err := t.announceDHT(true) + err := t.announceDHT(true, s) func() { cl.mu.Lock() defer cl.mu.Unlock() @@ -1724,3 +1724,15 @@ func (t *Torrent) initiateConn(peer Peer) { t.halfOpen[addr] = peer go t.cl.outgoingConnection(t, addr, peer.Source) } + +func (t *Torrent) AddClientPeer(cl *Client) { + t.AddPeers(func() (ps []Peer) { + for _, la := range cl.ListenAddrs() { + ps = append(ps, Peer{ + IP: missinggo.AddrIP(la), + Port: missinggo.AddrPort(la), + }) + } + return + }()) +} diff --git a/torrent_test.go b/torrent_test.go index 43887e16..c9ea5e42 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -1,6 +1,7 @@ package torrent import ( + "fmt" "net" "os" "path/filepath" @@ -176,7 +177,7 @@ func TestTorrentMetainfoIncompleteMetadata(t *testing.T) { assert.Nil(t, tt.Metainfo().InfoBytes) assert.False(t, tt.haveAllMetadataPieces()) - nc, err := net.Dial("tcp", cl.ListenAddr().String()) + nc, err := net.Dial("tcp", fmt.Sprintf(":%d", cl.LocalPort())) require.NoError(t, err) defer nc.Close()