]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add UTP support, disable TCP for now. DHT moves to another port
authorMatt Joiner <anacrolix@gmail.com>
Sun, 16 Nov 2014 19:29:31 +0000 (13:29 -0600)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 16 Nov 2014 19:29:31 +0000 (13:29 -0600)
NOTES [new file with mode: 0644]
client.go
connection.go

diff --git a/NOTES b/NOTES
new file mode 100644 (file)
index 0000000..956240f
--- /dev/null
+++ b/NOTES
@@ -0,0 +1,6 @@
+Shared DHT/UTP/UDP-tracker socket dispatching in Transmission is at
+https://trac.transmissionbt.com/browser/trunk/libtransmission/tr-udp.c
+event_callback(). Currently I don't do this because github.com/h2so5/utp does
+not support UTP sockets backed by a socket out of its control. Also I only
+make client requests for UDP-trackers, so no shared socket is required there
+unless I want to imply the client port.
index 017ebe7782f29a2c4ce4828acfbc587c69dbb58b..4a0bfd244d7b7cc0a463198940874cc7228a1dbc 100644 (file)
--- a/client.go
+++ b/client.go
@@ -29,22 +29,23 @@ import (
        mathRand "math/rand"
        "net"
        "os"
+       "strconv"
        "strings"
        "sync"
        "syscall"
        "time"
 
-       "bitbucket.org/anacrolix/go.torrent/util/levelmu"
-
-       "bitbucket.org/anacrolix/go.torrent/dht"
-       . "bitbucket.org/anacrolix/go.torrent/util"
+       "github.com/h2so5/utp"
 
        "github.com/anacrolix/libtorgo/metainfo"
        "github.com/nsf/libtorgo/bencode"
 
+       "bitbucket.org/anacrolix/go.torrent/dht"
        pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
        "bitbucket.org/anacrolix/go.torrent/tracker"
        _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
+       . "bitbucket.org/anacrolix/go.torrent/util"
+       "bitbucket.org/anacrolix/go.torrent/util/levelmu"
 )
 
 var (
@@ -117,7 +118,7 @@ type Client struct {
        dataDir          string
        halfOpenLimit    int
        peerID           [20]byte
-       listener         net.Listener
+       listeners        []net.Listener
        disableTrackers  bool
        downloadStrategy DownloadStrategy
        dHT              *dht.Server
@@ -153,6 +154,8 @@ func (cl *Client) WriteStatus(w io.Writer) {
        if cl.dHT != nil {
                fmt.Fprintf(w, "DHT nodes: %d\n", cl.dHT.NumNodes())
                fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.IDString())
+               fmt.Fprintf(w, "DHT port: %d\n", addrPort(cl.dHT.LocalAddr()))
+               fmt.Fprintf(w, "DHT announces: %d\n", cl.dHT.NumConfirmedAnnounces)
        }
        cl.downloadStrategy.WriteStatus(w)
        fmt.Fprintln(w)
@@ -225,6 +228,19 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er
        return t.Data.ReadAt(p, off)
 }
 
+func dhtAddr(listen net.Addr) (s string, err error) {
+       host, port, err := net.SplitHostPort(listen.String())
+       if err != nil {
+               return
+       }
+       i64, err := strconv.ParseInt(port, 0, 0)
+       if err != nil {
+               return
+       }
+       s = net.JoinHostPort(host, strconv.FormatInt(i64+1, 10))
+       return
+}
+
 func NewClient(cfg *Config) (cl *Client, err error) {
        if cfg == nil {
                cfg = &Config{}
@@ -255,17 +271,38 @@ func NewClient(cfg *Config) (cl *Client, err error) {
                cl.downloadStrategy = &DefaultDownloadStrategy{}
        }
 
-       cl.listener, err = net.Listen("tcp", cfg.ListenAddr)
-       if err != nil {
-               return
+       // Returns the laddr string to listen on for the next Listen call.
+       listenAddr := func() string {
+               if addr := cl.ListenAddr(); addr != nil {
+                       return addr.String()
+               }
+               return cfg.ListenAddr
        }
-       if cl.listener != nil {
-               go cl.acceptConnections()
+       var l net.Listener
+       if false {
+               l, err = net.Listen("tcp", listenAddr())
+               if err != nil {
+                       return
+               }
+               cl.listeners = append(cl.listeners, l)
+               go cl.acceptConnections(l, false)
+       }
+       if true {
+               l, err = utp.Listen("utp", listenAddr())
+               if err != nil {
+                       return
+               }
+               cl.listeners = append(cl.listeners, l)
+               go cl.acceptConnections(l, true)
        }
-
        if !cfg.NoDHT {
+               var dhtAddr_ string
+               dhtAddr_, err = dhtAddr(cl.ListenAddr())
+               if err != nil {
+                       return
+               }
                cl.dHT, err = dht.NewServer(&dht.ServerConfig{
-                       Addr: cfg.ListenAddr,
+                       Addr: dhtAddr_,
                })
                if err != nil {
                        return
@@ -296,11 +333,11 @@ func (me *Client) Stop() {
        me.mu.Unlock()
 }
 
-func (cl *Client) acceptConnections() {
+func (cl *Client) acceptConnections(l net.Listener, utp bool) {
        for {
                // We accept all connections immediately, because we don't what
                // torrent they're for.
-               conn, err := cl.listener.Accept()
+               conn, err := l.Accept()
                select {
                case <-cl.quit:
                        if conn != nil {
@@ -314,7 +351,7 @@ func (cl *Client) acceptConnections() {
                        return
                }
                go func() {
-                       if err := cl.runConnection(conn, nil, peerSourceIncoming); err != nil {
+                       if err := cl.runConnection(conn, nil, peerSourceIncoming, utp); err != nil {
                                log.Print(err)
                        }
                }()
@@ -332,7 +369,7 @@ func (me *Client) torrent(ih InfoHash) *torrent {
 
 // Start the process of connecting to the given peer for the given torrent if
 // appropriate.
-func (me *Client) initiateConn(peer Peer, torrent *torrent) {
+func (me *Client) initiateConn(peer Peer, t *torrent) {
        if peer.Id == me.peerID {
                return
        }
@@ -353,7 +390,11 @@ func (me *Client) initiateConn(peer Peer, torrent *torrent) {
                // "address in use" error. It seems it's not possible to dial out from
                // this address so that peers associate our local address with our
                // listen address.
-               conn, err := net.DialTimeout(addr.Network(), addr.String(), dialTimeout)
+               if false {
+                       conn, err := net.DialTimeout("tcp", addr, dialTimeout)
+               } else {
+                       conn, err := (&utp.Dialer{Timeout: dialTimeout}).Dial("utp", addr)
+               }
 
                // Whether or not the connection attempt succeeds, the half open
                // counter should be decremented, and new connection attempts made.
@@ -381,7 +422,7 @@ func (me *Client) initiateConn(peer Peer, torrent *torrent) {
                        return
                }
                // log.Printf("connected to %s", conn.RemoteAddr())
-               err = me.runConnection(conn, torrent, peer.Source)
+               err = me.runConnection(conn, t, peer.Source, true)
                if err != nil {
                        log.Print(err)
                }
@@ -518,7 +559,7 @@ func (pc peerConn) Read(b []byte) (n int, err error) {
        return
 }
 
-func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource) (err error) {
+func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource, uTP bool) (err error) {
        if tcpConn, ok := sock.(*net.TCPConn); ok {
                tcpConn.SetLinger(0)
        }
@@ -554,7 +595,7 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
        }
        sock.SetWriteDeadline(time.Time{})
        sock = peerConn{sock}
-       conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID)
+       conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP)
        defer conn.Close()
        conn.Discovery = discovery
        if !me.addConnection(torrent, conn) {
index 7a9db0f4015b0ce21cea77578a1732cd73b7141f..ddd2364d8aa16dbf093c06de86c2b907e1ba8e0d 100644 (file)
@@ -28,6 +28,7 @@ const (
 type connection struct {
        Socket    net.Conn
        Discovery peerSource
+       uTP       bool
        closing   chan struct{}
        mu        sync.Mutex // Only for closing.
        post      chan pp.Message
@@ -59,9 +60,11 @@ type connection struct {
        PeerClientName   string
 }
 
-func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte) (c *connection) {
+func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool) (c *connection) {
        c = &connection{
-               Socket:             sock,
+               Socket: sock,
+               uTP:    uTP,
+
                Choked:             true,
                PeerChoked:         true,
                PeerMaxRequests:    250,
@@ -156,6 +159,9 @@ func (cn *connection) WriteStatus(w io.Writer) {
        if cn.Discovery != 0 {
                c(byte(cn.Discovery))
        }
+       if cn.uTP {
+               c('T')
+       }
        fmt.Fprintln(w)
 }