]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Connect to peers using both UTP and TCP; Share UTP port with DHT
authorMatt Joiner <anacrolix@gmail.com>
Mon, 17 Nov 2014 05:27:01 +0000 (23:27 -0600)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 17 Nov 2014 05:27:01 +0000 (23:27 -0600)
client.go
config.go
dht/dht.go

index c553afd5a1cc0af678c59918d301795124eea73e..bc20d6b20d61f8715834b5390fd2e215338c1768 100644 (file)
--- a/client.go
+++ b/client.go
@@ -29,7 +29,6 @@ import (
        mathRand "math/rand"
        "net"
        "os"
-       "strconv"
        "strings"
        "sync"
        "syscall"
@@ -37,8 +36,8 @@ import (
 
        "github.com/h2so5/utp"
 
+       "github.com/anacrolix/libtorgo/bencode"
        "github.com/anacrolix/libtorgo/metainfo"
-       "github.com/nsf/libtorgo/bencode"
 
        "bitbucket.org/anacrolix/go.torrent/dht"
        pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
@@ -231,19 +230,6 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er
        return t.Data.ReadAt(p, off)
 }
 
-func dhtAddr(listen net.Addr) (s string, err error) {
-       host, port, err := net.SplitHostPort(listen.String())
-       if err != nil {
-               return
-       }
-       i64, err := strconv.ParseInt(port, 0, 0)
-       if err != nil {
-               return
-       }
-       s = net.JoinHostPort(host, strconv.FormatInt(i64+1, 10))
-       return
-}
-
 func NewClient(cfg *Config) (cl *Client, err error) {
        if cfg == nil {
                cfg = &Config{}
@@ -285,8 +271,8 @@ func NewClient(cfg *Config) (cl *Client, err error) {
                }
                return cfg.ListenAddr
        }
-       var l net.Listener
-       if false {
+       if !cfg.DisableTCP {
+               var l net.Listener
                l, err = net.Listen("tcp", listenAddr())
                if err != nil {
                        return
@@ -294,22 +280,19 @@ func NewClient(cfg *Config) (cl *Client, err error) {
                cl.listeners = append(cl.listeners, l)
                go cl.acceptConnections(l, false)
        }
-       if true {
-               l, err = utp.Listen("utp", listenAddr())
+       var utpL *utp.UTPListener
+       if !cfg.DisableUTP {
+               utpL, err = utp.Listen("utp", listenAddr())
                if err != nil {
                        return
                }
-               cl.listeners = append(cl.listeners, l)
-               go cl.acceptConnections(l, true)
+               cl.listeners = append(cl.listeners, utpL)
+               go cl.acceptConnections(utpL, true)
        }
        if !cfg.NoDHT {
-               var dhtAddr_ string
-               dhtAddr_, err = dhtAddr(cl.ListenAddr())
-               if err != nil {
-                       return
-               }
                cl.dHT, err = dht.NewServer(&dht.ServerConfig{
-                       Addr: dhtAddr_,
+                       Addr: listenAddr(),
+                       Conn: utpL.RawConn,
                })
                if err != nil {
                        return
@@ -374,6 +357,29 @@ func (me *Client) torrent(ih InfoHash) *torrent {
        return nil
 }
 
+type dialResult struct {
+       net.Conn
+       UTP bool
+}
+
+func doDial(dial func() (net.Conn, error), ch chan dialResult, utp bool) {
+       conn, err := dial()
+       ch <- dialResult{conn, utp}
+       if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
+               return
+       }
+       if netOpErr, ok := err.(*net.OpError); ok {
+               switch netOpErr.Err {
+               case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
+                       return
+               }
+       }
+       if err != nil {
+               log.Printf("error connecting to peer: %s %#v", err, err)
+               return
+       }
+}
+
 // Start the process of connecting to the given peer for the given torrent if
 // appropriate.
 func (me *Client) initiateConn(peer Peer, t *torrent) {
@@ -387,20 +393,27 @@ func (me *Client) initiateConn(peer Peer, t *torrent) {
        }
        t.HalfOpen[addr] = struct{}{}
        go func() {
-               // Binding to the listener address and dialing via net.Dialer gives
+               // Binding to the listen address and dialing via net.Dialer gives
                // "address in use" error. It seems it's not possible to dial out from
                // this address so that peers associate our local address with our
                // listen address.
-               var (
-                       conn net.Conn
-                       err  error
-               )
-               if false {
-                       conn, err = net.DialTimeout("tcp", addr, dialTimeout)
-               } else {
-                       conn, err = (&utp.Dialer{Timeout: dialTimeout}).Dial("utp", addr)
-               }
 
+               // Initiate connections via TCP and UTP simultaneously. Use the first
+               // one that succeeds.
+               left := 2
+               resCh := make(chan dialResult, left)
+               go doDial(func() (net.Conn, error) {
+                       time.Sleep(time.Second) // Give uTP a bit of a head start.
+                       return net.DialTimeout("tcp", addr, dialTimeout)
+               }, resCh, false)
+               go doDial(func() (net.Conn, error) {
+                       return (&utp.Dialer{Timeout: dialTimeout}).Dial("utp", addr)
+               }, resCh, true)
+
+               var res dialResult
+               for ; left > 0 && res.Conn == nil; left-- {
+                       res = <-resCh
+               }
                // Whether or not the connection attempt succeeds, the half open
                // counter should be decremented, and new connection attempts made.
                go func() {
@@ -412,22 +425,22 @@ func (me *Client) initiateConn(peer Peer, t *torrent) {
                        delete(t.HalfOpen, addr)
                        me.openNewConns()
                }()
-
-               if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
+               if res.Conn == nil {
                        return
                }
-               if netOpErr, ok := err.(*net.OpError); ok {
-                       switch netOpErr.Err {
-                       case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
-                               return
-                       }
-               }
-               if err != nil {
-                       log.Printf("error connecting to peer: %s %#v", err, err)
-                       return
+               if left > 0 {
+                       go func() {
+                               for ; left > 0; left-- {
+                                       conn := (<-resCh).Conn
+                                       if conn != nil {
+                                               conn.Close()
+                                       }
+                               }
+                       }()
                }
+
                // log.Printf("connected to %s", conn.RemoteAddr())
-               err = me.runConnection(conn, t, peer.Source, true)
+               err := me.runConnection(res.Conn, t, peer.Source, res.UTP)
                if err != nil {
                        log.Print(err)
                }
@@ -916,7 +929,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
                                        err = fmt.Errorf("error decoding extended message payload: %s", err)
                                        break
                                }
-                               // log.Printf("got handshake: %v", d)
+                               // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
                                if reqq, ok := d["reqq"]; ok {
                                        if i, ok := reqq.(int64); ok {
                                                c.PeerMaxRequests = int(i)
@@ -1106,6 +1119,9 @@ func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
        if t == nil {
                return errors.New("no such torrent")
        }
+       // for _, p := range peers {
+       //      log.Printf("adding peer for %q: %s", infoHash, p)
+       // }
        t.AddPeers(peers)
        me.openNewConns()
        return nil
@@ -1239,7 +1255,7 @@ func (me *Client) addTorrent(t *torrent) (err error) {
                go me.announceTorrent(t)
        }
        if me.dHT != nil {
-               go me.announceTorrentDHT(t)
+               go me.announceTorrentDHT(t, true)
        }
        return
 }
@@ -1274,7 +1290,7 @@ func (me *Client) AddTorrentFromFile(name string) (err error) {
        return me.AddTorrent(mi)
 }
 
-func (cl *Client) announceTorrentDHT(t *torrent) {
+func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
        for {
                ps, err := cl.dHT.GetPeers(string(t.InfoHash[:]))
                if err != nil {
@@ -1322,7 +1338,7 @@ func (cl *Client) announceTorrentDHT(t *torrent) {
                if port != 0 {
                        // We can't allow the port to be implied as long as the UTP and
                        // DHT ports are different.
-                       cl.dHT.AnnouncePeer(port, false, t.InfoHash.AsString())
+                       cl.dHT.AnnouncePeer(port, impliedPort, t.InfoHash.AsString())
                }
        }
 }
index 3b3b08378ff4ccab15ade5b3edfc3e5a8c169f96..06c35761ca455ba8ce1195110f1c3b46dfffe922 100644 (file)
--- a/config.go
+++ b/config.go
@@ -8,4 +8,6 @@ type Config struct {
        NoDHT            bool
        NoUpload         bool
        PeerID           string
+       DisableUTP       bool
+       DisableTCP       bool
 }
index 90169ff46e9c87914965bd2164cb2aa9c6b0fb0a..6e5d29cb7d02ea366df0e3b2d6866cadb325cf66 100644 (file)
@@ -41,6 +41,7 @@ func newDHTAddr(addr *net.UDPAddr) (ret dHTAddr) {
 
 type ServerConfig struct {
        Addr string
+       Conn net.PacketConn
 }
 
 func (s *Server) LocalAddr() net.Addr {
@@ -61,9 +62,13 @@ func NewServer(c *ServerConfig) (s *Server, err error) {
                c = &ServerConfig{}
        }
        s = &Server{}
-       s.socket, err = makeSocket(c.Addr)
-       if err != nil {
-               return
+       if c.Conn != nil {
+               s.socket = c.Conn
+       } else {
+               s.socket, err = makeSocket(c.Addr)
+               if err != nil {
+                       return
+               }
        }
        err = s.init()
        if err != nil {
@@ -234,6 +239,7 @@ func (s *Server) serve() error {
                        }
                        continue
                }
+               // log.Printf("received from %s: %#v", addr_, d)
                addr := newDHTAddr(addr_.(*net.UDPAddr))
                s.mu.Lock()
                if d["y"] == "q" {