From 963918ac90ac4c91758dfdba7bdb0c1cc4d8eaa7 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 16 Nov 2014 13:29:31 -0600 Subject: [PATCH] Add UTP support, disable TCP for now. DHT moves to another port --- NOTES | 6 ++++ client.go | 81 ++++++++++++++++++++++++++++++++++++++------------- connection.go | 10 +++++-- 3 files changed, 75 insertions(+), 22 deletions(-) create mode 100644 NOTES diff --git a/NOTES b/NOTES new file mode 100644 index 00000000..956240f2 --- /dev/null +++ b/NOTES @@ -0,0 +1,6 @@ +Shared DHT/UTP/UDP-tracker socket dispatching in Transmission is at +https://trac.transmissionbt.com/browser/trunk/libtransmission/tr-udp.c +event_callback(). Currently I don't do this because github.com/h2so5/utp does +not support UTP sockets backed by a socket out of its control. Also I only +make client requests for UDP-trackers, so no shared socket is required there +unless I want to imply the client port. diff --git a/client.go b/client.go index 017ebe77..4a0bfd24 100644 --- a/client.go +++ b/client.go @@ -29,22 +29,23 @@ import ( mathRand "math/rand" "net" "os" + "strconv" "strings" "sync" "syscall" "time" - "bitbucket.org/anacrolix/go.torrent/util/levelmu" - - "bitbucket.org/anacrolix/go.torrent/dht" - . "bitbucket.org/anacrolix/go.torrent/util" + "github.com/h2so5/utp" "github.com/anacrolix/libtorgo/metainfo" "github.com/nsf/libtorgo/bencode" + "bitbucket.org/anacrolix/go.torrent/dht" pp "bitbucket.org/anacrolix/go.torrent/peer_protocol" "bitbucket.org/anacrolix/go.torrent/tracker" _ "bitbucket.org/anacrolix/go.torrent/tracker/udp" + . "bitbucket.org/anacrolix/go.torrent/util" + "bitbucket.org/anacrolix/go.torrent/util/levelmu" ) var ( @@ -117,7 +118,7 @@ type Client struct { dataDir string halfOpenLimit int peerID [20]byte - listener net.Listener + listeners []net.Listener disableTrackers bool downloadStrategy DownloadStrategy dHT *dht.Server @@ -153,6 +154,8 @@ func (cl *Client) WriteStatus(w io.Writer) { if cl.dHT != nil { fmt.Fprintf(w, "DHT nodes: %d\n", cl.dHT.NumNodes()) fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.IDString()) + fmt.Fprintf(w, "DHT port: %d\n", addrPort(cl.dHT.LocalAddr())) + fmt.Fprintf(w, "DHT announces: %d\n", cl.dHT.NumConfirmedAnnounces) } cl.downloadStrategy.WriteStatus(w) fmt.Fprintln(w) @@ -225,6 +228,19 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er return t.Data.ReadAt(p, off) } +func dhtAddr(listen net.Addr) (s string, err error) { + host, port, err := net.SplitHostPort(listen.String()) + if err != nil { + return + } + i64, err := strconv.ParseInt(port, 0, 0) + if err != nil { + return + } + s = net.JoinHostPort(host, strconv.FormatInt(i64+1, 10)) + return +} + func NewClient(cfg *Config) (cl *Client, err error) { if cfg == nil { cfg = &Config{} @@ -255,17 +271,38 @@ func NewClient(cfg *Config) (cl *Client, err error) { cl.downloadStrategy = &DefaultDownloadStrategy{} } - cl.listener, err = net.Listen("tcp", cfg.ListenAddr) - if err != nil { - return + // Returns the laddr string to listen on for the next Listen call. + listenAddr := func() string { + if addr := cl.ListenAddr(); addr != nil { + return addr.String() + } + return cfg.ListenAddr } - if cl.listener != nil { - go cl.acceptConnections() + var l net.Listener + if false { + l, err = net.Listen("tcp", listenAddr()) + if err != nil { + return + } + cl.listeners = append(cl.listeners, l) + go cl.acceptConnections(l, false) + } + if true { + l, err = utp.Listen("utp", listenAddr()) + if err != nil { + return + } + cl.listeners = append(cl.listeners, l) + go cl.acceptConnections(l, true) } - if !cfg.NoDHT { + var dhtAddr_ string + dhtAddr_, err = dhtAddr(cl.ListenAddr()) + if err != nil { + return + } cl.dHT, err = dht.NewServer(&dht.ServerConfig{ - Addr: cfg.ListenAddr, + Addr: dhtAddr_, }) if err != nil { return @@ -296,11 +333,11 @@ func (me *Client) Stop() { me.mu.Unlock() } -func (cl *Client) acceptConnections() { +func (cl *Client) acceptConnections(l net.Listener, utp bool) { for { // We accept all connections immediately, because we don't what // torrent they're for. - conn, err := cl.listener.Accept() + conn, err := l.Accept() select { case <-cl.quit: if conn != nil { @@ -314,7 +351,7 @@ func (cl *Client) acceptConnections() { return } go func() { - if err := cl.runConnection(conn, nil, peerSourceIncoming); err != nil { + if err := cl.runConnection(conn, nil, peerSourceIncoming, utp); err != nil { log.Print(err) } }() @@ -332,7 +369,7 @@ func (me *Client) torrent(ih InfoHash) *torrent { // Start the process of connecting to the given peer for the given torrent if // appropriate. -func (me *Client) initiateConn(peer Peer, torrent *torrent) { +func (me *Client) initiateConn(peer Peer, t *torrent) { if peer.Id == me.peerID { return } @@ -353,7 +390,11 @@ func (me *Client) initiateConn(peer Peer, torrent *torrent) { // "address in use" error. It seems it's not possible to dial out from // this address so that peers associate our local address with our // listen address. - conn, err := net.DialTimeout(addr.Network(), addr.String(), dialTimeout) + if false { + conn, err := net.DialTimeout("tcp", addr, dialTimeout) + } else { + conn, err := (&utp.Dialer{Timeout: dialTimeout}).Dial("utp", addr) + } // Whether or not the connection attempt succeeds, the half open // counter should be decremented, and new connection attempts made. @@ -381,7 +422,7 @@ func (me *Client) initiateConn(peer Peer, torrent *torrent) { return } // log.Printf("connected to %s", conn.RemoteAddr()) - err = me.runConnection(conn, torrent, peer.Source) + err = me.runConnection(conn, t, peer.Source, true) if err != nil { log.Print(err) } @@ -518,7 +559,7 @@ func (pc peerConn) Read(b []byte) (n int, err error) { return } -func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource) (err error) { +func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource, uTP bool) (err error) { if tcpConn, ok := sock.(*net.TCPConn); ok { tcpConn.SetLinger(0) } @@ -554,7 +595,7 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS } sock.SetWriteDeadline(time.Time{}) sock = peerConn{sock} - conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID) + conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP) defer conn.Close() conn.Discovery = discovery if !me.addConnection(torrent, conn) { diff --git a/connection.go b/connection.go index 7a9db0f4..ddd2364d 100644 --- a/connection.go +++ b/connection.go @@ -28,6 +28,7 @@ const ( type connection struct { Socket net.Conn Discovery peerSource + uTP bool closing chan struct{} mu sync.Mutex // Only for closing. post chan pp.Message @@ -59,9 +60,11 @@ type connection struct { PeerClientName string } -func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte) (c *connection) { +func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool) (c *connection) { c = &connection{ - Socket: sock, + Socket: sock, + uTP: uTP, + Choked: true, PeerChoked: true, PeerMaxRequests: 250, @@ -156,6 +159,9 @@ func (cn *connection) WriteStatus(w io.Writer) { if cn.Discovery != 0 { c(byte(cn.Discovery)) } + if cn.uTP { + c('T') + } fmt.Fprintln(w) } -- 2.48.1