acceptedConns = expvar.NewInt("acceptedConns")
inboundConnsBlocked = expvar.NewInt("inboundConnsBlocked")
peerExtensions = expvar.NewMap("peerExtensions")
+ // Count of connections to peer with same client ID.
+ connsToSelf = expvar.NewInt("connsToSelf")
+ // Number of completed connections to a client we're already connected with.
+ duplicateClientConns = expvar.NewInt("duplicateClientConns")
+ receivedMessageTypes = expvar.NewMap("receivedMessageTypes")
+ supportedExtensionMessages = expvar.NewMap("supportedExtensionMessages")
)
const (
// Limit how long handshake can take. This is to reduce the lingering
// impact of a few bad apples. 4s loses 1% of successful handshakes that
// are obtained with 60s timeout, and 5% of unsuccessful handshakes.
- handshakeTimeout = 45 * time.Second
+ btHandshakeTimeout = 4 * time.Second
+ handshakesTimeout = 20 * time.Second
pruneInterval = 10 * time.Second
)
event sync.Cond
quit chan struct{}
- handshaking int
-
torrents map[InfoHash]*torrent
}
fmt.Fprintln(w, "Not listening!")
}
fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID)
- fmt.Fprintf(w, "Handshaking: %d\n", cl.handshaking)
if cl.dHT != nil {
dhtStats := cl.dHT.Stats()
fmt.Fprintf(w, "DHT nodes: %d (%d good)\n", dhtStats.NumNodes, dhtStats.NumGoodNodes)
}
// Sets priorities to download from the given offset. Returns when the piece
-// at the given offset can be read. Returns the number of bytes that
+// at the given offset can be read. Returns the number of bytes that are
// immediately available from the offset.
func (cl *Client) prepareRead(t *torrent, off int64) (n int64) {
index := int(off / int64(t.usualPieceSize()))
panic(fmt.Sprintf("can't read from %T", d))
}
+// Calculates the number of pieces to set to Readahead priority, after the
+// Now, and Next pieces.
func readaheadPieces(readahead, pieceLength int64) int {
return int((readahead+pieceLength-1)/pieceLength - 1)
}
return
}
cl.raisePiecePriority(t, index, piecePriorityNext)
- for i := 0; i < readaheadPieces(5*1024*1024, t.Info.PieceLength); i++ {
+ for range iter.N(readaheadPieces(5*1024*1024, t.Info.PieceLength)) {
index++
if index >= t.numPieces() {
break
func (cl *Client) raisePiecePriority(t *torrent, piece int, priority piecePriority) {
if t.Pieces[piece].Priority < priority {
+ cl.event.Broadcast()
cl.prioritizePiece(t, piece, priority)
}
}
conn.Close()
continue
}
- go func() {
- if err := cl.runConnection(conn, nil, peerSourceIncoming, utp); err != nil {
- log.Print(err)
- }
- }()
+ go cl.incomingConnection(conn, utp)
+ }
+}
+
+func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
+ defer nc.Close()
+ if tc, ok := nc.(*net.TCPConn); ok {
+ tc.SetLinger(0)
+ }
+ c := newConnection()
+ c.conn = nc
+ c.rw = nc
+ c.Discovery = peerSourceIncoming
+ c.uTP = utp
+ err := cl.runReceivedConn(c)
+ if err != nil {
+ log.Print(err)
+ }
+}
+
+func (cl *Client) Torrent(ih InfoHash) (T Torrent, ok bool) {
+ cl.mu.Lock()
+ defer cl.mu.Unlock()
+ t, ok := cl.torrents[ih]
+ if !ok {
+ return
}
+ T = Torrent{cl, t}
+ return
}
func (me *Client) torrent(ih InfoHash) *torrent {
}
type dialResult struct {
- net.Conn
- UTP bool
+ Conn net.Conn
+ UTP bool
}
-func doDial(dial func(addr string) (net.Conn, error), ch chan dialResult, utp bool, addr string) {
- conn, err := dial(addr)
+func doDial(dial func(addr string, t *torrent) (net.Conn, error), ch chan dialResult, utp bool, addr string, t *torrent) {
+ conn, err := dial(addr, t)
if err != nil {
if conn != nil {
conn.Close()
log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
return
}
- dialTimeout := reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, len(t.Peers))
t.HalfOpen[addr] = struct{}{}
- go func() {
- // Binding to the listen address and dialing via net.Dialer gives
- // "address in use" error. It seems it's not possible to dial out from
- // this address so that peers associate our local address with our
- // listen address.
-
- // Initiate connections via TCP and UTP simultaneously. Use the first
- // one that succeeds.
- left := 0
- if !me.disableUTP {
- left++
- }
- if !me.disableTCP {
- left++
- }
- resCh := make(chan dialResult, left)
- if !me.disableUTP {
- go doDial(func(addr string) (net.Conn, error) {
- return me.utpSock.DialTimeout(addr, dialTimeout)
- }, resCh, true, addr)
- }
- if !me.disableTCP {
- go doDial(func(addr string) (net.Conn, error) {
- // time.Sleep(time.Second) // Give uTP a bit of a head start.
- return net.DialTimeout("tcp", addr, dialTimeout)
- }, resCh, false, addr)
- }
- var res dialResult
- for ; left > 0 && res.Conn == nil; left-- {
- res = <-resCh
- }
- // Whether or not the connection attempt succeeds, the half open
- // counter should be decremented, and new connection attempts made.
+ go me.outgoingConnection(t, addr, peer.Source)
+}
+
+func (me *Client) dialTimeout(t *torrent) time.Duration {
+ return reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, len(t.Peers))
+}
+
+func (me *Client) dialTCP(addr string, t *torrent) (c net.Conn, err error) {
+ c, err = net.DialTimeout("tcp", addr, me.dialTimeout(t))
+ if err == nil {
+ c.(*net.TCPConn).SetLinger(0)
+ }
+ return
+}
+
+func (me *Client) dialUTP(addr string, t *torrent) (c net.Conn, err error) {
+ return me.utpSock.DialTimeout(addr, me.dialTimeout(t))
+}
+
+// Returns a connection over UTP or TCP.
+func (me *Client) dial(addr string, t *torrent) (conn net.Conn, utp bool) {
+ // Initiate connections via TCP and UTP simultaneously. Use the first one
+ // that succeeds.
+ left := 0
+ if !me.disableUTP {
+ left++
+ }
+ if !me.disableTCP {
+ left++
+ }
+ resCh := make(chan dialResult, left)
+ if !me.disableUTP {
+ go doDial(me.dialUTP, resCh, true, addr, t)
+ }
+ if !me.disableTCP {
+ go doDial(me.dialTCP, resCh, false, addr, t)
+ }
+ var res dialResult
+ // Wait for a successful connection.
+ for ; left > 0 && res.Conn == nil; left-- {
+ res = <-resCh
+ }
+ if left > 0 {
+ // There are still incompleted dials.
go func() {
- me.mu.Lock()
- defer me.mu.Unlock()
- if _, ok := t.HalfOpen[addr]; !ok {
- panic("invariant broken")
+ for ; left > 0; left-- {
+ conn := (<-resCh).Conn
+ if conn != nil {
+ conn.Close()
+ }
}
- delete(t.HalfOpen, addr)
- me.openNewConns(t)
}()
- if res.Conn == nil {
+ }
+ conn = res.Conn
+ utp = res.UTP
+ return
+}
+
+func (me *Client) noLongerHalfOpen(t *torrent, addr string) {
+ if _, ok := t.HalfOpen[addr]; !ok {
+ panic("invariant broken")
+ }
+ delete(t.HalfOpen, addr)
+ me.openNewConns(t)
+}
+
+// Returns nil connection and nil error if no connection could be established
+// for valid reasons.
+func (me *Client) establishOutgoingConn(t *torrent, addr string) (c *connection, err error) {
+ handshakesConnection := func(nc net.Conn, encrypted, utp bool) (c *connection, err error) {
+ c = newConnection()
+ c.conn = nc
+ c.rw = nc
+ c.encrypted = encrypted
+ c.uTP = utp
+ err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
+ if err != nil {
return
}
- if left > 0 {
- go func() {
- for ; left > 0; left-- {
- conn := (<-resCh).Conn
- if conn != nil {
- conn.Close()
- }
- }
- }()
+ ok, err := me.initiateHandshakes(c, t)
+ if !ok {
+ c = nil
}
+ return
+ }
+ nc, utp := me.dial(addr, t)
+ if nc == nil {
+ return
+ }
+ c, err = handshakesConnection(nc, true, utp)
+ if err != nil {
+ nc.Close()
+ return
+ } else if c != nil {
+ return
+ }
+ nc.Close()
+ if utp {
+ nc, err = me.dialUTP(addr, t)
+ } else {
+ nc, err = me.dialTCP(addr, t)
+ }
+ if err != nil {
+ err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
+ return
+ }
+ c, err = handshakesConnection(nc, false, utp)
+ if err != nil {
+ nc.Close()
+ }
+ return
+}
- // log.Printf("connected to %s", conn.RemoteAddr())
- err := me.runConnection(res.Conn, t, peer.Source, res.UTP)
- if err != nil {
- log.Print(err)
- }
- }()
+// Called to dial out and run a connection. The addr we're given is already
+// considered half-open.
+func (me *Client) outgoingConnection(t *torrent, addr string, ps peerSource) {
+ c, err := me.establishOutgoingConn(t, addr)
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ // Don't release lock between here and addConnection, unless it's for
+ // failure.
+ me.noLongerHalfOpen(t, addr)
+ if err != nil {
+ log.Print(err)
+ return
+ }
+ if c == nil {
+ return
+ }
+ defer c.Close()
+ c.Discovery = ps
+ err = me.runInitiatedHandshookConn(c, t)
+ if err != nil {
+ log.Print(err)
+ }
}
// The port number for incoming peer connections. 0 if the client isn't
// Wait until writes complete before returning from handshake.
err = <-writeDone
if err != nil {
- err = fmt.Errorf("error writing during handshake: %s", err)
+ err = fmt.Errorf("error writing: %s", err)
}
}()
// Wraps a raw connection and provides the interface we want for using the
// connection in the message loop.
-type peerConn struct {
- net.Conn
+type deadlineReader struct {
+ nc net.Conn
+ r io.Reader
}
-func (pc peerConn) Read(b []byte) (n int, err error) {
+func (me deadlineReader) Read(b []byte) (n int, err error) {
// Keep-alives should be received every 2 mins. Give a bit of gracetime.
- err = pc.Conn.SetReadDeadline(time.Now().Add(150 * time.Second))
+ err = me.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
if err != nil {
err = fmt.Errorf("error setting read deadline: %s", err)
}
- n, err = pc.Conn.Read(b)
+ n, err = me.r.Read(b)
// Convert common errors into io.EOF.
+ // if err != nil {
+ // if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
+ // err = io.EOF
+ // } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
+ // if n != 0 {
+ // panic(n)
+ // }
+ // err = io.EOF
+ // }
+ // }
+ return
+}
+
+type readWriter struct {
+ io.Reader
+ io.Writer
+}
+
+func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys [][]byte) (ret io.ReadWriter, encrypted bool, err error) {
+ var protocol [len(pp.Protocol)]byte
+ _, err = io.ReadFull(rw, protocol[:])
if err != nil {
- if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
- err = io.EOF
- } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
- if n != 0 {
- panic(n)
- }
- err = io.EOF
+ return
+ }
+ ret = readWriter{
+ io.MultiReader(bytes.NewReader(protocol[:]), rw),
+ rw,
+ }
+ if string(protocol[:]) == pp.Protocol {
+ return
+ }
+ encrypted = true
+ ret, err = mse.ReceiveHandshake(ret, skeys)
+ return
+}
+
+func (cl *Client) receiveSkeys() (ret [][]byte) {
+ for ih := range cl.torrents {
+ ret = append(ret, ih[:])
+ }
+ return
+}
+
+func (me *Client) initiateHandshakes(c *connection, t *torrent) (ok bool, err error) {
+ if c.encrypted {
+ c.rw, err = mse.InitiateHandshake(c.rw, t.InfoHash[:], nil)
+ if err != nil {
+ return
}
}
+ ih, ok, err := me.connBTHandshake(c, &t.InfoHash)
+ if ih != t.InfoHash {
+ ok = false
+ }
return
}
-func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource, uTP bool) (err error) {
- if tcpConn, ok := sock.(*net.TCPConn); ok {
- tcpConn.SetLinger(0)
+func (cl *Client) receiveHandshakes(c *connection) (t *torrent, err error) {
+ cl.mu.Lock()
+ skeys := cl.receiveSkeys()
+ cl.mu.Unlock()
+ // TODO: Filter unmatching skey errors.
+ c.rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw, skeys)
+ if err != nil {
+ if err == mse.ErrNoSecretKeyMatch {
+ err = nil
+ }
+ return
}
- defer sock.Close()
- err = sock.SetDeadline(time.Now().Add(handshakeTimeout))
+ ih, ok, err := cl.connBTHandshake(c, nil)
if err != nil {
- err = fmt.Errorf("couldn't set handshake deadline: %s", err)
+ fmt.Errorf("error during bt handshake: %s", err)
return
}
- me.mu.Lock()
- me.handshaking++
- me.mu.Unlock()
- var rw io.ReadWriter = sock
- if torrent == nil {
- rw, err = mse.ReceiveHandshake(sock, func() (ret [][]byte) {
- for ih := range me.torrents {
- ret = append(ret, ih[:])
- }
- return
- }())
- } else {
- rw, err = mse.InitiateHandshake(sock, torrent.InfoHash[:])
+ if !ok {
+ return
}
- if err != nil {
- err = fmt.Errorf("error during MSE handshake: %s", err)
+ cl.mu.Lock()
+ t = cl.torrents[ih]
+ cl.mu.Unlock()
+ return
+}
+
+// Returns !ok if handshake failed for valid reasons.
+func (cl *Client) connBTHandshake(c *connection, ih *InfoHash) (ret InfoHash, ok bool, err error) {
+ res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes)
+ if err != nil || !ok {
return
}
- hsRes, ok, err := handshake(rw, func() *InfoHash {
- if torrent == nil {
- return nil
- } else {
- return &torrent.InfoHash
- }
- }(), me.peerID, me.extensionBytes)
- me.mu.Lock()
- defer me.mu.Unlock()
- if me.handshaking == 0 {
- panic("handshake count invariant is broken")
+ ret = res.InfoHash
+ c.PeerExtensionBytes = res.peerExtensionBytes
+ c.PeerID = res.peerID
+ c.completedHandshake = time.Now()
+ return
+}
+
+func (cl *Client) runInitiatedHandshookConn(c *connection, t *torrent) (err error) {
+ if c.PeerID == cl.peerID {
+ // Only if we initiated the connection is the remote address a
+ // listen addr for a doppleganger.
+ connsToSelf.Add(1)
+ addr := c.conn.RemoteAddr().String()
+ cl.dopplegangerAddrs[addr] = struct{}{}
+ return
}
- me.handshaking--
+ return cl.runHandshookConn(c, t)
+}
+
+func (cl *Client) runReceivedConn(c *connection) (err error) {
+ err = c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
if err != nil {
- err = fmt.Errorf("error during handshake: %s", err)
return
}
- if !ok {
+ t, err := cl.receiveHandshakes(c)
+ if err != nil {
+ logonce.Stderr.Printf("error receiving handshakes: %s", err)
+ err = nil
return
}
- if hsRes.peerID == me.peerID {
+ if t == nil {
return
}
- torrent = me.torrent(hsRes.InfoHash)
- if torrent == nil {
+ cl.mu.Lock()
+ defer cl.mu.Unlock()
+ if c.PeerID == cl.peerID {
return
}
- sock.SetWriteDeadline(time.Time{})
- sock = peerConn{sock}
- conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP, rw)
- defer conn.Close()
- conn.Discovery = discovery
- if !me.addConnection(torrent, conn) {
+ return cl.runHandshookConn(c, t)
+}
+
+func (cl *Client) runHandshookConn(c *connection, t *torrent) (err error) {
+ c.conn.SetWriteDeadline(time.Time{})
+ c.rw = readWriter{
+ deadlineReader{c.conn, c.rw},
+ c.rw,
+ }
+ if !cl.addConnection(t, c) {
return
}
+ defer cl.dropConnection(t, c)
+ go c.writer()
+ go c.writeOptimizer(time.Minute)
+ cl.sendInitialMessages(c, t)
+ if t.haveInfo() {
+ t.initRequestOrdering(c)
+ }
+ err = cl.connectionLoop(t, c)
+ if err != nil {
+ err = fmt.Errorf("error during connection loop: %s", err)
+ }
+ return
+}
+
+func (me *Client) sendInitialMessages(conn *connection, torrent *torrent) {
if conn.PeerExtensionBytes.SupportsExtended() && me.extensionBytes.SupportsExtended() {
conn.Post(pp.Message{
Type: pp.Extended,
Port: uint16(AddrPort(me.dHT.LocalAddr())),
})
}
- if torrent.haveInfo() {
- torrent.initRequestOrdering(conn)
- }
- err = me.connectionLoop(torrent, conn)
- if err != nil {
- err = fmt.Errorf("during Connection loop with peer %q: %s", conn.PeerID, err)
- }
- me.dropConnection(torrent, conn)
- return
}
+// Randomizes the piece order for this connection. Every connection will be
+// given a different ordering. Having it stored per connection saves having to
+// randomize during request filling, and constantly recalculate the ordering
+// based on piece priorities.
func (t *torrent) initRequestOrdering(c *connection) {
if c.pieceRequestOrder != nil || c.piecePriorities != nil {
panic("double init of request ordering")
}
c.piecePriorities = mathRand.Perm(t.numPieces())
c.pieceRequestOrder = pieceordering.New()
- for i := 0; i < t.numPieces(); i++ {
+ for i := range iter.N(t.Info.NumPieces()) {
if !c.PeerHasPiece(i) {
continue
}
// TODO: I'm sure there's something here to do with seeding.
func (t *torrent) badConn(c *connection) bool {
- if time.Now().Sub(c.completedHandshake) < 30*time.Second {
+ // A 30 second grace for initial messages to go through.
+ if time.Since(c.completedHandshake) < 30*time.Second {
return false
}
if !t.haveInfo() {
}
func (me *Client) wantConns(t *torrent) bool {
- if !t.needData() && me.noUpload {
+ if me.noUpload && !t.needData() {
return false
}
if t.numGoodConns() >= socketsPerTorrent {
me.openNewConns(t)
}
-func (cl *Client) torrentFileCachePath(ih InfoHash) string {
+func (cl *Client) cachedMetaInfoFilename(ih InfoHash) string {
return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent")
}
func (cl *Client) saveTorrentFile(t *torrent) error {
- path := cl.torrentFileCachePath(t.InfoHash)
+ path := cl.cachedMetaInfoFilename(t.InfoHash)
os.MkdirAll(filepath.Dir(path), 0777)
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
// Prepare a Torrent without any attachment to a Client. That means we can
// initialize fields all fields that don't require the Client without locking
// it.
-func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *torrent, err error) {
+func newTorrent(ih InfoHash) (t *torrent, err error) {
t = &torrent{
InfoHash: ih,
Peers: make(map[peersKey]Peer),
}
t.wantPeers.L = &t.stateMu
t.GotMetainfo = t.gotMetainfo
- t.addTrackers(announceList)
return
}
}
func (t Torrent) Drop() {
+ t.cl.mu.Lock()
t.cl.dropTorrent(t.InfoHash)
+ t.cl.mu.Unlock()
}
type File struct {
return me.cl.torrentReadAt(me.torrent, off, p)
}
-// Returns nil metainfo if it isn't in the cache.
+// Returns nil metainfo if it isn't in the cache. Checks that the retrieved
+// metainfo has the correct infohash.
func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) {
if cl.config.DisableMetainfoCache {
return
}
- f, err := os.Open(cl.torrentFileCachePath(ih))
+ f, err := os.Open(cl.cachedMetaInfoFilename(ih))
if err != nil {
if os.IsNotExist(err) {
err = nil