]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Rewrite handshaking and connection management
authorMatt Joiner <anacrolix@gmail.com>
Wed, 18 Mar 2015 07:28:13 +0000 (18:28 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 18 Mar 2015 07:28:13 +0000 (18:28 +1100)
client.go
connection.go
torrent.go
torrent_test.go

index ff335e40714fe31642087a7d5848efbb7111d434..ee238db934274a9111b5616e683822e9f00a2039 100644 (file)
--- a/client.go
+++ b/client.go
@@ -75,6 +75,12 @@ var (
        acceptedConns               = expvar.NewInt("acceptedConns")
        inboundConnsBlocked         = expvar.NewInt("inboundConnsBlocked")
        peerExtensions              = expvar.NewMap("peerExtensions")
+       // Count of connections to peer with same client ID.
+       connsToSelf = expvar.NewInt("connsToSelf")
+       // Number of completed connections to a client we're already connected with.
+       duplicateClientConns       = expvar.NewInt("duplicateClientConns")
+       receivedMessageTypes       = expvar.NewMap("receivedMessageTypes")
+       supportedExtensionMessages = expvar.NewMap("supportedExtensionMessages")
 )
 
 const (
@@ -94,7 +100,8 @@ const (
        // Limit how long handshake can take. This is to reduce the lingering
        // impact of a few bad apples. 4s loses 1% of successful handshakes that
        // are obtained with 60s timeout, and 5% of unsuccessful handshakes.
-       handshakeTimeout = 45 * time.Second
+       btHandshakeTimeout = 4 * time.Second
+       handshakesTimeout  = 20 * time.Second
 
        pruneInterval = 10 * time.Second
 )
@@ -143,8 +150,6 @@ type Client struct {
        event sync.Cond
        quit  chan struct{}
 
-       handshaking int
-
        torrents map[InfoHash]*torrent
 }
 
@@ -218,7 +223,6 @@ func (cl *Client) WriteStatus(_w io.Writer) {
                fmt.Fprintln(w, "Not listening!")
        }
        fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID)
-       fmt.Fprintf(w, "Handshaking: %d\n", cl.handshaking)
        if cl.dHT != nil {
                dhtStats := cl.dHT.Stats()
                fmt.Fprintf(w, "DHT nodes: %d (%d good)\n", dhtStats.NumNodes, dhtStats.NumGoodNodes)
@@ -277,7 +281,7 @@ func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err err
 }
 
 // Sets priorities to download from the given offset. Returns when the piece
-// at the given offset can be read. Returns the number of bytes that
+// at the given offset can be read. Returns the number of bytes that are
 // immediately available from the offset.
 func (cl *Client) prepareRead(t *torrent, off int64) (n int64) {
        index := int(off / int64(t.usualPieceSize()))
@@ -328,6 +332,8 @@ again:
        panic(fmt.Sprintf("can't read from %T", d))
 }
 
+// Calculates the number of pieces to set to Readahead priority, after the
+// Now, and Next pieces.
 func readaheadPieces(readahead, pieceLength int64) int {
        return int((readahead+pieceLength-1)/pieceLength - 1)
 }
@@ -340,7 +346,7 @@ func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) {
                return
        }
        cl.raisePiecePriority(t, index, piecePriorityNext)
