]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Tidy up extension bytes handling; don't close conn from handshake writer; force proto...
authorMatt Joiner <anacrolix@gmail.com>
Thu, 12 Mar 2015 19:21:13 +0000 (06:21 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 12 Mar 2015 19:21:13 +0000 (06:21 +1100)
client.go
connection.go
torrent.go

index 177a33a6502480229f95f7f6df94ecd56f0b834b..396205ae262759f5615d471ac3730ccb2bfd3691 100644 (file)
--- a/client.go
+++ b/client.go
@@ -38,6 +38,8 @@ import (
        "syscall"
        "time"
 
+       "bitbucket.org/anacrolix/go.torrent/mse"
+
        "bitbucket.org/anacrolix/go.torrent/data"
        filePkg "bitbucket.org/anacrolix/go.torrent/data/file"
        "bitbucket.org/anacrolix/go.torrent/dht"
@@ -77,8 +79,8 @@ const (
        //
        // Extension protocol: http://www.bittorrent.org/beps/bep_0010.html
        // DHT: http://www.bittorrent.org/beps/bep_0005.html
-       // Fast Extension: http://bittorrent.org/beps/bep_0006.html
-       extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x05"
+       // Fast Extension: http://bittorrent.org/beps/bep_0006.html ([7]|=4)
+       defaultExtensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x05"
 
        socketsPerTorrent     = 40
        torrentPeersHighWater = 200
@@ -87,7 +89,7 @@ const (
        // Limit how long handshake can take. This is to reduce the lingering
        // impact of a few bad apples. 4s loses 1% of successful handshakes that
        // are obtained with 60s timeout, and 5% of unsuccessful handshakes.
-       handshakeTimeout = 4 * time.Second
+       handshakeTimeout = 45 * time.Second
 
        pruneInterval = 10 * time.Second
 )
@@ -128,6 +130,7 @@ type Client struct {
        _configDir      string
        config          Config
        pruneTimer      *time.Timer
+       extensionBytes  peerExtensionBytes
 
        torrentDataOpener TorrentDataOpener
 
@@ -469,6 +472,7 @@ func NewClient(cfg *Config) (cl *Client, err error) {
                quit:     make(chan struct{}),
                torrents: make(map[InfoHash]*torrent),
        }
+       CopyExact(&cl.extensionBytes, defaultExtensionBytes)
        cl.event.L = &cl.mu
        if cfg.TorrentDataOpener != nil {
                cl.torrentDataOpener = cfg.TorrentDataOpener
@@ -777,12 +781,11 @@ func addrCompactIP(addr net.Addr) (string, error) {
        return string(ip.To16()), nil
 }
 
-func handshakeWriter(w io.WriteCloser, bb <-chan []byte, done chan<- error) {
+func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
        var err error
        for b := range bb {
                _, err = w.Write(b)
                if err != nil {
-                       w.Close()
                        break
                }
        }
@@ -794,6 +797,18 @@ type (
        peerID             [20]byte
 )
 
+func (me *peerExtensionBytes) SupportsExtended() bool {
+       return me[5]&0x10 != 0
+}
+
+func (me *peerExtensionBytes) SupportsDHT() bool {
+       return me[7]&0x01 != 0
+}
+
+func (me *peerExtensionBytes) SupportsFast() bool {
+       return me[7]&0x04 != 0
+}
+
 type handshakeResult struct {
        peerExtensionBytes
        peerID
@@ -804,7 +819,7 @@ type handshakeResult struct {
 // peer initiated the connection. Returns ok if the handshake was successful,
 // and err if there was an unexpected condition other than the peer simply
 // abandoning the handshake.
-func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res handshakeResult, ok bool, err error) {
+func handshake(sock io.ReadWriter, ih *InfoHash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
        // Bytes to be sent to the peer. Should never block the sender.
        postCh := make(chan []byte, 4)
        // A single error value sent when the writer completes.
@@ -836,7 +851,7 @@ func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res hand
        }
 
        post([]byte(pp.Protocol))
-       post([]byte(extensionBytes))
+       post(extensions[:])
        if ih != nil { // We already know what we want.
                post(ih[:])
                post(peerID[:])
@@ -907,13 +922,28 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
        me.mu.Lock()
        me.handshaking++
        me.mu.Unlock()
-       hsRes, ok, err := handshake(sock, func() *InfoHash {
+       var rw io.ReadWriter = sock
+       if torrent == nil {
+               rw, err = mse.ReceiveHandshake(sock, func() (ret [][]byte) {
+                       for ih := range me.torrents {
+                               ret = append(ret, ih[:])
+                       }
+                       return
+               }())
+       } else {
+               rw, err = mse.InitiateHandshake(sock, torrent.InfoHash[:])
+       }
+       if err != nil {
+               err = fmt.Errorf("error during MSE handshake: %s", err)
+               return
+       }
+       hsRes, ok, err := handshake(rw, func() *InfoHash {
                if torrent == nil {
                        return nil
                } else {
                        return &torrent.InfoHash
                }
-       }(), me.peerID)
+       }(), me.peerID, me.extensionBytes)
        me.mu.Lock()
        defer me.mu.Unlock()
        if me.handshaking == 0 {
@@ -936,13 +966,13 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
        }
        sock.SetWriteDeadline(time.Time{})
        sock = peerConn{sock}
-       conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP)
+       conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP, rw)
        defer conn.Close()
        conn.Discovery = discovery
        if !me.addConnection(torrent, conn) {
                return
        }
-       if conn.PeerExtensionBytes[5]&0x10 != 0 {
+       if conn.PeerExtensionBytes.SupportsExtended() && me.extensionBytes.SupportsExtended() {
                conn.Post(pp.Message{
                        Type:       pp.Extended,
                        ExtendedID: pp.HandshakeExtendedID,
@@ -969,7 +999,7 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
                                if p := me.incomingPeerPort(); p != 0 {
                                        d["p"] = p
                                }
-                               yourip, err := addrCompactIP(conn.Socket.RemoteAddr())
+                               yourip, err := addrCompactIP(conn.remoteAddr())
                                if err != nil {
                                        log.Printf("error calculating yourip field value in extension handshake: %s", err)
                                } else {
@@ -989,8 +1019,12 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
                        Type:     pp.Bitfield,
                        Bitfield: torrent.bitfield(),
                })
+       } else if me.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
+               conn.Post(pp.Message{
+                       Type: pp.HaveNone,
+               })
        }
-       if conn.PeerExtensionBytes[7]&0x01 != 0 && me.dHT != nil {
+       if conn.PeerExtensionBytes.SupportsDHT() && me.extensionBytes.SupportsDHT() && me.dHT != nil {
                conn.Post(pp.Message{
                        Type: pp.Port,
                        Port: uint16(AddrPort(me.dHT.LocalAddr())),
@@ -998,7 +1032,6 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
        }
        if torrent.haveInfo() {
                torrent.initRequestOrdering(conn)
-               me.replenishConnRequests(torrent, conn)
        }
        err = me.connectionLoop(torrent, conn)
        if err != nil {
@@ -1189,7 +1222,7 @@ func (cl *Client) peerHasAll(t *torrent, cn *connection) {
 // and exit.
 func (me *Client) connectionLoop(t *torrent, c *connection) error {
        decoder := pp.Decoder{
-               R:         bufio.NewReader(c.Socket),
+               R:         bufio.NewReader(c.rw),
                MaxLength: 256 * 1024,
        }
        for {
@@ -1222,6 +1255,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
                        me.replenishConnRequests(t, c)
                case pp.Reject:
                        me.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
+                       me.replenishConnRequests(t, c)
                case pp.Unchoke:
                        c.PeerChoked = false
                        me.peerUnchoked(t, c)
@@ -1409,7 +1443,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
                        if me.dHT == nil {
                                break
                        }
-                       pingAddr, err := net.ResolveUDPAddr("", c.Socket.RemoteAddr().String())
+                       pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
                        if err != nil {
                                panic(err)
                        }
index afb12c4b82823ceb8386750a0714d2e305a62396..75a1dba351dbc7e46f59abe8a913ad0fb458359e 100644 (file)
@@ -28,7 +28,8 @@ const (
 
 // Maintains the state of a connection with a peer.
 type connection struct {
-       Socket    net.Conn
+       conn      net.Conn
+       rw        io.ReadWriter // The real slim shady
        Discovery peerSource
        uTP       bool
        closing   chan struct{}
@@ -68,10 +69,11 @@ type connection struct {
        PeerClientName   string
 }
 
-func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool) (c *connection) {
+func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool, rw io.ReadWriter) (c *connection) {
        c = &connection{
-               Socket: sock,
-               uTP:    uTP,
+               conn: sock,
+               rw:   rw,
+               uTP:  uTP,
 
                Choked:             true,
                PeerChoked:         true,
@@ -90,6 +92,14 @@ func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP b
        return
 }
 
+func (cn *connection) remoteAddr() net.Addr {
+       return cn.conn.RemoteAddr()
+}
+
+func (cn *connection) localAddr() net.Addr {
+       return cn.conn.LocalAddr()
+}
+
 func (cn *connection) pendPiece(piece int, priority piecePriority) {
        if priority == piecePriorityNone {
                cn.pieceRequestOrder.DeletePiece(piece)
@@ -184,7 +194,7 @@ func eventAgeString(t time.Time) string {
 
 func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
        // \t isn't preserved in <pre> blocks?
-       fmt.Fprintf(w, "%s\n    %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr()), cn.completedString(t), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
+       fmt.Fprintf(w, "%s\n    %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.localAddr(), cn.remoteAddr()), cn.completedString(t), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
        c := func(b byte) {
                fmt.Fprintf(w, "%c", b)
        }
@@ -224,7 +234,7 @@ func (c *connection) Close() {
        }
        close(c.closing)
        // TODO: This call blocks sometimes, why?
-       go c.Socket.Close()
+       go c.conn.Close()
 }
 
 func (c *connection) PeerHasPiece(piece int) bool {
@@ -347,7 +357,7 @@ func (c *connection) SetInterested(interested bool) {
 // Writes buffers to the socket from the write channel.
 func (conn *connection) writer() {
        // Reduce write syscalls.
-       buf := bufio.NewWriterSize(conn.Socket, 0x8000) // 32 KiB
+       buf := bufio.NewWriterSize(conn.rw, 0x8000) // 32 KiB
        // Receives when buf is not empty.
        notEmpty := make(chan struct{}, 1)
        for {
index 1f5660abcad06dcb62578498670c81ee69d5d406..9818f131b258ed4f2244eb23c548d725e29a493b 100644 (file)
@@ -200,7 +200,7 @@ func (t *torrent) addrActive(addr string) bool {
                return true
        }
        for _, c := range t.Conns {
-               if c.Socket.RemoteAddr().String() == addr {
+               if c.remoteAddr().String() == addr {
                        return true
                }
        }