]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Rework conns to/and allow multiple DHT servers
authorMatt Joiner <anacrolix@gmail.com>
Thu, 12 Apr 2018 01:41:07 +0000 (11:41 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 12 Apr 2018 01:41:07 +0000 (11:41 +1000)
This will help with #229, and IPv6 support.

14 files changed:
bencode/decode.go
client.go
client_test.go
cmd/magnet-metainfo/main.go
cmd/torrent/main.go
cmd/torrentfs/main.go
config.go
connection.go
fs/torrentfs_test.go
listen.go [new file with mode: 0644]
misc.go
socket.go [new file with mode: 0644]
torrent.go
torrent_test.go

index 51a7a2d52ea941cb652f53f296bc43748d3decd3..dc985aaf7dd94609027a516b8dd2be53cb87112e 100644 (file)
@@ -226,9 +226,9 @@ func getDictField(dict reflect.Value, key string) dictField {
                        })
                }
                return dictField{
-                       Value: dict.FieldByIndex(sf.Index),
-                       Ok:    true,
-                       Set:   func() {},
+                       Value:                    dict.FieldByIndex(sf.Index),
+                       Ok:                       true,
+                       Set:                      func() {},
                        IgnoreUnmarshalTypeError: getTag(sf.Tag).IgnoreUnmarshalTypeError(),
                }
        default:
