From: Matt Joiner Date: Mon, 17 Nov 2014 05:27:01 +0000 (-0600) Subject: Connect to peers using both UTP and TCP; Share UTP port with DHT X-Git-Tag: v1.0.0~1531 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=8d96195b0cff9cd1a22da3843c85a48059d51e06;p=btrtrc.git Connect to peers using both UTP and TCP; Share UTP port with DHT --- diff --git a/client.go b/client.go index c553afd5..bc20d6b2 100644 --- a/client.go +++ b/client.go @@ -29,7 +29,6 @@ import ( mathRand "math/rand" "net" "os" - "strconv" "strings" "sync" "syscall" @@ -37,8 +36,8 @@ import ( "github.com/h2so5/utp" + "github.com/anacrolix/libtorgo/bencode" "github.com/anacrolix/libtorgo/metainfo" - "github.com/nsf/libtorgo/bencode" "bitbucket.org/anacrolix/go.torrent/dht" pp "bitbucket.org/anacrolix/go.torrent/peer_protocol" @@ -231,19 +230,6 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er return t.Data.ReadAt(p, off) } -func dhtAddr(listen net.Addr) (s string, err error) { - host, port, err := net.SplitHostPort(listen.String()) - if err != nil { - return - } - i64, err := strconv.ParseInt(port, 0, 0) - if err != nil { - return - } - s = net.JoinHostPort(host, strconv.FormatInt(i64+1, 10)) - return -} - func NewClient(cfg *Config) (cl *Client, err error) { if cfg == nil { cfg = &Config{} @@ -285,8 +271,8 @@ func NewClient(cfg *Config) (cl *Client, err error) { } return cfg.ListenAddr } - var l net.Listener - if false { + if !cfg.DisableTCP { + var l net.Listener l, err = net.Listen("tcp", listenAddr()) if err != nil { return @@ -294,22 +280,19 @@ func NewClient(cfg *Config) (cl *Client, err error) { cl.listeners = append(cl.listeners, l) go cl.acceptConnections(l, false) } - if true { - l, err = utp.Listen("utp", listenAddr()) + var utpL *utp.UTPListener + if !cfg.DisableUTP { + utpL, err = utp.Listen("utp", listenAddr()) if err != nil { return } - cl.listeners = append(cl.listeners, l) - go cl.acceptConnections(l, true) + cl.listeners = append(cl.listeners, utpL) + go cl.acceptConnections(utpL, true) } if !cfg.NoDHT { - var dhtAddr_ string - dhtAddr_, err = dhtAddr(cl.ListenAddr()) - if err != nil { - return - } cl.dHT, err = dht.NewServer(&dht.ServerConfig{ - Addr: dhtAddr_, + Addr: listenAddr(), + Conn: utpL.RawConn, }) if err != nil { return @@ -374,6 +357,29 @@ func (me *Client) torrent(ih InfoHash) *torrent { return nil } +type dialResult struct { + net.Conn + UTP bool +} + +func doDial(dial func() (net.Conn, error), ch chan dialResult, utp bool) { + conn, err := dial() + ch <- dialResult{conn, utp} + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + return + } + if netOpErr, ok := err.(*net.OpError); ok { + switch netOpErr.Err { + case syscall.ECONNREFUSED, syscall.EHOSTUNREACH: + return + } + } + if err != nil { + log.Printf("error connecting to peer: %s %#v", err, err) + return + } +} + // Start the process of connecting to the given peer for the given torrent if // appropriate. func (me *Client) initiateConn(peer Peer, t *torrent) { @@ -387,20 +393,27 @@ func (me *Client) initiateConn(peer Peer, t *torrent) { } t.HalfOpen[addr] = struct{}{} go func() { - // Binding to the listener address and dialing via net.Dialer gives + // Binding to the listen address and dialing via net.Dialer gives // "address in use" error. It seems it's not possible to dial out from // this address so that peers associate our local address with our // listen address. - var ( - conn net.Conn - err error - ) - if false { - conn, err = net.DialTimeout("tcp", addr, dialTimeout) - } else { - conn, err = (&utp.Dialer{Timeout: dialTimeout}).Dial("utp", addr) - } + // Initiate connections via TCP and UTP simultaneously. Use the first + // one that succeeds. + left := 2 + resCh := make(chan dialResult, left) + go doDial(func() (net.Conn, error) { + time.Sleep(time.Second) // Give uTP a bit of a head start. + return net.DialTimeout("tcp", addr, dialTimeout) + }, resCh, false) + go doDial(func() (net.Conn, error) { + return (&utp.Dialer{Timeout: dialTimeout}).Dial("utp", addr) + }, resCh, true) + + var res dialResult + for ; left > 0 && res.Conn == nil; left-- { + res = <-resCh + } // Whether or not the connection attempt succeeds, the half open // counter should be decremented, and new connection attempts made. go func() { @@ -412,22 +425,22 @@ func (me *Client) initiateConn(peer Peer, t *torrent) { delete(t.HalfOpen, addr) me.openNewConns() }() - - if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + if res.Conn == nil { return } - if netOpErr, ok := err.(*net.OpError); ok { - switch netOpErr.Err { - case syscall.ECONNREFUSED, syscall.EHOSTUNREACH: - return - } - } - if err != nil { - log.Printf("error connecting to peer: %s %#v", err, err) - return + if left > 0 { + go func() { + for ; left > 0; left-- { + conn := (<-resCh).Conn + if conn != nil { + conn.Close() + } + } + }() } + // log.Printf("connected to %s", conn.RemoteAddr()) - err = me.runConnection(conn, t, peer.Source, true) + err := me.runConnection(res.Conn, t, peer.Source, res.UTP) if err != nil { log.Print(err) } @@ -916,7 +929,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error { err = fmt.Errorf("error decoding extended message payload: %s", err) break } - // log.Printf("got handshake: %v", d) + // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d) if reqq, ok := d["reqq"]; ok { if i, ok := reqq.(int64); ok { c.PeerMaxRequests = int(i) @@ -1106,6 +1119,9 @@ func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error { if t == nil { return errors.New("no such torrent") } + // for _, p := range peers { + // log.Printf("adding peer for %q: %s", infoHash, p) + // } t.AddPeers(peers) me.openNewConns() return nil @@ -1239,7 +1255,7 @@ func (me *Client) addTorrent(t *torrent) (err error) { go me.announceTorrent(t) } if me.dHT != nil { - go me.announceTorrentDHT(t) + go me.announceTorrentDHT(t, true) } return } @@ -1274,7 +1290,7 @@ func (me *Client) AddTorrentFromFile(name string) (err error) { return me.AddTorrent(mi) } -func (cl *Client) announceTorrentDHT(t *torrent) { +func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) { for { ps, err := cl.dHT.GetPeers(string(t.InfoHash[:])) if err != nil { @@ -1322,7 +1338,7 @@ func (cl *Client) announceTorrentDHT(t *torrent) { if port != 0 { // We can't allow the port to be implied as long as the UTP and // DHT ports are different. - cl.dHT.AnnouncePeer(port, false, t.InfoHash.AsString()) + cl.dHT.AnnouncePeer(port, impliedPort, t.InfoHash.AsString()) } } } diff --git a/config.go b/config.go index 3b3b0837..06c35761 100644 --- a/config.go +++ b/config.go @@ -8,4 +8,6 @@ type Config struct { NoDHT bool NoUpload bool PeerID string + DisableUTP bool + DisableTCP bool } diff --git a/dht/dht.go b/dht/dht.go index 90169ff4..6e5d29cb 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -41,6 +41,7 @@ func newDHTAddr(addr *net.UDPAddr) (ret dHTAddr) { type ServerConfig struct { Addr string + Conn net.PacketConn } func (s *Server) LocalAddr() net.Addr { @@ -61,9 +62,13 @@ func NewServer(c *ServerConfig) (s *Server, err error) { c = &ServerConfig{} } s = &Server{} - s.socket, err = makeSocket(c.Addr) - if err != nil { - return + if c.Conn != nil { + s.socket = c.Conn + } else { + s.socket, err = makeSocket(c.Addr) + if err != nil { + return + } } err = s.init() if err != nil { @@ -234,6 +239,7 @@ func (s *Server) serve() error { } continue } + // log.Printf("received from %s: %#v", addr_, d) addr := newDHTAddr(addr_.(*net.UDPAddr)) s.mu.Lock() if d["y"] == "q" {