From: Matt Joiner Date: Thu, 12 Mar 2015 09:06:23 +0000 (+1100) Subject: Add support for Fast Extension X-Git-Tag: v1.0.0~1284 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=e85b7e228b1c3373a97cbbdba359fe4ebfb8536d;p=btrtrc.git Add support for Fast Extension --- diff --git a/client.go b/client.go index 2d2643eb..177a33a6 100644 --- a/client.go +++ b/client.go @@ -21,6 +21,7 @@ import ( "container/heap" "crypto/rand" "crypto/sha1" + "encoding/hex" "errors" "expvar" "fmt" @@ -68,6 +69,7 @@ var ( successfulDials = expvar.NewInt("successfulDials") acceptedConns = expvar.NewInt("acceptedConns") inboundConnsBlocked = expvar.NewInt("inboundConnsBlocked") + peerExtensions = expvar.NewMap("peerExtensions") ) const ( @@ -75,7 +77,8 @@ const ( // // Extension protocol: http://www.bittorrent.org/beps/bep_0010.html // DHT: http://www.bittorrent.org/beps/bep_0005.html - extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01" + // Fast Extension: http://bittorrent.org/beps/bep_0006.html + extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x05" socketsPerTorrent = 40 torrentPeersHighWater = 200 @@ -850,7 +853,11 @@ func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res hand CopyExact(&res.peerExtensionBytes, b[20:28]) CopyExact(&res.InfoHash, b[28:48]) CopyExact(&res.peerID, b[48:68]) + peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1) + // TODO: Maybe we can just drop peers here if we're not interested. This + // could prevent them trying to reconnect, falsely believing there was + // just a problem. if ih == nil { // We were waiting for the peer to tell us what they wanted. post(res.InfoHash[:]) post(peerID[:]) @@ -1008,7 +1015,7 @@ func (t *torrent) initRequestOrdering(c *connection) { c.piecePriorities = mathRand.Perm(t.numPieces()) c.pieceRequestOrder = pieceordering.New() for i := 0; i < t.numPieces(); i++ { - if !c.PeerHasPiece(pp.Integer(i)) { + if !c.PeerHasPiece(i) { continue } if !t.wantPiece(i) { @@ -1019,16 +1026,18 @@ func (t *torrent) initRequestOrdering(c *connection) { } func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) { - if t.haveInfo() { - if c.PeerPieces == nil { - c.PeerPieces = make([]bool, t.numPieces()) - } - } else { - for piece >= len(c.PeerPieces) { - c.PeerPieces = append(c.PeerPieces, false) + if !c.peerHasAll { + if t.haveInfo() { + if c.PeerPieces == nil { + c.PeerPieces = make([]bool, t.numPieces()) + } + } else { + for piece >= len(c.PeerPieces) { + c.PeerPieces = append(c.PeerPieces, false) + } } + c.PeerPieces[piece] = true } - c.PeerPieces[piece] = true if t.wantPiece(piece) { t.connPendPiece(c, piece) me.replenishConnRequests(t, c) @@ -1166,6 +1175,16 @@ func addrPort(addr net.Addr) int { return AddrPort(addr) } +func (cl *Client) peerHasAll(t *torrent, cn *connection) { + cn.peerHasAll = true + cn.PeerPieces = nil + if t.haveInfo() { + for i := 0; i < t.numPieces(); i++ { + cl.peerGotPiece(t, cn, i) + } + } +} + // Processes incoming bittorrent messages. The client lock is held upon entry // and exit. func (me *Client) connectionLoop(t *torrent, c *connection) error { @@ -1201,6 +1220,8 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error { } // We can then reset our interest. me.replenishConnRequests(t, c) + case pp.Reject: + me.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length)) case pp.Unchoke: c.PeerChoked = false me.peerUnchoked(t, c) @@ -1248,7 +1269,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error { unexpectedCancels.Add(1) } case pp.Bitfield: - if c.PeerPieces != nil { + if c.PeerPieces != nil || c.peerHasAll { err = errors.New("received unexpected bitfield") break } @@ -1265,6 +1286,24 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error { me.peerGotPiece(t, c, index) } } + case pp.HaveAll: + if c.PeerPieces != nil || c.peerHasAll { + err = errors.New("unexpected have-all") + break + } + me.peerHasAll(t, c) + case pp.HaveNone: + if c.peerHasAll || c.PeerPieces != nil { + err = errors.New("unexpected have-none") + break + } + c.PeerPieces = make([]bool, func() int { + if t.haveInfo() { + return t.numPieces() + } else { + return 0 + } + }()) case pp.Piece: err = me.downloadedChunk(t, c, &msg) case pp.Extended: @@ -2278,7 +2317,7 @@ func (me *Client) fillRequests(t *torrent, c *connection) { } for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() { pieceIndex := e.Piece() - if !c.PeerHasPiece(pp.Integer(pieceIndex)) { + if !c.PeerHasPiece(pieceIndex) { panic("piece in request order but peer doesn't have it") } if !t.wantPiece(pieceIndex) { @@ -2401,7 +2440,7 @@ func (me *Client) pieceChanged(t *torrent, piece int) { } conn.pieceRequestOrder.DeletePiece(int(piece)) } - if t.wantPiece(int(piece)) && conn.PeerHasPiece(pp.Integer(piece)) { + if t.wantPiece(piece) && conn.PeerHasPiece(piece) { t.connPendPiece(conn, int(piece)) me.replenishConnRequests(t, conn) } diff --git a/client_test.go b/client_test.go index 4ea5b0e3..c7445c00 100644 --- a/client_test.go +++ b/client_test.go @@ -287,7 +287,7 @@ func TestReadaheadPieces(t *testing.T) { {5 * 1024 * 1024, 1048576, 4}, } { if readaheadPieces(case_.readaheadBytes, case_.pieceLength) != case_.readaheadPieces { - t.Fatalf("case failed: %s", case_) + t.Fatalf("case failed: %v", case_) } } } diff --git a/connection.go b/connection.go index 6cc90c24..afb12c4b 100644 --- a/connection.go +++ b/connection.go @@ -60,7 +60,9 @@ type connection struct { PeerExtensionBytes peerExtensionBytes // Whether the peer has the given piece. nil if they've not sent any // related messages yet. - PeerPieces []bool + PeerPieces []bool + peerHasAll bool + PeerMaxRequests int // Maximum pending requests the peer allows. PeerExtensionIDs map[string]int64 PeerClientName string @@ -115,32 +117,42 @@ func (cn *connection) supportsExtension(ext string) bool { return ok } -func (cn *connection) completedString() string { - if cn.PeerPieces == nil { +func (cn *connection) completedString(t *torrent) string { + if cn.PeerPieces == nil && !cn.peerHasAll { return "?" } - // f := float32(cn.piecesPeerHasCount()) / float32(cn.totalPiecesCount()) - // return fmt.Sprintf("%d%%", int(f*100)) - return fmt.Sprintf("%d/%d", cn.piecesPeerHasCount(), cn.totalPiecesCount()) -} - -func (cn *connection) totalPiecesCount() int { - return len(cn.PeerPieces) -} - -func (cn *connection) piecesPeerHasCount() (count int) { - for _, has := range cn.PeerPieces { - if has { - count++ + return fmt.Sprintf("%d/%d", func() int { + if cn.peerHasAll { + if t.haveInfo() { + return t.numPieces() + } + return -1 } - } - return + ret := 0 + for _, b := range cn.PeerPieces { + if b { + ret++ + } + } + return ret + }(), func() int { + if cn.peerHasAll || cn.PeerPieces == nil { + if t.haveInfo() { + return t.numPieces() + } + return -1 + } + return len(cn.PeerPieces) + }()) } // Correct the PeerPieces slice length. Return false if the existing slice is // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE // messages. func (cn *connection) setNumPieces(num int) error { + if cn.peerHasAll { + return nil + } if cn.PeerPieces == nil { return nil } @@ -170,9 +182,9 @@ func eventAgeString(t time.Time) string { return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds()) } -func (cn *connection) WriteStatus(w io.Writer) { +func (cn *connection) WriteStatus(w io.Writer, t *torrent) { // \t isn't preserved in
 blocks?
-	fmt.Fprintf(w, "%s\n    %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr()), cn.completedString(), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
+	fmt.Fprintf(w, "%s\n    %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr()), cn.completedString(t), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
 	c := func(b byte) {
 		fmt.Fprintf(w, "%c", b)
 	}
@@ -215,14 +227,14 @@ func (c *connection) Close() {
 	go c.Socket.Close()
 }
 
-func (c *connection) PeerHasPiece(index pp.Integer) bool {
-	if c.PeerPieces == nil {
-		return false
+func (c *connection) PeerHasPiece(piece int) bool {
+	if c.peerHasAll {
+		return true
 	}
-	if int(index) >= len(c.PeerPieces) {
+	if piece >= len(c.PeerPieces) {
 		return false
 	}
-	return c.PeerPieces[index]
+	return c.PeerPieces[piece]
 }
 
 func (c *connection) Post(msg pp.Message) {
@@ -242,7 +254,7 @@ func (c *connection) Request(chunk request) bool {
 	if len(c.Requests) >= c.PeerMaxRequests {
 		return false
 	}
-	if !c.PeerHasPiece(chunk.Index) {
+	if !c.PeerHasPiece(int(chunk.Index)) {
 		return true
 	}
 	if c.RequestPending(chunk) {
diff --git a/peer_protocol/protocol.go b/peer_protocol/protocol.go
index 607edf59..ac958930 100644
--- a/peer_protocol/protocol.go
+++ b/peer_protocol/protocol.go
@@ -34,7 +34,15 @@ const (
 	Piece                     // 7
 	Cancel                    // 8
 	Port                      // 9
-	Extended      = 20
+
+	// BEP 6
+	Suggest     = 0xd  // 13
+	HaveAll     = 0xe  // 14
+	HaveNone    = 0xf  // 15
+	Reject      = 0x10 // 16
+	AllowedFast = 0x11 // 17
+
+	Extended = 20
 
 	HandshakeExtendedID = 0
 
@@ -62,10 +70,10 @@ func (msg Message) MarshalBinary() (data []byte, err error) {
 			return
 		}
 		switch msg.Type {
-		case Choke, Unchoke, Interested, NotInterested:
+		case Choke, Unchoke, Interested, NotInterested, HaveAll, HaveNone:
 		case Have:
 			err = binary.Write(buf, binary.BigEndian, msg.Index)
-		case Request, Cancel:
+		case Request, Cancel, Reject:
 			for _, i := range []Integer{msg.Index, msg.Begin, msg.Length} {
 				err = binary.Write(buf, binary.BigEndian, i)
 				if err != nil {
@@ -159,11 +167,11 @@ func (d *Decoder) Decode(msg *Message) (err error) {
 	}
 	msg.Type = MessageType(c)
 	switch msg.Type {
-	case Choke, Unchoke, Interested, NotInterested:
+	case Choke, Unchoke, Interested, NotInterested, HaveAll, HaveNone:
 		return
 	case Have:
 		err = msg.Index.Read(r)
-	case Request, Cancel:
+	case Request, Cancel, Reject:
 		for _, data := range []*Integer{&msg.Index, &msg.Begin, &msg.Length} {
 			err = data.Read(r)
 			if err != nil {
diff --git a/torrent.go b/torrent.go
index 4217efa1..1f5660ab 100644
--- a/torrent.go
+++ b/torrent.go
@@ -482,7 +482,7 @@ func (t *torrent) WriteStatus(w io.Writer) {
 		t: t,
 	})
 	for _, c := range t.Conns {
-		c.WriteStatus(w)
+		c.WriteStatus(w, t)
 	}
 }
 
@@ -553,7 +553,7 @@ func (t *torrent) lastPieceSize() int {
 }
 
 func (t *torrent) numPieces() int {
-	return len(t.Info.Pieces) / 20
+	return t.Info.NumPieces()
 }
 
 func (t *torrent) numPiecesCompleted() (num int) {
@@ -744,7 +744,7 @@ func (t *torrent) wantPiece(index int) bool {
 
 func (t *torrent) connHasWantedPieces(c *connection) bool {
 	for p := range t.Pieces {
-		if t.wantPiece(p) && c.PeerHasPiece(pp.Integer(p)) {
+		if t.wantPiece(p) && c.PeerHasPiece(p) {
 			return true
 		}
 	}