index d932cd7f23918a77316997e87aa4ac31a3ae6aae..2b984b57627a73b5aa926de524b3be2446673111 100644 (file)
--- a/client.go
+++ b/client.go
@@ -7,7 +7,6 @@ import (
        "crypto/rand"
        "encoding/binary"
        "errors"
-       "expvar"
        "fmt"
        "io"
        "net"
@@ -49,16 +48,13 @@ type Client struct {
        peerID         PeerID
        defaultStorage *storage.Client
        onClose        []func()
-       tcpListener    net.Listener
-       utpSock        utpSocket
-       dHT            *dht.Server
+       conns          []socket
+       dhtServers     []*dht.Server
        ipBlockList    iplist.Ranger
        // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
        extensionBytes peerExtensionBytes
-       // The net.Addr.String part that should be common to all active listeners.
-       listenAddr    string
-       uploadLimit   *rate.Limiter
-       downloadLimit *rate.Limiter
+       uploadLimit    *rate.Limiter
+       downloadLimit  *rate.Limiter
 
        // Set of addresses that have our client ID. This intentionally will
        // include ourselves if we end up trying to connect to our own address
@@ -78,21 +74,6 @@ func (cl *Client) badPeerIPsLocked() []string {
        return slices.FromMapKeys(cl.badPeerIPs).([]string)
 }
 
-func (cl *Client) IPBlockList() iplist.Ranger {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
-       return cl.ipBlockList
-}
-
-func (cl *Client) SetIPBlockList(list iplist.Ranger) {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
-       cl.ipBlockList = list
-       if cl.dHT != nil {
-               cl.dHT.SetIPBlockList(list)
-       }
-}
-
 func (cl *Client) PeerID() PeerID {
        return cl.peerID
 }
@@ -103,11 +84,29 @@ func (torrentAddr) Network() string { return "" }
 
 func (me torrentAddr) String() string { return string(me) }
 
-func (cl *Client) ListenAddr() net.Addr {
-       if cl.listenAddr == "" {
-               return nil
-       }
-       return torrentAddr(cl.listenAddr)
+func (cl *Client) LocalPort() (port int) {
+       cl.eachListener(func(l socket) bool {
+               _port := missinggo.AddrPort(l.Addr())
+               if _port == 0 {
+                       panic(l)
+               }
+               if port == 0 {
+                       port = _port
+               } else if port != _port {
+                       panic("mismatched ports")
+               }
+               return true
+       })
+       return
+}
+
+func writeDhtServerStatus(w io.Writer, s *dht.Server) {
+       dhtStats := s.Stats()
+       fmt.Fprintf(w, "\tDHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
+       fmt.Fprintf(w, "\tDHT Server ID: %x\n", s.ID())
+       fmt.Fprintf(w, "\tDHT port: %d\n", missinggo.AddrPort(s.Addr()))
+       fmt.Fprintf(w, "\tDHT announces: %d\n", dhtStats.ConfirmedAnnounces)
+       fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
 }
 
 // Writes out a human readable status of the client, such as for writing to a
@@ -117,22 +116,14 @@ func (cl *Client) WriteStatus(_w io.Writer) {
        defer cl.mu.Unlock()
        w := bufio.NewWriter(_w)
        defer w.Flush()
-       if addr := cl.ListenAddr(); addr != nil {
-               fmt.Fprintf(w, "Listening on %s\n", addr)
-       } else {
-               fmt.Fprintln(w, "Not listening!")
-       }
+       fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
        fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
        fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
        fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
-       if dht := cl.DHT(); dht != nil {
-               dhtStats := dht.Stats()
-               fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
-               fmt.Fprintf(w, "DHT Server ID: %x\n", dht.ID())
-               fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(dht.Addr()))
-               fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
-               fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
-       }
+       cl.eachDhtServer(func(s *dht.Server) {
+               fmt.Fprintf(w, "%s DHT server:\n", s.Addr().Network())
+               writeDhtServerStatus(w, s)
+       })
        fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
        fmt.Fprintln(w)
        for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
@@ -155,76 +146,6 @@ func (cl *Client) WriteStatus(_w io.Writer) {
        }
 }
 
-func listenUTP(networkSuffix, addr string) (utpSocket, error) {
-       return NewUtpSocket("udp"+networkSuffix, addr)
-}
-
-func listenTCP(networkSuffix, addr string) (net.Listener, error) {
-       return net.Listen("tcp"+networkSuffix, addr)
-}
-
-func listenBothSameDynamicPort(networkSuffix, host string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
-       for {
-               tcpL, err = listenTCP(networkSuffix, net.JoinHostPort(host, "0"))
-               if err != nil {
-                       return
-               }
-               listenedAddr = tcpL.Addr().String()
-               utpSock, err = listenUTP(networkSuffix, listenedAddr)
-               if err == nil {
-                       return
-               }
-               tcpL.Close()
-               if !strings.Contains(err.Error(), "address already in use") {
-                       return
-               }
-       }
-}
-
-// Listen to enabled protocols, ensuring ports match.
-func listen(tcp, utp bool, networkSuffix, addr string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
-       if addr == "" {
-               addr = ":50007"
-       }
-       if tcp && utp {
-               var host string
-               var port int
-               host, port, err = missinggo.ParseHostPort(addr)
-               if err != nil {
-                       return
-               }
-               if port == 0 {
-                       // If both protocols are active, they need to have the same port.
-                       return listenBothSameDynamicPort(networkSuffix, host)
-               }
-       }
-       defer func() {
-               if err != nil {
-                       listenedAddr = ""
-               }
-       }()
-       if tcp {
-               tcpL, err = listenTCP(networkSuffix, addr)
-               if err != nil {
-                       return
-               }
-               defer func() {
-                       if err != nil {
-                               tcpL.Close()
-                       }
-               }()
-               listenedAddr = tcpL.Addr().String()
-       }
-       if utp {
-               utpSock, err = listenUTP(networkSuffix, addr)
-               if err != nil {
-                       return
-               }
-               listenedAddr = utpSock.Addr().String()
-       }
-       return
-}
-
 const debugLogValue = "debug"
 
 func (cl *Client) debugLogFilter(m *log.Msg) bool {
@@ -243,15 +164,7 @@ func (cl *Client) announceKey() int32 {
        return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
 }
 
-// Creates a new client.
 func NewClient(cfg *Config) (cl *Client, err error) {
-       if cfg == nil {
-               cfg = &Config{
-                       DHTConfig: dht.ServerConfig{
-                               StartingNodes: dht.GlobalBootstrapAddrs,
-                       },
-               }
-       }
        if cfg == nil {
                cfg = &Config{}
        }
@@ -312,53 +225,64 @@ func NewClient(cfg *Config) (cl *Client, err error) {
                }
        }
 
-       cl.tcpListener, cl.utpSock, cl.listenAddr, err = listen(
-               !cl.config.DisableTCP,
-               !cl.config.DisableUTP,
-               // We'll listen to IPv4 for TCP even if IPv4 peer connections are
-               // disabled because we want to ensure peers don't connect to some
-               // other process on that port.
-               ipNetworkSuffix(!cl.config.DisableIPv4, !cl.config.DisableIPv6),
-               cl.config.ListenAddr)
+       cl.conns, err = listenAll(cl.enabledPeerNetworks(), cl.config.ListenAddr)
        if err != nil {
                return
        }
-       go cl.forwardPort()
-       if cl.tcpListener != nil {
-               go cl.acceptConnections(cl.tcpListener, false)
-       }
-       if cl.utpSock != nil {
-               go cl.acceptConnections(cl.utpSock, true)
+       cl.LocalPort()
+
+       for _, s := range cl.conns {
+               if peerNetworkEnabled(s.Addr().Network(), cl.config) {
+                       go cl.acceptConnections(s)
+               }
        }
+
+       go cl.forwardPort()
        if !cfg.NoDHT {
-               dhtCfg := cfg.DHTConfig
-               if dhtCfg.IPBlocklist == nil {
-                       dhtCfg.IPBlocklist = cl.ipBlockList
-               }
-               if dhtCfg.Conn == nil {
-                       if cl.utpSock != nil {
-                               dhtCfg.Conn = cl.utpSock
-                       } else {
-                               dhtCfg.Conn, err = net.ListenPacket("udp", firstNonEmptyString(cl.listenAddr, cl.config.ListenAddr))
+               for _, s := range cl.conns {
+                       if pc, ok := s.(net.PacketConn); ok {
+                               ds, err := cl.newDhtServer(pc)
                                if err != nil {
-                                       return
+                                       panic(err)
                                }
+                               cl.dhtServers = append(cl.dhtServers, ds)
                        }
                }
-               if dhtCfg.OnAnnouncePeer == nil {
-                       dhtCfg.OnAnnouncePeer = cl.onDHTAnnouncePeer
-               }
-               cl.dHT, err = dht.NewServer(&dhtCfg)
-               if err != nil {
-                       return
+       }
+
+       return
+}
+
+func (cl *Client) enabledPeerNetworks() (ns []string) {
+       for _, n := range allPeerNetworks {
+               if peerNetworkEnabled(n, cl.config) {
+                       ns = append(ns, n)
                }
+       }
+       return
+}
+
+func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
+       cfg := dht.ServerConfig{
+               IPBlocklist:    cl.ipBlockList,
+               Conn:           conn,
+               OnAnnouncePeer: cl.onDHTAnnouncePeer,
+               PublicIP: func() net.IP {
+                       if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
+                               return cl.config.PublicIp6
+                       }
+                       return cl.config.PublicIp4
+               }(),
+               StartingNodes: cl.config.DhtStartingNodes,
+       }
+       s, err = dht.NewServer(&cfg)
+       if err == nil {
                go func() {
-                       if _, err := cl.dHT.Bootstrap(); err != nil {
+                       if _, err := s.Bootstrap(); err != nil {
                                log.Printf("error bootstrapping dht: %s", err)
                        }
                }()
        }
-
        return
 }
 
@@ -377,21 +301,28 @@ func (cl *Client) Closed() <-chan struct{} {
        return cl.closed.C()
 }
 
+func (cl *Client) eachDhtServer(f func(*dht.Server)) {
+       for _, ds := range cl.dhtServers {
+               f(ds)
+       }
+}
+
+func (cl *Client) closeSockets() {
+       cl.eachListener(func(l socket) bool {
+               l.Close()
+               return true
+       })
+       cl.conns = nil
+}
+
 // Stops the client. All connections to peers are closed and all activity will
 // come to a halt.
 func (cl *Client) Close() {
        cl.mu.Lock()
        defer cl.mu.Unlock()
        cl.closed.Set()
-       if cl.dHT != nil {
-               cl.dHT.Close()
-       }
-       if cl.utpSock != nil {
-               cl.utpSock.Close()
-       }
-       if cl.tcpListener != nil {
-               cl.tcpListener.Close()
-       }
+       cl.eachDhtServer(func(s *dht.Server) { s.Close() })
+       cl.closeSockets()
        for _, t := range cl.torrents {
                t.close()
        }
@@ -442,7 +373,7 @@ func (cl *Client) rejectAccepted(conn net.Conn) bool {
        return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
 }
 
-func (cl *Client) acceptConnections(l net.Listener, utp bool) {
+func (cl *Client) acceptConnections(l net.Listener) {
        cl.mu.Lock()
        defer cl.mu.Unlock()
        for {
@@ -466,28 +397,23 @@ func (cl *Client) acceptConnections(l net.Listener, utp bool) {
                log.Fmsg("accepted connection from %s", conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
                go torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
                go torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
-               if utp {
-                       go torrent.Add("accepted utp connections", 1)
-               } else {
-                       go torrent.Add("accepted tcp connections", 1)
-               }
+               go torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
                if cl.rejectAccepted(conn) {
                        go torrent.Add("rejected accepted connections", 1)
                        conn.Close()
                } else {
-                       go cl.incomingConnection(conn, utp)
+                       go cl.incomingConnection(conn)
                }
        }
 }
 
-func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
+func (cl *Client) incomingConnection(nc net.Conn) {
        defer nc.Close()
        if tc, ok := nc.(*net.TCPConn); ok {
                tc.SetLinger(0)
        }
        c := cl.newConnection(nc)
        c.Discovery = peerSourceIncoming
-       c.uTP = utp
        cl.runReceivedConn(c)
 }
 
@@ -505,7 +431,6 @@ func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
 
 type dialResult struct {
        Conn net.Conn
-       UTP  bool
 }
 
 func countDialResult(err error) {
@@ -546,22 +471,6 @@ func (cl *Client) dialTCP(ctx context.Context, addr string) (c net.Conn, err err
        return
 }
 
-func (cl *Client) utpDialNetwork() string {
-       // We want to restrict the addr resolve inside the utp library to the
-       // correct network, since the utp Socket may be listening to a broader
-       // network for DHT purposes or otherwise.
-       if !cl.config.DisableIPv4Peers {
-               return ""
-       }
-       n := cl.utpSock.Addr().Network()
-       switch n {
-       case "udp", "udp4", "udp6":
-               return "udp6"
-       default:
-               panic(n)
-       }
-}
-
 func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
        switch {
        case allowIpv4 && allowIpv6:
@@ -575,64 +484,70 @@ func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
        }
 }
 
-func (cl *Client) dialUTP(ctx context.Context, addr string) (c net.Conn, err error) {
-       c, err = cl.utpSock.DialContext(ctx, cl.utpDialNetwork(), addr)
-       countDialResult(err)
-       return
+func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err error) {
+       return sock.DialContext(ctx, "", addr)
 }
 
-var (
-       dialledFirstUtp    = expvar.NewInt("dialledFirstUtp")
-       dialledFirstNotUtp = expvar.NewInt("dialledFirstNotUtp")
-)
+var allPeerNetworks = []string{"tcp4", "tcp6", "udp4", "udp6"}
+
+func peerNetworkEnabled(network string, cfg Config) bool {
+       c := func(s string) bool {
+               return strings.Contains(network, s)
+       }
+       if cfg.DisableUTP {
+               if c("udp") || c("utp") {
+                       return false
+               }
+       }
+       if cfg.DisableTCP && c("tcp") {
+               return false
+       }
+       return true
+}
 
 // Returns a connection over UTP or TCP, whichever is first to connect.
-func (cl *Client) dialFirst(ctx context.Context, addr string) (conn net.Conn, utp bool) {
+func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
        ctx, cancel := context.WithCancel(ctx)
        // As soon as we return one connection, cancel the others.
        defer cancel()
        left := 0
        resCh := make(chan dialResult, left)
-       if !cl.config.DisableUTP {
-               left++
-               go func() {
-                       c, _ := cl.dialUTP(ctx, addr)
-                       resCh <- dialResult{c, true}
-               }()
-       }
-       if !cl.config.DisableTCP {
+       dial := func(f func(_ context.Context, addr string) (net.Conn, error)) {
                left++
                go func() {
-                       c, _ := cl.dialTCP(ctx, addr)
-                       resCh <- dialResult{c, false}
+                       c, err := f(ctx, addr)
+                       countDialResult(err)
+                       resCh <- dialResult{c}
                }()
        }
+       func() {
+               cl.mu.Lock()
+               defer cl.mu.Unlock()
+               cl.eachListener(func(s socket) bool {
+                       if peerNetworkEnabled(s.Addr().Network(), cl.config) {
+                               dial(s.dial)
+                       }
+                       return true
+               })
+       }()
        var res dialResult
        // Wait for a successful connection.
        for ; left > 0 && res.Conn == nil; left-- {
                res = <-resCh
        }
-       if left > 0 {
-               // There are still incompleted dials.
-               go func() {
-                       for ; left > 0; left-- {
-                               conn := (<-resCh).Conn
-                               if conn != nil {
-                                       conn.Close()
-                               }
+       // There are still incompleted dials.
+       go func() {
+               for ; left > 0; left-- {
+                       conn := (<-resCh).Conn
+                       if conn != nil {
+                               conn.Close()
                        }
-               }()
-       }
-       conn = res.Conn
-       utp = res.UTP
-       if conn != nil {
-               if utp {
-                       dialledFirstUtp.Add(1)
-               } else {
-                       dialledFirstNotUtp.Add(1)
                }
+       }()
+       if res.Conn != nil {
+               go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
        }
-       return
+       return res.Conn
 }
 
 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
@@ -645,10 +560,9 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
 
 // Performs initiator handshakes and returns a connection. Returns nil
 // *connection if no connection for valid reasons.
-func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader, utp bool) (c *connection, err error) {
+func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
        c = cl.newConnection(nc)
        c.headerEncrypted = encryptHeader
-       c.uTP = utp
        ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
        defer cancel()
        dl, ok := ctx.Deadline()
@@ -666,31 +580,35 @@ func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torr
        return
 }
 
-var (
-       initiatedConnWithPreferredHeaderEncryption = expvar.NewInt("initiatedConnWithPreferredHeaderEncryption")
-       initiatedConnWithFallbackHeaderEncryption  = expvar.NewInt("initiatedConnWithFallbackHeaderEncryption")
-)
+// Returns nil connection and nil error if no connection could be established
+// for valid reasons.
+func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
+       nc := cl.dialFirst(ctx, addr)
+       if nc == nil {
+               return
+       }
+       defer func() {
+               if c == nil || err != nil {
+                       nc.Close()
+               }
+       }()
+       return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader)
+}
 
 // Returns nil connection and nil error if no connection could be established
 // for valid reasons.
 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
        ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
        defer cancel()
-       nc, utp := cl.dialFirst(ctx, addr)
-       if nc == nil {
-               return
-       }
        obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
-       c, err = cl.handshakesConnection(ctx, nc, t, obfuscatedHeaderFirst, utp)
+       c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst)
        if err != nil {
-               // log.Printf("error initiating connection handshakes: %s", err)
-               nc.Close()
                return
-       } else if c != nil {
-               initiatedConnWithPreferredHeaderEncryption.Add(1)
+       }
+       if c != nil {
+               go torrent.Add("initiated conn with preferred header obfuscation", 1)
                return
        }
-       nc.Close()
        if cl.config.ForceEncryption {
                // We should have just tried with an obfuscated header. A plaintext
                // header can't result in an encrypted connection, so we're done.
@@ -699,23 +617,10 @@ func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection,
                }
                return
        }
-       // Try again with encryption if we didn't earlier, or without if we did,
-       // using whichever protocol type worked last time.
-       if utp {
-               nc, err = cl.dialUTP(ctx, addr)
-       } else {
-               nc, err = cl.dialTCP(ctx, addr)
-       }
-       if err != nil {
-               err = fmt.Errorf("error dialing for header encryption fallback: %s", err)
-               return
-       }
-       c, err = cl.handshakesConnection(ctx, nc, t, !obfuscatedHeaderFirst, utp)
-       if err != nil || c == nil {
-               nc.Close()
-       }
-       if err == nil && c != nil {
-               initiatedConnWithFallbackHeaderEncryption.Add(1)
+       // Try again with encryption if we didn't earlier, or without if we did.
+       c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
+       if c != nil {
+               go torrent.Add("initiated conn with fallback header obfuscation", 1)
        }
        return
 }
@@ -746,14 +651,7 @@ func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
 // The port number for incoming peer connections. 0 if the client isn't
 // listening.
 func (cl *Client) incomingPeerPort() int {
-       if cl.listenAddr == "" {
-               return 0
-       }
-       _, port, err := missinggo.ParseHostPort(cl.listenAddr)
-       if err != nil {
-               panic(err)
-       }
-       return port
+       return cl.LocalPort()
 }
 
 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
@@ -952,14 +850,28 @@ func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
                        }(),
                })
        }
-       if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
+       if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
                conn.Post(pp.Message{
                        Type: pp.Port,
-                       Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
+                       Port: cl.dhtPort(),
                })
        }
 }
 
+func (cl *Client) dhtPort() (ret uint16) {
+       cl.eachDhtServer(func(s *dht.Server) {
+               ret = uint16(missinggo.AddrPort(s.Addr()))
+       })
+       return
+}
+
+func (cl *Client) haveDhtServer() (ret bool) {
+       cl.eachDhtServer(func(_ *dht.Server) {
+               ret = true
+       })
+       return
+}
+
 // Process incoming ut_metadata message.
 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
        var d map[string]int
@@ -1079,9 +991,9 @@ func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStor
        }
        new = true
        t = cl.newTorrent(infoHash, specStorage)
-       if cl.dHT != nil {
-               go t.dhtAnnouncer()
-       }
+       cl.eachDhtServer(func(s *dht.Server) {
+               go t.dhtAnnouncer(s)
+       })
        cl.torrents[infoHash] = t
        t.updateWantPeersEvent()
        // Tickle Client.waitAccept, new torrent may want conns.
@@ -1195,14 +1107,11 @@ func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
        return cl.AddTorrent(mi)
 }
 
-func (cl *Client) DHT() *dht.Server {
-       return cl.dHT
+func (cl *Client) DhtServers() []*dht.Server {
+       return cl.dhtServers
 }
 
 func (cl *Client) AddDHTNodes(nodes []string) {
-       if cl.DHT() == nil {
-               return
-       }
        for _, n := range nodes {
                hmp := missinggo.SplitHostMaybePort(n)
                ip := net.ParseIP(hmp.Host)
@@ -1216,7 +1125,9 @@ func (cl *Client) AddDHTNodes(nodes []string) {
                                Port: hmp.Port,
                        },
                }
-               cl.DHT().AddNode(ni)
+               cl.eachDhtServer(func(s *dht.Server) {
+                       s.AddNode(ni)
+               })
        }
 }
 
@@ -1267,16 +1178,52 @@ func firstNotNil(ips ...net.IP) net.IP {
        return nil
 }
 
+func (cl *Client) eachListener(f func(socket) bool) {
+       for _, s := range cl.conns {
+               if !f(s) {
+                       break
+               }
+       }
+}
+
+func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
+       cl.eachListener(func(l socket) bool {
+               ret = l
+               return !f(l)
+       })
+       return
+}
+
 func (cl *Client) publicIp(peer net.IP) net.IP {
        // TODO: Use BEP 10 to determine how peers are seeing us.
        if peer.To4() != nil {
-               return firstNotNil(cl.config.PublicIp4, missinggo.AddrIP(cl.ListenAddr()).To4())
+               return firstNotNil(
+                       cl.config.PublicIp4,
+                       cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
+               )
        } else {
-               return firstNotNil(cl.config.PublicIp6, missinggo.AddrIP(cl.ListenAddr()).To16())
+               return firstNotNil(
+                       cl.config.PublicIp6,
+                       cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
+               )
        }
 }
 
