mathRand "math/rand"
"net"
"os"
- "strconv"
"strings"
"sync"
"syscall"
"github.com/h2so5/utp"
+ "github.com/anacrolix/libtorgo/bencode"
"github.com/anacrolix/libtorgo/metainfo"
- "github.com/nsf/libtorgo/bencode"
"bitbucket.org/anacrolix/go.torrent/dht"
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
return t.Data.ReadAt(p, off)
}
-func dhtAddr(listen net.Addr) (s string, err error) {
- host, port, err := net.SplitHostPort(listen.String())
- if err != nil {
- return
- }
- i64, err := strconv.ParseInt(port, 0, 0)
- if err != nil {
- return
- }
- s = net.JoinHostPort(host, strconv.FormatInt(i64+1, 10))
- return
-}
-
func NewClient(cfg *Config) (cl *Client, err error) {
if cfg == nil {
cfg = &Config{}
}
return cfg.ListenAddr
}
- var l net.Listener
- if false {
+ if !cfg.DisableTCP {
+ var l net.Listener
l, err = net.Listen("tcp", listenAddr())
if err != nil {
return
cl.listeners = append(cl.listeners, l)
go cl.acceptConnections(l, false)
}
- if true {
- l, err = utp.Listen("utp", listenAddr())
+ var utpL *utp.UTPListener
+ if !cfg.DisableUTP {
+ utpL, err = utp.Listen("utp", listenAddr())
if err != nil {
return
}
- cl.listeners = append(cl.listeners, l)
- go cl.acceptConnections(l, true)
+ cl.listeners = append(cl.listeners, utpL)
+ go cl.acceptConnections(utpL, true)
}
if !cfg.NoDHT {
- var dhtAddr_ string
- dhtAddr_, err = dhtAddr(cl.ListenAddr())
- if err != nil {
- return
- }
cl.dHT, err = dht.NewServer(&dht.ServerConfig{
- Addr: dhtAddr_,
+ Addr: listenAddr(),
+ Conn: utpL.RawConn,
})
if err != nil {
return
return nil
}
+type dialResult struct {
+ net.Conn
+ UTP bool
+}
+
+func doDial(dial func() (net.Conn, error), ch chan dialResult, utp bool) {
+ conn, err := dial()
+ ch <- dialResult{conn, utp}
+ if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
+ return
+ }
+ if netOpErr, ok := err.(*net.OpError); ok {
+ switch netOpErr.Err {
+ case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
+ return
+ }
+ }
+ if err != nil {
+ log.Printf("error connecting to peer: %s %#v", err, err)
+ return
+ }
+}
+
// Start the process of connecting to the given peer for the given torrent if
// appropriate.
func (me *Client) initiateConn(peer Peer, t *torrent) {
}
t.HalfOpen[addr] = struct{}{}
go func() {
- // Binding to the listener address and dialing via net.Dialer gives
+ // Binding to the listen address and dialing via net.Dialer gives
// "address in use" error. It seems it's not possible to dial out from
// this address so that peers associate our local address with our
// listen address.
- var (
- conn net.Conn
- err error
- )
- if false {
- conn, err = net.DialTimeout("tcp", addr, dialTimeout)
- } else {
- conn, err = (&utp.Dialer{Timeout: dialTimeout}).Dial("utp", addr)
- }
+ // Initiate connections via TCP and UTP simultaneously. Use the first
+ // one that succeeds.
+ left := 2
+ resCh := make(chan dialResult, left)
+ go doDial(func() (net.Conn, error) {
+ time.Sleep(time.Second) // Give uTP a bit of a head start.
+ return net.DialTimeout("tcp", addr, dialTimeout)
+ }, resCh, false)
+ go doDial(func() (net.Conn, error) {
+ return (&utp.Dialer{Timeout: dialTimeout}).Dial("utp", addr)
+ }, resCh, true)
+
+ var res dialResult
+ for ; left > 0 && res.Conn == nil; left-- {
+ res = <-resCh
+ }
// Whether or not the connection attempt succeeds, the half open
// counter should be decremented, and new connection attempts made.
go func() {
delete(t.HalfOpen, addr)
me.openNewConns()
}()
-
- if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
+ if res.Conn == nil {
return
}
- if netOpErr, ok := err.(*net.OpError); ok {
- switch netOpErr.Err {
- case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
- return
- }
- }
- if err != nil {
- log.Printf("error connecting to peer: %s %#v", err, err)
- return
+ if left > 0 {
+ go func() {
+ for ; left > 0; left-- {
+ conn := (<-resCh).Conn
+ if conn != nil {
+ conn.Close()
+ }
+ }
+ }()
}
+
// log.Printf("connected to %s", conn.RemoteAddr())
- err = me.runConnection(conn, t, peer.Source, true)
+ err := me.runConnection(res.Conn, t, peer.Source, res.UTP)
if err != nil {
log.Print(err)
}
err = fmt.Errorf("error decoding extended message payload: %s", err)
break
}
- // log.Printf("got handshake: %v", d)
+ // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
if reqq, ok := d["reqq"]; ok {
if i, ok := reqq.(int64); ok {
c.PeerMaxRequests = int(i)
if t == nil {
return errors.New("no such torrent")
}
+ // for _, p := range peers {
+ // log.Printf("adding peer for %q: %s", infoHash, p)
+ // }
t.AddPeers(peers)
me.openNewConns()
return nil
go me.announceTorrent(t)
}
if me.dHT != nil {
- go me.announceTorrentDHT(t)
+ go me.announceTorrentDHT(t, true)
}
return
}
return me.AddTorrent(mi)
}
-func (cl *Client) announceTorrentDHT(t *torrent) {
+func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
for {
ps, err := cl.dHT.GetPeers(string(t.InfoHash[:]))
if err != nil {
if port != 0 {
// We can't allow the port to be implied as long as the UTP and
// DHT ports are different.
- cl.dHT.AnnouncePeer(port, false, t.InfoHash.AsString())
+ cl.dHT.AnnouncePeer(port, impliedPort, t.InfoHash.AsString())
}
}
}