]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add support for Fast Extension
authorMatt Joiner <anacrolix@gmail.com>
Thu, 12 Mar 2015 09:06:23 +0000 (20:06 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 12 Mar 2015 09:06:23 +0000 (20:06 +1100)
client.go
client_test.go
connection.go
peer_protocol/protocol.go
torrent.go

index 2d2643ebd9655f41e77456df3b828f28055993c7..177a33a6502480229f95f7f6df94ecd56f0b834b 100644 (file)
--- a/client.go
+++ b/client.go
@@ -21,6 +21,7 @@ import (
        "container/heap"
        "crypto/rand"
        "crypto/sha1"
+       "encoding/hex"
        "errors"
        "expvar"
        "fmt"
@@ -68,6 +69,7 @@ var (
        successfulDials             = expvar.NewInt("successfulDials")
        acceptedConns               = expvar.NewInt("acceptedConns")
        inboundConnsBlocked         = expvar.NewInt("inboundConnsBlocked")
+       peerExtensions              = expvar.NewMap("peerExtensions")
 )
 
 const (
@@ -75,7 +77,8 @@ const (
        //
        // Extension protocol: http://www.bittorrent.org/beps/bep_0010.html
        // DHT: http://www.bittorrent.org/beps/bep_0005.html
-       extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01"
+       // Fast Extension: http://bittorrent.org/beps/bep_0006.html
+       extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x05"
 
        socketsPerTorrent     = 40
        torrentPeersHighWater = 200
@@ -850,7 +853,11 @@ func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res hand
        CopyExact(&res.peerExtensionBytes, b[20:28])
        CopyExact(&res.InfoHash, b[28:48])
        CopyExact(&res.peerID, b[48:68])
+       peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)
 
+       // TODO: Maybe we can just drop peers here if we're not interested. This
+       // could prevent them trying to reconnect, falsely believing there was
+       // just a problem.
        if ih == nil { // We were waiting for the peer to tell us what they wanted.
                post(res.InfoHash[:])
                post(peerID[:])
@@ -1008,7 +1015,7 @@ func (t *torrent) initRequestOrdering(c *connection) {
        c.piecePriorities = mathRand.Perm(t.numPieces())
        c.pieceRequestOrder = pieceordering.New()
        for i := 0; i < t.numPieces(); i++ {
-               if !c.PeerHasPiece(pp.Integer(i)) {
+               if !c.PeerHasPiece(i) {
                        continue
                }
                if !t.wantPiece(i) {
@@ -1019,16 +1026,18 @@ func (t *torrent) initRequestOrdering(c *connection) {
 }
 
 func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) {
-       if t.haveInfo() {
-               if c.PeerPieces == nil {
-                       c.PeerPieces = make([]bool, t.numPieces())
-               }
-       } else {
-               for piece >= len(c.PeerPieces) {
-                       c.PeerPieces = append(c.PeerPieces, false)
+       if !c.peerHasAll {
+               if t.haveInfo() {
+                       if c.PeerPieces == nil {
+                               c.PeerPieces = make([]bool, t.numPieces())
+                       }
+               } else {
+                       for piece >= len(c.PeerPieces) {
+                               c.PeerPieces = append(c.PeerPieces, false)
+                       }
                }
+               c.PeerPieces[piece] = true
        }
-       c.PeerPieces[piece] = true
        if t.wantPiece(piece) {
                t.connPendPiece(c, piece)
                me.replenishConnRequests(t, c)
@@ -1166,6 +1175,16 @@ func addrPort(addr net.Addr) int {
        return AddrPort(addr)
 }
 
+func (cl *Client) peerHasAll(t *torrent, cn *connection) {
+       cn.peerHasAll = true
+       cn.PeerPieces = nil
+       if t.haveInfo() {
+               for i := 0; i < t.numPieces(); i++ {
+                       cl.peerGotPiece(t, cn, i)
+               }
+       }
+}
+
 // Processes incoming bittorrent messages. The client lock is held upon entry
 // and exit.
 func (me *Client) connectionLoop(t *torrent, c *connection) error {
@@ -1201,6 +1220,8 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
                        }
                        // We can then reset our interest.
                        me.replenishConnRequests(t, c)
+               case pp.Reject:
+                       me.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
                case pp.Unchoke:
                        c.PeerChoked = false
                        me.peerUnchoked(t, c)
@@ -1248,7 +1269,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
                                unexpectedCancels.Add(1)
                        }
                case pp.Bitfield:
-                       if c.PeerPieces != nil {
+                       if c.PeerPieces != nil || c.peerHasAll {
                                err = errors.New("received unexpected bitfield")
                                break
                        }
@@ -1265,6 +1286,24 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
                                        me.peerGotPiece(t, c, index)
                                }
                        }
+               case pp.HaveAll:
+                       if c.PeerPieces != nil || c.peerHasAll {
+                               err = errors.New("unexpected have-all")
+                               break
+                       }
+                       me.peerHasAll(t, c)
+               case pp.HaveNone:
+                       if c.peerHasAll || c.PeerPieces != nil {
+                               err = errors.New("unexpected have-none")
+                               break
+                       }
+                       c.PeerPieces = make([]bool, func() int {
+                               if t.haveInfo() {
+                                       return t.numPieces()
+                               } else {
+                                       return 0
+                               }
+                       }())
                case pp.Piece:
                        err = me.downloadedChunk(t, c, &msg)
                case pp.Extended:
@@ -2278,7 +2317,7 @@ func (me *Client) fillRequests(t *torrent, c *connection) {
        }
        for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
                pieceIndex := e.Piece()
-               if !c.PeerHasPiece(pp.Integer(pieceIndex)) {
+               if !c.PeerHasPiece(pieceIndex) {
                        panic("piece in request order but peer doesn't have it")
                }
                if !t.wantPiece(pieceIndex) {
@@ -2401,7 +2440,7 @@ func (me *Client) pieceChanged(t *torrent, piece int) {
                        }
                        conn.pieceRequestOrder.DeletePiece(int(piece))
                }
-               if t.wantPiece(int(piece)) && conn.PeerHasPiece(pp.Integer(piece)) {
+               if t.wantPiece(piece) && conn.PeerHasPiece(piece) {
                        t.connPendPiece(conn, int(piece))
                        me.replenishConnRequests(t, conn)
                }
index 4ea5b0e3ebd2a4350cd10ee1107c34459776191d..c7445c004ed20cf53b4432cdb4639e2c244322d6 100644 (file)
@@ -287,7 +287,7 @@ func TestReadaheadPieces(t *testing.T) {
                {5 * 1024 * 1024, 1048576, 4},
        } {
                if readaheadPieces(case_.readaheadBytes, case_.pieceLength) != case_.readaheadPieces {
-                       t.Fatalf("case failed: %s", case_)
+                       t.Fatalf("case failed: %v", case_)
                }
        }
 }
index 6cc90c24c61fad0eaba8e083ac6cc5dd61752e31..afb12c4b82823ceb8386750a0714d2e305a62396 100644 (file)
@@ -60,7 +60,9 @@ type connection struct {
        PeerExtensionBytes peerExtensionBytes
        // Whether the peer has the given piece. nil if they've not sent any
        // related messages yet.
-       PeerPieces       []bool
+       PeerPieces []bool
+       peerHasAll bool
+
        PeerMaxRequests  int // Maximum pending requests the peer allows.
        PeerExtensionIDs map[string]int64
        PeerClientName   string
@@ -115,32 +117,42 @@ func (cn *connection) supportsExtension(ext string) bool {
        return ok
 }
 
-func (cn *connection) completedString() string {
-       if cn.PeerPieces == nil {
+func (cn *connection) completedString(t *torrent) string {
+       if cn.PeerPieces == nil && !cn.peerHasAll {
                return "?"
        }
-       // f := float32(cn.piecesPeerHasCount()) / float32(cn.totalPiecesCount())
-       // return fmt.Sprintf("%d%%", int(f*100))
-       return fmt.Sprintf("%d/%d", cn.piecesPeerHasCount(), cn.totalPiecesCount())
-}
-
-func (cn *connection) totalPiecesCount() int {
-       return len(cn.PeerPieces)
-}
-
-func (cn *connection) piecesPeerHasCount() (count int) {
-       for _, has := range cn.PeerPieces {
-               if has {
-                       count++
+       return fmt.Sprintf("%d/%d", func() int {
+               if cn.peerHasAll {
+                       if t.haveInfo() {
+                               return t.numPieces()
+                       }
+                       return -1
                }
-       }
-       return
+               ret := 0
+               for _, b := range cn.PeerPieces {
+                       if b {
+                               ret++
+                       }
+               }
+               return ret
+       }(), func() int {
+               if cn.peerHasAll || cn.PeerPieces == nil {
+                       if t.haveInfo() {
+                               return t.numPieces()
+                       }
+                       return -1
+               }
+               return len(cn.PeerPieces)
+       }())
 }
 
 // Correct the PeerPieces slice length. Return false if the existing slice is
 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
 // messages.
 func (cn *connection) setNumPieces(num int) error {
+       if cn.peerHasAll {
+               return nil
+       }
        if cn.PeerPieces == nil {
                return nil
        }
@@ -170,9 +182,9 @@ func eventAgeString(t time.Time) string {
        return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds())
 }
 
-func (cn *connection) WriteStatus(w io.Writer) {
+func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
        // \t isn't preserved in <pre> blocks?
-       fmt.Fprintf(w, "%s\n    %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr()), cn.completedString(), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
+       fmt.Fprintf(w, "%s\n    %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr()), cn.completedString(t), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
        c := func(b byte) {
                fmt.Fprintf(w, "%c", b)
        }
@@ -215,14 +227,14 @@ func (c *connection) Close() {
        go c.Socket.Close()
 }
 
-func (c *connection) PeerHasPiece(index pp.Integer) bool {
-       if c.PeerPieces == nil {
-               return false
+func (c *connection) PeerHasPiece(piece int) bool {
+       if c.peerHasAll {
+               return true
        }
-       if int(index) >= len(c.PeerPieces) {
+       if piece >= len(c.PeerPieces) {
                return false
        }
-       return c.PeerPieces[index]
+       return c.PeerPieces[piece]
 }
 
 func (c *connection) Post(msg pp.Message) {
@@ -242,7 +254,7 @@ func (c *connection) Request(chunk request) bool {
        if len(c.Requests) >= c.PeerMaxRequests {
                return false
        }
-       if !c.PeerHasPiece(chunk.Index) {
+       if !c.PeerHasPiece(int(chunk.Index)) {
                return true
        }
        if c.RequestPending(chunk) {
index 607edf5979ab90cd5e38b89f0ff3796ace0d5f0c..ac958930c1815147e14c9fbc4066567e5e9f287a 100644 (file)
@@ -34,7 +34,15 @@ const (
        Piece                     // 7
        Cancel                    // 8
        Port                      // 9
-       Extended      = 20
+
+       // BEP 6
+       Suggest     = 0xd  // 13
+       HaveAll     = 0xe  // 14
+       HaveNone    = 0xf  // 15
+       Reject      = 0x10 // 16
+       AllowedFast = 0x11 // 17
+
+       Extended = 20
 
        HandshakeExtendedID = 0
 
@@ -62,10 +70,10 @@ func (msg Message) MarshalBinary() (data []byte, err error) {
                        return
                }
                switch msg.Type {
-               case Choke, Unchoke, Interested, NotInterested:
+               case Choke, Unchoke, Interested, NotInterested, HaveAll, HaveNone:
                case Have:
                        err = binary.Write(buf, binary.BigEndian, msg.Index)
-               case Request, Cancel:
+               case Request, Cancel, Reject:
                        for _, i := range []Integer{msg.Index, msg.Begin, msg.Length} {
                                err = binary.Write(buf, binary.BigEndian, i)
                                if err != nil {
@@ -159,11 +167,11 @@ func (d *Decoder) Decode(msg *Message) (err error) {
        }
        msg.Type = MessageType(c)
        switch msg.Type {
-       case Choke, Unchoke, Interested, NotInterested:
+       case Choke, Unchoke, Interested, NotInterested, HaveAll, HaveNone:
                return
        case Have:
                err = msg.Index.Read(r)
-       case Request, Cancel:
+       case Request, Cancel, Reject:
                for _, data := range []*Integer{&msg.Index, &msg.Begin, &msg.Length} {
                        err = data.Read(r)
                        if err != nil {
index 4217efa16a896a41b1ccee918fa582bccb7bb7d1..1f5660abcad06dcb62578498670c81ee69d5d406 100644 (file)
@@ -482,7 +482,7 @@ func (t *torrent) WriteStatus(w io.Writer) {
                t: t,
        })
        for _, c := range t.Conns {
-               c.WriteStatus(w)
+               c.WriteStatus(w, t)
        }
 }
 
@@ -553,7 +553,7 @@ func (t *torrent) lastPieceSize() int {
 }
 
 func (t *torrent) numPieces() int {
-       return len(t.Info.Pieces) / 20
+       return t.Info.NumPieces()
 }
 
 func (t *torrent) numPiecesCompleted() (num int) {
@@ -744,7 +744,7 @@ func (t *torrent) wantPiece(index int) bool {
 
 func (t *torrent) connHasWantedPieces(c *connection) bool {
        for p := range t.Pieces {
-               if t.wantPiece(p) && c.PeerHasPiece(pp.Integer(p)) {
+               if t.wantPiece(p) && c.PeerHasPiece(p) {
                        return true
                }
        }