+func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
+       return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
+               return f(missinggo.AddrIP(l.Addr()))
+       }).Addr())
+}
+
 // Our IP as a peer should see it.
 func (cl *Client) publicAddr(peer net.IP) ipPort {
        return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
 }
+
+func (cl *Client) ListenAddrs() (ret []net.Addr) {
+       cl.eachListener(func(l socket) bool {
+               ret = append(ret, l.Addr())
+               return true
+       })
+       return
+}
index c19129f90d064d777b18d3edd6e567ed528d706f..abeb953aafaecf4cbf6a1acdf17b3b08e173bd78 100644 (file)
@@ -13,6 +13,7 @@ import (
        "testing"
        "time"
 
+       "github.com/anacrolix/dht"
        _ "github.com/anacrolix/envpprof"
        "github.com/anacrolix/missinggo"
        "github.com/anacrolix/missinggo/filecache"
@@ -214,16 +215,6 @@ func TestUTPRawConn(t *testing.T) {
        }
 }
 
-func TestTwoClientsArbitraryPorts(t *testing.T) {
-       for i := 0; i < 2; i++ {
-               cl, err := NewClient(TestingConfig())
-               if err != nil {
-                       t.Fatal(err)
-               }
-               defer cl.Close()
-       }
-}
-
 func TestAddDropManyTorrents(t *testing.T) {
        cl, err := NewClient(TestingConfig())
        require.NoError(t, err)
@@ -403,7 +394,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
        require.NoError(t, err)
        assert.True(t, new)
        // Now do some things with leecher and seeder.
-       addClientPeer(leecherTorrent, seeder)
+       leecherTorrent.AddClientPeer(seeder)
        // The Torrent should not be interested in obtaining peers, so the one we
        // just added should be the only one.
        assert.False(t, leecherTorrent.Seeding())
@@ -461,6 +452,7 @@ func TestSeedAfterDownloading(t *testing.T) {
        require.NoError(t, err)
        defer os.RemoveAll(cfg.DataDir)
        leecherLeecher, _ := NewClient(cfg)
+       require.NoError(t, err)
        defer leecherLeecher.Close()
        testutil.ExportStatusWriter(leecherLeecher, "ll")
        leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
@@ -486,8 +478,8 @@ func TestSeedAfterDownloading(t *testing.T) {
                require.NoError(t, err)
                assert.EqualValues(t, testutil.GreetingFileContents, b)
        }()
-       addClientPeer(leecherGreeting, seeder)
-       addClientPeer(leecherGreeting, leecherLeecher)
+       leecherGreeting.AddClientPeer(seeder)
+       leecherGreeting.AddClientPeer(leecherLeecher)
        wg.Add(1)
        go func() {
                defer wg.Done()
@@ -585,7 +577,7 @@ func TestResponsive(t *testing.T) {
                ret.ChunkSize = 2
                return
        }())
-       addClientPeer(leecherTorrent, seeder)
+       leecherTorrent.AddClientPeer(seeder)
        reader := leecherTorrent.NewReader()
        defer reader.Close()
        reader.SetReadahead(0)
@@ -628,7 +620,7 @@ func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
                ret.ChunkSize = 2
                return
        }())