-       for i := 0; i < readaheadPieces(5*1024*1024, t.Info.PieceLength); i++ {
+       for range iter.N(readaheadPieces(5*1024*1024, t.Info.PieceLength)) {
                index++
                if index >= t.numPieces() {
                        break
@@ -366,6 +372,7 @@ func (t *torrent) connPendPiece(c *connection, piece int) {
 
 func (cl *Client) raisePiecePriority(t *torrent, piece int, priority piecePriority) {
        if t.Pieces[piece].Priority < priority {
+               cl.event.Broadcast()
                cl.prioritizePiece(t, piece, priority)
        }
 }
@@ -624,12 +631,35 @@ func (cl *Client) acceptConnections(l net.Listener, utp bool) {
                        conn.Close()
                        continue
                }
-               go func() {
-                       if err := cl.runConnection(conn, nil, peerSourceIncoming, utp); err != nil {
-                               log.Print(err)
-                       }
-               }()
+               go cl.incomingConnection(conn, utp)
+       }
+}
+
+func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
+       defer nc.Close()
+       if tc, ok := nc.(*net.TCPConn); ok {
+               tc.SetLinger(0)
+       }
+       c := newConnection()
+       c.conn = nc
+       c.rw = nc
+       c.Discovery = peerSourceIncoming
+       c.uTP = utp
+       err := cl.runReceivedConn(c)
+       if err != nil {
+               log.Print(err)
+       }
+}
+
+func (cl *Client) Torrent(ih InfoHash) (T Torrent, ok bool) {
+       cl.mu.Lock()
+       defer cl.mu.Unlock()
+       t, ok := cl.torrents[ih]
+       if !ok {
+               return
        }
+       T = Torrent{cl, t}
+       return
 }
 
 func (me *Client) torrent(ih InfoHash) *torrent {
@@ -637,12 +667,12 @@ func (me *Client) torrent(ih InfoHash) *torrent {
 }
 
 type dialResult struct {
-       net.Conn
-       UTP bool
+       Conn net.Conn
+       UTP  bool
 }
 
-func doDial(dial func(addr string) (net.Conn, error), ch chan dialResult, utp bool, addr string) {
-       conn, err := dial(addr)
+func doDial(dial func(addr string, t *torrent) (net.Conn, error), ch chan dialResult, utp bool, addr string, t *torrent) {
+       conn, err := dial(addr, t)
        if err != nil {
                if conn != nil {
                        conn.Close()
@@ -693,70 +723,142 @@ func (me *Client) initiateConn(peer Peer, t *torrent) {
                log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
                return
        }
-       dialTimeout := reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, len(t.Peers))
        t.HalfOpen[addr] = struct{}{}
-       go func() {
-               // Binding to the listen address and dialing via net.Dialer gives
-               // "address in use" error. It seems it's not possible to dial out from
-               // this address so that peers associate our local address with our
-               // listen address.
-
-               // Initiate connections via TCP and UTP simultaneously. Use the first
-               // one that succeeds.
-               left := 0
-               if !me.disableUTP {
-                       left++
-               }
-               if !me.disableTCP {
-                       left++
-               }
-               resCh := make(chan dialResult, left)
-               if !me.disableUTP {
-                       go doDial(func(addr string) (net.Conn, error) {
-                               return me.utpSock.DialTimeout(addr, dialTimeout)
-                       }, resCh, true, addr)
-               }
-               if !me.disableTCP {
-                       go doDial(func(addr string) (net.Conn, error) {
-                               // time.Sleep(time.Second) // Give uTP a bit of a head start.
-                               return net.DialTimeout("tcp", addr, dialTimeout)
-                       }, resCh, false, addr)
-               }
-               var res dialResult
-               for ; left > 0 && res.Conn == nil; left-- {
-                       res = <-resCh
-               }
-               // Whether or not the connection attempt succeeds, the half open
-               // counter should be decremented, and new connection attempts made.
+       go me.outgoingConnection(t, addr, peer.Source)
+}
+
+func (me *Client) dialTimeout(t *torrent) time.Duration {
+       return reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, len(t.Peers))
+}
+
+func (me *Client) dialTCP(addr string, t *torrent) (c net.Conn, err error) {
+       c, err = net.DialTimeout("tcp", addr, me.dialTimeout(t))
+       if err == nil {
+               c.(*net.TCPConn).SetLinger(0)
+       }
+       return
+}
+
+func (me *Client) dialUTP(addr string, t *torrent) (c net.Conn, err error) {
+       return me.utpSock.DialTimeout(addr, me.dialTimeout(t))
+}
+
+// Returns a connection over UTP or TCP.
+func (me *Client) dial(addr string, t *torrent) (conn net.Conn, utp bool) {
+       // Initiate connections via TCP and UTP simultaneously. Use the first one
+       // that succeeds.
+       left := 0
+       if !me.disableUTP {
+               left++
+       }
+       if !me.disableTCP {
+               left++
+       }
+       resCh := make(chan dialResult, left)
+       if !me.disableUTP {
+               go doDial(me.dialUTP, resCh, true, addr, t)
+       }
+       if !me.disableTCP {
+               go doDial(me.dialTCP, resCh, false, addr, t)
+       }
+       var res dialResult
+       // Wait for a successful connection.
+       for ; left > 0 && res.Conn == nil; left-- {
+               res = <-resCh
+       }
+       if left > 0 {
+               // There are still incompleted dials.
                go func() {
-                       me.mu.Lock()
-                       defer me.mu.Unlock()
-                       if _, ok := t.HalfOpen[addr]; !ok {
-                               panic("invariant broken")
+                       for ; left > 0; left-- {
+                               conn := (<-resCh).Conn
+                               if conn != nil {
+                                       conn.Close()
+                               }
                        }
-                       delete(t.HalfOpen, addr)
-                       me.openNewConns(t)
                }()
-               if res.Conn == nil {
+       }
+       conn = res.Conn
+       utp = res.UTP
+       return
+}
+
+func (me *Client) noLongerHalfOpen(t *torrent, addr string) {
+       if _, ok := t.HalfOpen[addr]; !ok {
+               panic("invariant broken")
+       }
+       delete(t.HalfOpen, addr)
+       me.openNewConns(t)
+}
+
+// Returns nil connection and nil error if no connection could be established
+// for valid reasons.
+func (me *Client) establishOutgoingConn(t *torrent, addr string) (c *connection, err error) {
+       handshakesConnection := func(nc net.Conn, encrypted, utp bool) (c *connection, err error) {
+               c = newConnection()
+               c.conn = nc
+               c.rw = nc
+               c.encrypted = encrypted
+               c.uTP = utp
+               err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
+               if err != nil {
                        return
                }
-               if left > 0 {
-                       go func() {
-                               for ; left > 0; left-- {
-                                       conn := (<-resCh).Conn
-                                       if conn != nil {
-                                               conn.Close()
-                                       }
-                               }
-                       }()
+               ok, err := me.initiateHandshakes(c, t)
+               if !ok {
+                       c = nil
                }
+               return
+       }
+       nc, utp := me.dial(addr, t)
+       if nc == nil {
+               return
+       }
+       c, err = handshakesConnection(nc, true, utp)
+       if err != nil {
+               nc.Close()
+               return
+       } else if c != nil {
+               return
+       }
+       nc.Close()
+       if utp {
+               nc, err = me.dialUTP(addr, t)
+       } else {
+               nc, err = me.dialTCP(addr, t)
+       }
+       if err != nil {
+               err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
+               return
+       }
+       c, err = handshakesConnection(nc, false, utp)
+       if err != nil {
+               nc.Close()
+       }
+       return
+}
 
-               // log.Printf("connected to %s", conn.RemoteAddr())
-               err := me.runConnection(res.Conn, t, peer.Source, res.UTP)
-               if err != nil {
-                       log.Print(err)
-               }
-       }()
+// Called to dial out and run a connection. The addr we're given is already
+// considered half-open.
+func (me *Client) outgoingConnection(t *torrent, addr string, ps peerSource) {
+       c, err := me.establishOutgoingConn(t, addr)
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       // Don't release lock between here and addConnection, unless it's for
+       // failure.
+       me.noLongerHalfOpen(t, addr)
+       if err != nil {
+               log.Print(err)
+               return
+       }
+       if c == nil {
+               return
+       }
+       defer c.Close()
+       c.Discovery = ps
+       err = me.runInitiatedHandshookConn(c, t)
+       if err != nil {
+               log.Print(err)
+       }
 }
 
 // The port number for incoming peer connections. 0 if the client isn't
@@ -843,7 +945,7 @@ func handshake(sock io.ReadWriter, ih *InfoHash, peerID [20]byte, extensions pee
                // Wait until writes complete before returning from handshake.
                err = <-writeDone
                if err != nil {
-                       err = fmt.Errorf("error writing during handshake: %s", err)
+                       err = fmt.Errorf("error writing: %s", err)
                }
        }()
 
@@ -889,94 +991,173 @@ func handshake(sock io.ReadWriter, ih *InfoHash, peerID [20]byte, extensions pee
 
 // Wraps a raw connection and provides the interface we want for using the
 // connection in the message loop.
-type peerConn struct {
-       net.Conn
+type deadlineReader struct {
+       nc net.Conn
+       r  io.Reader
 }
 
-func (pc peerConn) Read(b []byte) (n int, err error) {
+func (me deadlineReader) Read(b []byte) (n int, err error) {
        // Keep-alives should be received every 2 mins. Give a bit of gracetime.
-       err = pc.Conn.SetReadDeadline(time.Now().Add(150 * time.Second))
+       err = me.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
        if err != nil {
                err = fmt.Errorf("error setting read deadline: %s", err)
        }
-       n, err = pc.Conn.Read(b)
+       n, err = me.r.Read(b)
        // Convert common errors into io.EOF.
+       // if err != nil {
+       //      if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
+       //              err = io.EOF
+       //      } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
+       //              if n != 0 {
+       //                      panic(n)
+       //              }
+       //              err = io.EOF
+       //      }
+       // }
+       return
+}
+
+type readWriter struct {
+       io.Reader
+       io.Writer
+}
+
+func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys [][]byte) (ret io.ReadWriter, encrypted bool, err error) {
+       var protocol [len(pp.Protocol)]byte
+       _, err = io.ReadFull(rw, protocol[:])
        if err != nil {
-               if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
-                       err = io.EOF
-               } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
-                       if n != 0 {
-                               panic(n)
-                       }
-                       err = io.EOF
+               return
+       }
+       ret = readWriter{
+               io.MultiReader(bytes.NewReader(protocol[:]), rw),
+               rw,
+       }
+       if string(protocol[:]) == pp.Protocol {
+               return
+       }
+       encrypted = true
+       ret, err = mse.ReceiveHandshake(ret, skeys)
+       return
+}
+
+func (cl *Client) receiveSkeys() (ret [][]byte) {
+       for ih := range cl.torrents {
+               ret = append(ret, ih[:])
+       }
+       return
+}
+
+func (me *Client) initiateHandshakes(c *connection, t *torrent) (ok bool, err error) {
+       if c.encrypted {
+               c.rw, err = mse.InitiateHandshake(c.rw, t.InfoHash[:], nil)
+               if err != nil {
+                       return
                }
        }
+       ih, ok, err := me.connBTHandshake(c, &t.InfoHash)
+       if ih != t.InfoHash {
+               ok = false
+       }
        return
 }
 
-func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource, uTP bool) (err error) {
-       if tcpConn, ok := sock.(*net.TCPConn); ok {
-               tcpConn.SetLinger(0)
+func (cl *Client) receiveHandshakes(c *connection) (t *torrent, err error) {
+       cl.mu.Lock()
+       skeys := cl.receiveSkeys()
+       cl.mu.Unlock()
+       // TODO: Filter unmatching skey errors.
+       c.rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw, skeys)
+       if err != nil {
+               if err == mse.ErrNoSecretKeyMatch {
+                       err = nil
+               }
+               return
        }
-       defer sock.Close()
-       err = sock.SetDeadline(time.Now().Add(handshakeTimeout))
+       ih, ok, err := cl.connBTHandshake(c, nil)
        if err != nil {
-               err = fmt.Errorf("couldn't set handshake deadline: %s", err)
+               fmt.Errorf("error during bt handshake: %s", err)
                return
        }
-       me.mu.Lock()
-       me.handshaking++
-       me.mu.Unlock()
-       var rw io.ReadWriter = sock
-       if torrent == nil {
-               rw, err = mse.ReceiveHandshake(sock, func() (ret [][]byte) {
-                       for ih := range me.torrents {
-                               ret = append(ret, ih[:])
-                       }
-                       return
-               }())
-       } else {
-               rw, err = mse.InitiateHandshake(sock, torrent.InfoHash[:])
+       if !ok {
+               return
        }
-       if err != nil {
-               err = fmt.Errorf("error during MSE handshake: %s", err)
+       cl.mu.Lock()
+       t = cl.torrents[ih]
+       cl.mu.Unlock()
+       return
+}
+
+// Returns !ok if handshake failed for valid reasons.
+func (cl *Client) connBTHandshake(c *connection, ih *InfoHash) (ret InfoHash, ok bool, err error) {
+       res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes)
+       if err != nil || !ok {
                return
        }
-       hsRes, ok, err := handshake(rw, func() *InfoHash {
-               if torrent == nil {
-                       return nil
-               } else {
-                       return &torrent.InfoHash
-               }
-       }(), me.peerID, me.extensionBytes)
-       me.mu.Lock()
-       defer me.mu.Unlock()
-       if me.handshaking == 0 {
-               panic("handshake count invariant is broken")
+       ret = res.InfoHash
+       c.PeerExtensionBytes = res.peerExtensionBytes
+       c.PeerID = res.peerID
+       c.completedHandshake = time.Now()
+       return
+}
+
+func (cl *Client) runInitiatedHandshookConn(c *connection, t *torrent) (err error) {
+       if c.PeerID == cl.peerID {
+               // Only if we initiated the connection is the remote address a
+               // listen addr for a doppleganger.
+               connsToSelf.Add(1)
+               addr := c.conn.RemoteAddr().String()
+               cl.dopplegangerAddrs[addr] = struct{}{}
+               return
        }
-       me.handshaking--
+       return cl.runHandshookConn(c, t)
+}
+
+func (cl *Client) runReceivedConn(c *connection) (err error) {
+       err = c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
        if err != nil {
-               err = fmt.Errorf("error during handshake: %s", err)
                return
        }
-       if !ok {
+       t, err := cl.receiveHandshakes(c)
+       if err != nil {
+               logonce.Stderr.Printf("error receiving handshakes: %s", err)
+               err = nil
                return
        }
-       if hsRes.peerID == me.peerID {
+       if t == nil {
                return
        }
-       torrent = me.torrent(hsRes.InfoHash)
-       if torrent == nil {
+       cl.mu.Lock()
+       defer cl.mu.Unlock()
+       if c.PeerID == cl.peerID {
                return
        }
-       sock.SetWriteDeadline(time.Time{})
-       sock = peerConn{sock}
-       conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP, rw)
-       defer conn.Close()
-       conn.Discovery = discovery
-       if !me.addConnection(torrent, conn) {
+       return cl.runHandshookConn(c, t)
+}
+
+func (cl *Client) runHandshookConn(c *connection, t *torrent) (err error) {
+       c.conn.SetWriteDeadline(time.Time{})
+       c.rw = readWriter{
+               deadlineReader{c.conn, c.rw},
+               c.rw,
+       }
+       if !cl.addConnection(t, c) {
                return
        }
+       defer cl.dropConnection(t, c)
+       go c.writer()
+       go c.writeOptimizer(time.Minute)
+       cl.sendInitialMessages(c, t)
+       if t.haveInfo() {
+               t.initRequestOrdering(c)
+       }
+       err = cl.connectionLoop(t, c)
+       if err != nil {
+               err = fmt.Errorf("error during connection loop: %s", err)
+       }
+       return
+}
+
+func (me *Client) sendInitialMessages(conn *connection, torrent *torrent) {
        if conn.PeerExtensionBytes.SupportsExtended() && me.extensionBytes.SupportsExtended() {
                conn.Post(pp.Message{
                        Type:       pp.Extended,
@@ -1035,24 +1216,19 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
                        Port: uint16(AddrPort(me.dHT.LocalAddr())),
                })
        }
-       if torrent.haveInfo() {
-               torrent.initRequestOrdering(conn)
-       }
-       err = me.connectionLoop(torrent, conn)
-       if err != nil {
-               err = fmt.Errorf("during Connection loop with peer %q: %s", conn.PeerID, err)
-       }
-       me.dropConnection(torrent, conn)
-       return
 }
 
+// Randomizes the piece order for this connection. Every connection will be
+// given a different ordering. Having it stored per connection saves having to
+// randomize during request filling, and constantly recalculate the ordering
+// based on piece priorities.
 func (t *torrent) initRequestOrdering(c *connection) {
        if c.pieceRequestOrder != nil || c.piecePriorities != nil {
                panic("double init of request ordering")
        }
        c.piecePriorities = mathRand.Perm(t.numPieces())
        c.pieceRequestOrder = pieceordering.New()
-       for i := 0; i < t.numPieces(); i++ {
+       for i := range iter.N(t.Info.NumPieces()) {
                if !c.PeerHasPiece(i) {
                        continue
                }
@@ -1527,7 +1703,8 @@ func (t *torrent) needData() bool {
 
 // TODO: I'm sure there's something here to do with seeding.
 func (t *torrent) badConn(c *connection) bool {
-       if time.Now().Sub(c.completedHandshake) < 30*time.Second {
+       // A 30 second grace for initial messages to go through.
+       if time.Since(c.completedHandshake) < 30*time.Second {
                return false
        }
        if !t.haveInfo() {
@@ -1546,7 +1723,7 @@ func (t *torrent) numGoodConns() (num int) {
 }
 
 func (me *Client) wantConns(t *torrent) bool {
-       if !t.needData() && me.noUpload {
+       if me.noUpload && !t.needData() {
                return false
        }
        if t.numGoodConns() >= socketsPerTorrent {
@@ -1596,12 +1773,12 @@ func (me *Client) addPeers(t *torrent, peers []Peer) {
        me.openNewConns(t)
 }
 
-func (cl *Client) torrentFileCachePath(ih InfoHash) string {
+func (cl *Client) cachedMetaInfoFilename(ih InfoHash) string {
        return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent")
 }
 
 func (cl *Client) saveTorrentFile(t *torrent) error {
-       path := cl.torrentFileCachePath(t.InfoHash)
+       path := cl.cachedMetaInfoFilename(t.InfoHash)
        os.MkdirAll(filepath.Dir(path), 0777)
        f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
        if err != nil {
@@ -1682,7 +1859,7 @@ func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err e
 // Prepare a Torrent without any attachment to a Client. That means we can
 // initialize fields all fields that don't require the Client without locking
 // it.
-func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *torrent, err error) {
+func newTorrent(ih InfoHash) (t *torrent, err error) {
        t = &torrent{
                InfoHash: ih,
                Peers:    make(map[peersKey]Peer),
@@ -1696,7 +1873,6 @@ func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *tor
        }
        t.wantPeers.L = &t.stateMu
        t.GotMetainfo = t.gotMetainfo
-       t.addTrackers(announceList)
        return
 }
 
@@ -1763,7 +1939,9 @@ func (t Torrent) NumPieces() int {
 }
 
 func (t Torrent) Drop() {
+       t.cl.mu.Lock()
        t.cl.dropTorrent(t.InfoHash)
+       t.cl.mu.Unlock()
 }
 
 type File struct {
@@ -1959,12 +2137,13 @@ func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) {
        return me.cl.torrentReadAt(me.torrent, off, p)
 }
 
-// Returns nil metainfo if it isn't in the cache.
+// Returns nil metainfo if it isn't in the cache. Checks that the retrieved
+// metainfo has the correct infohash.
 func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) {
        if cl.config.DisableMetainfoCache {
                return
        }
-       f, err := os.Open(cl.torrentFileCachePath(ih))
+       f, err := os.Open(cl.cachedMetaInfoFilename(ih))
        if err != nil {
                if os.IsNotExist(err) {
                        err = nil
index 75a1dba351dbc7e46f59abe8a913ad0fb458359e..16e9db05924988ad015bdadc190df0bcfc52f970 100644 (file)
@@ -30,6 +30,7 @@ const (
 type connection struct {
        conn      net.Conn
        rw        io.ReadWriter // The real slim shady
+       encrypted bool
        Discovery peerSource
        uTP       bool
        closing   chan struct{}
@@ -37,7 +38,9 @@ type connection struct {
        post      chan pp.Message
        writeCh   chan []byte
 
-       piecePriorities   []int
+       // The connections preferred order to download pieces.
+       piecePriorities []int
+       // The piece request order based on piece priorities.
        pieceRequestOrder *pieceordering.Instance
 
        UnwantedChunksReceived int
@@ -69,26 +72,16 @@ type connection struct {
        PeerClientName   string
 }
 
-func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool, rw io.ReadWriter) (c *connection) {
+func newConnection() (c *connection) {
        c = &connection{
-               conn: sock,
-               rw:   rw,
-               uTP:  uTP,
-
-               Choked:             true,
-               PeerChoked:         true,
-               PeerMaxRequests:    250,
-               PeerExtensionBytes: peb,
-               PeerID:             peerID,
+               Choked:          true,
+               PeerChoked:      true,
+               PeerMaxRequests: 250,
 
                closing: make(chan struct{}),
                writeCh: make(chan []byte),
                post:    make(chan pp.Message),
-
-               completedHandshake: time.Now(),
        }
-       go c.writer()
-       go c.writeOptimizer(time.Minute)
        return
 }
 
@@ -100,6 +93,8 @@ func (cn *connection) localAddr() net.Addr {
        return cn.conn.LocalAddr()
 }
 
+// Adjust piece position in the request order for this connection based on the
+// given piece priority.
 func (cn *connection) pendPiece(piece int, priority piecePriority) {
        if priority == piecePriorityNone {
                cn.pieceRequestOrder.DeletePiece(piece)
index 9818f131b258ed4f2244eb23c548d725e29a493b..d9b4b0f54bb0f3bf790a380057e1f5892d4265de 100644 (file)
@@ -66,14 +66,18 @@ type torrent struct {
 
        InfoHash InfoHash
        Pieces   []*piece
-       length   int64
+       // Total length of the torrent in bytes. Stored because it's not O(1) to
+       // get this from the info dict.
+       length int64
 
        data StatefulData
 
+       // The info dict. Nil if we don't have it.
        Info *metainfo.Info
-       // Active peer connections.
+       // Active peer connections, running message stream loops.
        Conns []*connection
-       // Set of addrs to which we're attempting to connect.
+       // Set of addrs to which we're attempting to connect. Connections are
+       // half-open until all handshakes are completed.
        HalfOpen map[string]struct{}
 
        // Reserve of peers to connect to. A peer can be both here and in the
@@ -84,11 +88,16 @@ type torrent struct {
 
        // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
        // mirror their respective URLs from the announce-list metainfo key.
-       Trackers     [][]tracker.Client
-       DisplayName  string
-       MetaData     []byte
+       Trackers [][]tracker.Client
+       // Name used if the info name isn't available.
+       DisplayName string
+       // The bencoded bytes of the info dict.
+       MetaData []byte
+       // Each element corresponds to the 16KiB metadata pieces. If true, we have
+       // received that piece.
        metadataHave []bool
 
+       // Closed when .Info is set.
        gotMetainfo chan struct{}
        GotMetainfo <-chan struct{}
 
@@ -195,6 +204,7 @@ func (t *torrent) numConnsUnchoked() (num int) {
        return
 }
 
+// There's a connection to that address already.
 func (t *torrent) addrActive(addr string) bool {
        if _, ok := t.HalfOpen[addr]; ok {
                return true
index cd73b62ab13f0c46600df0ca740c28cb672fb13e..33c40d6b9b1018618dc796e1c2ba091723e92262 100644 (file)
@@ -45,7 +45,7 @@ func TestTorrentRequest(t *testing.T) {
 }
 
 func TestTorrentDoubleClose(t *testing.T) {
-       tt, err := newTorrent(InfoHash{}, nil, 0)
+       tt, err := newTorrent(InfoHash{})
        tt.pruneTimer = time.NewTimer(0)
        if err != nil {
                t.Fatal(err)