-       addClientPeer(leecherTorrent, seeder)
+       leecherTorrent.AddClientPeer(seeder)
        reader := leecherTorrent.NewReader()
        defer reader.Close()
        reader.SetReadahead(0)
@@ -656,7 +648,12 @@ func TestDHTInheritBlocklist(t *testing.T) {
        cl, err := NewClient(cfg)
        require.NoError(t, err)
        defer cl.Close()
-       require.Equal(t, ipl, cl.DHT().IPBlocklist())
+       numServers := 0
+       cl.eachDhtServer(func(s *dht.Server) {
+               assert.Equal(t, ipl, s.IPBlocklist())
+               numServers++
+       })
+       assert.EqualValues(t, 2, numServers)
 }
 
 // Check that stuff is merged in subsequent AddTorrentSpec for the same
@@ -761,21 +758,29 @@ func TestAddMetainfoWithNodes(t *testing.T) {
        cfg := TestingConfig()
        cfg.ListenAddr = ":0"
        cfg.NoDHT = false
+       cfg.DhtStartingNodes = func() ([]dht.Addr, error) { return nil, nil }
        // For now, we want to just jam the nodes into the table, without
        // verifying them first. Also the DHT code doesn't support mixing secure
        // and insecure nodes if security is enabled (yet).
-       cfg.DHTConfig.NoSecurity = true
+       // cfg.DHTConfig.NoSecurity = true
        cl, err := NewClient(cfg)
        require.NoError(t, err)
        defer cl.Close()
-       assert.EqualValues(t, 0, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
+       sum := func() (ret int) {
+               cl.eachDhtServer(func(s *dht.Server) {
+                       ret += s.NumNodes()
+                       ret += s.Stats().OutstandingTransactions
+               })
+               return
+       }
+       assert.EqualValues(t, 0, sum())
        tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
        require.NoError(t, err)
        // Nodes are not added or exposed in Torrent's metainfo. We just randomly
        // check if the announce-list is here instead. TODO: Add nodes.
        assert.Len(t, tt.metainfo.AnnounceList, 5)
        // There are 6 nodes in the torrent file.
-       assert.EqualValues(t, 6, cl.DHT().NumNodes()+cl.DHT().Stats().OutstandingTransactions)
+       assert.EqualValues(t, 6*len(cl.dhtServers), sum())
 }
 
 type testDownloadCancelParams struct {
@@ -831,7 +836,7 @@ func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
        }
        leecherGreeting.cl.mu.Unlock()
 
-       addClientPeer(leecherGreeting, seeder)
+       leecherGreeting.AddClientPeer(seeder)
        completes := make(map[int]bool, 3)
 values:
        for {
@@ -908,8 +913,12 @@ func TestClientDynamicListenPortAllProtocols(t *testing.T) {
        cl, err := NewClient(TestingConfig())
        require.NoError(t, err)
        defer cl.Close()
-       assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
-       assert.Equal(t, missinggo.AddrPort(cl.utpSock.Addr()), missinggo.AddrPort(cl.tcpListener.Addr()))
+       port := cl.LocalPort()
+       assert.NotEqual(t, 0, port)
+       cl.eachListener(func(s socket) bool {
+               assert.Equal(t, port, missinggo.AddrPort(s.Addr()))
+               return true
+       })
 }
 
 func TestClientDynamicListenTCPOnly(t *testing.T) {
@@ -918,8 +927,11 @@ func TestClientDynamicListenTCPOnly(t *testing.T) {
        cl, err := NewClient(cfg)
        require.NoError(t, err)
        defer cl.Close()
-       assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
-       assert.Nil(t, cl.utpSock)
+       assert.NotEqual(t, 0, cl.LocalPort())
+       cl.eachListener(func(s socket) bool {
+               assert.True(t, isTcpNetwork(s.Addr().Network()))
+               return true
+       })
 }
 
 func TestClientDynamicListenUTPOnly(t *testing.T) {
@@ -928,8 +940,11 @@ func TestClientDynamicListenUTPOnly(t *testing.T) {
        cl, err := NewClient(cfg)
        require.NoError(t, err)
        defer cl.Close()
-       assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr()))
-       assert.Nil(t, cl.tcpListener)
+       assert.NotEqual(t, 0, cl.LocalPort())
+       cl.eachListener(func(s socket) bool {
+               assert.True(t, isUtpNetwork(s.Addr().Network()))
+               return true
+       })
 }
 
 func TestClientDynamicListenPortNoProtocols(t *testing.T) {
@@ -939,16 +954,7 @@ func TestClientDynamicListenPortNoProtocols(t *testing.T) {
        cl, err := NewClient(cfg)
        require.NoError(t, err)
        defer cl.Close()
-       assert.Nil(t, cl.ListenAddr())
-}
-
-func addClientPeer(t *Torrent, cl *Client) {
-       t.AddPeers([]Peer{
-               {
-                       IP:   missinggo.AddrIP(cl.ListenAddr()),
-                       Port: missinggo.AddrPort(cl.ListenAddr()),
-               },
-       })
+       assert.Equal(t, 0, cl.LocalPort())
 }
 
 func totalConns(tts []*Torrent) (ret int) {
@@ -978,7 +984,7 @@ func TestSetMaxEstablishedConn(t *testing.T) {
                for _, tt := range tts {
                        for _, _tt := range tts {
                                // if tt != _tt {
-                               addClientPeer(tt, _tt.cl)
+                               tt.AddClientPeer(_tt.cl)
                                // }
                        }
                }
@@ -1048,10 +1054,7 @@ func TestMultipleTorrentsWithEncryption(t *testing.T) {
        testutil.ExportStatusWriter(client, "c")
        tr, err := client.AddMagnet(magnet1)
        require.NoError(t, err)
-       tr.AddPeers([]Peer{{
-               IP:   missinggo.AddrIP(server.ListenAddr()),
-               Port: missinggo.AddrPort(server.ListenAddr()),
-       }})
+       tr.AddClientPeer(server)
        <-tr.GotInfo()
        tr.DownloadAll()
        client.WaitAll()
index c943bea80f6cc7886b353424663a977db41c29a2..536f7abff308df7cab39fcbe70418bb11333c798 100644 (file)
@@ -28,7 +28,9 @@ func main() {
                cl.WriteStatus(w)
        })
        http.HandleFunc("/dht", func(w http.ResponseWriter, r *http.Request) {
-               cl.DHT().WriteStatus(w)
+               for _, ds := range cl.DhtServers() {
+                       ds.WriteStatus(w)
+               }
        })
        wg := sync.WaitGroup{}
        for _, arg := range args.Magnet {
index d4dba2237a06c5fffe005a89d421bddb93d45776..52547590c9aab3280db4b320bcc443919ef1a876 100644 (file)
@@ -13,7 +13,6 @@ import (
 
        "github.com/anacrolix/torrent/iplist"
 
-       "github.com/anacrolix/dht"
        "github.com/anacrolix/envpprof"
        "github.com/anacrolix/tagflag"
        "github.com/dustin/go-humanize"
@@ -148,9 +147,6 @@ func main() {
        log.SetFlags(log.LstdFlags | log.Lshortfile)
        tagflag.Parse(&flags)
        clientConfig := torrent.Config{
-               DHTConfig: dht.ServerConfig{
-                       StartingNodes: dht.GlobalBootstrapAddrs,
-               },
                Debug: flags.Debug,
                Seed:  flags.Seed,
        }
index ed3d94bf9599b8cf548941d3e3950803b30ad4cd..4d9119da970347bb8e3bf772eeaa552672590bab 100644 (file)
@@ -15,7 +15,6 @@ import (
 
        "bazil.org/fuse"
        fusefs "bazil.org/fuse/fs"
-       "github.com/anacrolix/dht"
        _ "github.com/anacrolix/envpprof"
        "github.com/anacrolix/tagflag"
 
@@ -92,9 +91,6 @@ func mainExitCode() int {
                DisableTrackers: args.DisableTrackers,
                ListenAddr:      args.ListenAddr.String(),
                NoUpload:        true, // Ensure that downloads are responsive.
-               DHTConfig: dht.ServerConfig{
-                       StartingNodes: dht.GlobalBootstrapAddrs,
-               },
        })
        if err != nil {
                log.Print(err)
index d0f09669d9d4e1ad5ef45ff163cebb46cd1e92a8..c2aec9e36ba7299d76f1df64bc1e728709cb89d2 100644 (file)
--- a/config.go
+++ b/config.go
@@ -6,9 +6,9 @@ import (
        "net/http"
        "time"
 
-       "github.com/anacrolix/dht"
        "golang.org/x/time/rate"
 
+       "github.com/anacrolix/dht"
        "github.com/anacrolix/torrent/iplist"
        "github.com/anacrolix/torrent/storage"
 )
@@ -38,11 +38,10 @@ type Config struct {
        // Don't announce to trackers. This only leaves DHT to discover peers.
        DisableTrackers bool `long:"disable-trackers"`
        DisablePEX      bool `long:"disable-pex"`
-       // Don't create a DHT.
-       NoDHT bool `long:"disable-dht"`
-       // Overrides the default DHT configuration.
-       DHTConfig dht.ServerConfig
 
+       // Don't create a DHT.
+       NoDHT            bool `long:"disable-dht"`
+       DhtStartingNodes dht.StartingNodesGetter
        // Never send chunks to peers.
        NoUpload bool `long:"no-upload"`
        // Disable uploading even when it isn't fair.
@@ -148,6 +147,9 @@ func (cfg *Config) setDefaults() {
        if cfg.HandshakesTimeout == 0 {
                cfg.HandshakesTimeout = 20 * time.Second
        }
+       if cfg.DhtStartingNodes == nil {
+               cfg.DhtStartingNodes = dht.GlobalBootstrapAddrs
+       }
 }
 
 type EncryptionPolicy struct {
index 6f63945d16e20ec18e5a41b023896f12b4884eda..73c4710f1f6a5d8ace8d2a4781fd2c0e5ced422a 100644 (file)
@@ -13,6 +13,7 @@ import (
        "sync"
        "time"
 
+       "github.com/anacrolix/dht"
        "github.com/anacrolix/log"
 
        "github.com/anacrolix/missinggo"
@@ -48,7 +49,6 @@ type connection struct {
        headerEncrypted bool
        cryptoMethod    mse.CryptoMethod
        Discovery       peerSource
-       uTP             bool
        closed          missinggo.Event
 
        stats ConnStats
@@ -179,12 +179,16 @@ func (cn *connection) connectionFlags() (ret string) {
                c('e')
        }
        ret += string(cn.Discovery)
-       if cn.uTP {
+       if cn.utp() {
                c('U')
        }
        return
 }
 
+func (cn *connection) utp() bool {
+       return strings.Contains(cn.remoteAddr().Network(), "utp")
+}
+
 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
 func (cn *connection) statusFlags() (ret string) {
        c := func(b byte) {
@@ -1019,9 +1023,6 @@ func (c *connection) mainReadLoop() (err error) {
                case pp.Extended:
                        err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
                case pp.Port:
-                       if cl.dHT == nil {
-                               break
-                       }
                        pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
                        if err != nil {
                                panic(err)
@@ -1029,7 +1030,9 @@ func (c *connection) mainReadLoop() (err error) {
                        if msg.Port != 0 {
                                pingAddr.Port = int(msg.Port)
                        }
-                       go cl.dHT.Ping(pingAddr, nil)
+                       cl.eachDhtServer(func(s *dht.Server) {
+                               go s.Ping(pingAddr, nil)
+                       })
                case pp.AllowedFast:
                        torrent.Add("allowed fasts received", 1)
                        log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c, debugLogValue).Log(c.t.logger)
index 06f4c8888a372bcf19cbd5eef527e3836a13f832..164a8feca23d3ed178672bf6731e4be06b08b512 100644 (file)
@@ -197,13 +197,9 @@ func TestDownloadOnDemand(t *testing.T) {
        require.NoError(t, err)
        testutil.ExportStatusWriter(leecher, "l")
        defer leecher.Close()
-       leecherTorrent, _ := leecher.AddTorrent(layout.Metainfo)
-       leecherTorrent.AddPeers([]torrent.Peer{
-               {
-                       IP:   missinggo.AddrIP(seeder.ListenAddr()),
-                       Port: missinggo.AddrPort(seeder.ListenAddr()),
-               },
-       })
+       leecherTorrent, err := leecher.AddTorrent(layout.Metainfo)
+       require.NoError(t, err)
+       leecherTorrent.AddClientPeer(seeder)
        fs := New(leecher)
        defer fs.Destroy()
        root, _ := fs.Root()
diff --git a/listen.go b/listen.go
new file mode 100644 (file)
index 0000000..68c4b76
--- /dev/null
+++ b/listen.go
@@ -0,0 +1,17 @@
+package torrent
+
+type peerNetworks struct {
+       tcp4, tcp6 bool
+       utp4, utp6 bool
+}
+
+func handleErr(h func(), fs ...func() error) error {
+       for _, f := range fs {
+               err := f()
+               if err != nil {
+                       h()
+                       return err
+               }
+       }
+       return nil
+}
diff --git a/misc.go b/misc.go
index c23910ea5aaafb332b5d5f67bb6a69156d577a20..6fdb77c0d34d5c3d42ac0d2571ff3fcd2d5453a4 100644 (file)
--- a/misc.go
+++ b/misc.go
@@ -125,8 +125,10 @@ func addrCompactIP(addr net.Addr) (string, error) {
        return string(ip.To16()), nil
 }
 
-func connIsIpv6(nc net.Conn) bool {
-       ra := nc.RemoteAddr()
+func connIsIpv6(nc interface {
+       LocalAddr() net.Addr
+}) bool {
+       ra := nc.LocalAddr()
        rip := missinggo.AddrIP(ra)
        return rip.To4() == nil && rip.To16() != nil
 }
diff --git a/socket.go b/socket.go
new file mode 100644 (file)
index 0000000..7c14e3f
--- /dev/null
+++ b/socket.go
@@ -0,0 +1,123 @@
+package torrent
+
+import (
+       "context"
+       "fmt"
+       "net"
+       "strconv"
+       "strings"
+
+       "github.com/anacrolix/missinggo"
+)
+
+type dialer interface {
+       dial(_ context.Context, addr string) (net.Conn, error)
+}
+
+type socket interface {
+       net.Listener
+       dialer
+}
+
+func listen(network, addr string) (socket, error) {
+       if isTcpNetwork(network) {
+               return listenTcp(network, addr)
+       } else if isUtpNetwork(network) {
+               return listenUtp(network, addr)
+       } else {
+               panic(fmt.Sprintf("unknown network %q", network))
+       }
+}
+
+func isTcpNetwork(s string) bool {
+       return strings.Contains(s, "tcp")
+}
+
+func isUtpNetwork(s string) bool {
+       return strings.Contains(s, "utp") || strings.Contains(s, "udp")
+}
+
+func listenTcp(network, address string) (s socket, err error) {
+       l, err := net.Listen(network, address)
+       if err != nil {
+               return
+       }
+       return tcpSocket{l}, nil
+}
+
+type tcpSocket struct {
+       net.Listener
+}
+
+func (me tcpSocket) dial(ctx context.Context, addr string) (net.Conn, error) {
+       return net.Dial(me.Addr().Network(), addr)
+}
+
+func setPort(addr string, port int) string {
+       host, _, err := net.SplitHostPort(addr)
+       if err != nil {
+               panic(err)
+       }
+       return net.JoinHostPort(host, strconv.FormatInt(int64(port), 10))
+}
+
+func listenAll(networks []string, addr string) ([]socket, error) {
+       if len(networks) == 0 {
+               return nil, nil
+       }
+       for {
+               ss, retry, err := listenAllRetry(networks, addr)
+               if !retry {
+                       return ss, err
+               }
+       }
+}
+
+func listenAllRetry(networks []string, addr string) (ss []socket, retry bool, err error) {
+       _, port, err := missinggo.ParseHostPort(addr)
+       if err != nil {
+               err = fmt.Errorf("error parsing addr: %s", err)
+               return
+       }
+       ss = make([]socket, 1, len(networks))
+       ss[0], err = listen(networks[0], addr)
+       if err != nil {
+               return nil, false, fmt.Errorf("first listen: %s", err)
+       }
+       defer func() {
+               if err != nil || retry {
+                       for _, s := range ss {
+                               s.Close()
+                       }
+                       ss = nil
+               }
+       }()
+       restAddr := setPort(addr, missinggo.AddrPort(ss[0].Addr()))
+       for _, n := range networks[1:] {
+               s, err := listen(n, restAddr)
+               if err != nil {
+                       return ss,
+                               missinggo.IsAddrInUse(err) && port == 0,
+                               fmt.Errorf("subsequent listen: %s", err)
+               }
+               ss = append(ss, s)
+       }
+       return
+}
+
+func listenUtp(network, addr string) (s socket, err error) {
+       us, err := NewUtpSocket(network, addr)
+       if err != nil {
+               return
+       }
+       return utpSocketSocket{us, network}, nil
+}
+
+type utpSocketSocket struct {
+       utpSocket
+       network string
+}
+
+func (me utpSocketSocket) dial(ctx context.Context, addr string) (net.Conn, error) {
+       return me.utpSocket.DialContext(ctx, me.network, addr)
+}
index 2387efb5413a7ee1594778d5e689e333f21832ce..83b6546b7a3f4f168f695a419959069c0a886e8d 100644 (file)
@@ -1378,9 +1378,9 @@ func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
        }
 }
 
-func (t *Torrent) announceDHT(impliedPort bool) (err error) {
+func (t *Torrent) announceDHT(impliedPort bool, s *dht.Server) (err error) {
        cl := t.cl
-       ps, err := cl.dHT.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
+       ps, err := s.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
        if err != nil {
                return
        }
@@ -1389,7 +1389,7 @@ func (t *Torrent) announceDHT(impliedPort bool) (err error) {
        return
 }
 
-func (t *Torrent) dhtAnnouncer() {
+func (t *Torrent) dhtAnnouncer(s *dht.Server) {
        cl := t.cl
        for {
                select {
@@ -1397,7 +1397,7 @@ func (t *Torrent) dhtAnnouncer() {
                case <-t.closed.LockedChan(&cl.mu):
                        return
                }
-               err := t.announceDHT(true)
+               err := t.announceDHT(true, s)
                func() {
                        cl.mu.Lock()
                        defer cl.mu.Unlock()
@@ -1724,3 +1724,15 @@ func (t *Torrent) initiateConn(peer Peer) {
        t.halfOpen[addr] = peer
        go t.cl.outgoingConnection(t, addr, peer.Source)
 }
+
+func (t *Torrent) AddClientPeer(cl *Client) {
+       t.AddPeers(func() (ps []Peer) {
+               for _, la := range cl.ListenAddrs() {
+                       ps = append(ps, Peer{
+                               IP:   missinggo.AddrIP(la),
+                               Port: missinggo.AddrPort(la),
+                       })
+               }
+               return
+       }())
+}
index 43887e165dbda74816727fca42a662d2e9f7fa72..c9ea5e42841f8834d1a7882b422f4ed3b95301d3 100644 (file)
@@ -1,6 +1,7 @@
 package torrent
 
 import (
+       "fmt"
        "net"
        "os"
        "path/filepath"
@@ -176,7 +177,7 @@ func TestTorrentMetainfoIncompleteMetadata(t *testing.T) {
        assert.Nil(t, tt.Metainfo().InfoBytes)
        assert.False(t, tt.haveAllMetadataPieces())
 
-       nc, err := net.Dial("tcp", cl.ListenAddr().String())
+       nc, err := net.Dial("tcp", fmt.Sprintf(":%d", cl.LocalPort()))
        require.NoError(t, err)
        defer nc.Close()