From: Matt Joiner <anacrolix@gmail.com>
Date: Sun, 4 Feb 2018 13:18:38 +0000 (+1100)
Subject: Support AllowedFast and enable fast extension
X-Git-Tag: v1.0.0~202
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=20316e5af4662d43071e22bc9821888bf37a71a8;p=btrtrc.git

Support AllowedFast and enable fast extension
---

diff --git a/connection.go b/connection.go
index a49f4fa2..16888aa0 100644
--- a/connection.go
+++ b/connection.go
@@ -85,6 +85,7 @@ type connection struct {
 	peerMinPieces int
 	// Pieces we've accepted chunks for from the peer.
 	peerTouchedPieces map[int]struct{}
+	peerAllowedFast   bitmap.Bitmap
 
 	PeerMaxRequests  int // Maximum pending requests the peer allows.
 	PeerExtensionIDs map[string]byte
@@ -303,27 +304,35 @@ func (cn *connection) nominalMaxRequests() (ret int) {
 	return
 }
 
-// Returns true if an unsatisfied request was canceled.
-func (cn *connection) PeerCancel(r request) bool {
-	if cn.PeerRequests == nil {
-		return false
-	}
+func (cn *connection) onPeerSentCancel(r request) {
 	if _, ok := cn.PeerRequests[r]; !ok {
-		return false
+		torrent.Add("unexpected cancels received", 1)
+		return
+	}
+	if cn.fastEnabled() {
+		cn.reject(r)
+	} else {
+		delete(cn.PeerRequests, r)
 	}
-	delete(cn.PeerRequests, r)
-	return true
 }
 
-func (cn *connection) Choke(msg func(pp.Message) bool) bool {
+func (cn *connection) Choke(msg messageWriter) (more bool) {
 	if cn.Choked {
 		return true
 	}
-	cn.PeerRequests = nil
 	cn.Choked = true
-	return msg(pp.Message{
+	more = msg(pp.Message{
 		Type: pp.Choke,
 	})
+	if cn.fastEnabled() {
+		for r := range cn.PeerRequests {
+			// TODO: Don't reject pieces in allowed fast set.
+			cn.reject(r)
+		}
+	} else {
+		cn.PeerRequests = nil
+	}
+	return
 }
 
 func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
@@ -372,6 +381,13 @@ func (cn *connection) request(r request, mw messageWriter) bool {
 	if _, ok := cn.t.conns[cn]; !ok {
 		panic("requesting but not in active conns")
 	}
+	if cn.PeerChoked {
+		if cn.peerAllowedFast.Get(int(r.Index)) {
+			torrent.Add("allowed fast requests sent", 1)
+		} else {
+			panic("requesting while choked and not allowed fast")
+		}
+	}
 	cn.t.pendingRequests[r]++
 	return mw(pp.Message{
 		Type:   pp.Request,
@@ -505,6 +521,7 @@ func nextRequestState(
 	iterPendingRequests func(f func(request) bool),
 	requestsLowWater int,
 	requestsHighWater int,
+	allowedFast bitmap.Bitmap,
 ) (
 	cancelExisting bool, // Cancel all our pending requests
 	newRequests []request, // Chunks to request that we currently aren't
@@ -519,7 +536,12 @@ func nextRequestState(
 	iterPendingRequests(func(r request) bool {
 		interested = true
 		if peerChoking {
-			return false
+			if allowedFast.IsEmpty() {
+				return false
+			}
+			if !allowedFast.Get(int(r.Index)) {
+				return true
+			}
 		}
 		if _, ok := currentRequests[r]; !ok {
 			if newRequests == nil {
@@ -632,6 +654,7 @@ func (cn *connection) desiredRequestState() (bool, []request, bool) {
 		cn.iterPendingRequests,
 		cn.requestsLowWater,
 		cn.nominalMaxRequests(),
+		cn.peerAllowedFast,
 	)
 }
 
@@ -840,30 +863,38 @@ func (c *connection) fastEnabled() bool {
 	return c.PeerExtensionBytes.SupportsFast() && c.t.cl.extensionBytes.SupportsFast()
 }
 
-// Returns true if we were able to reject the request.
-func (c *connection) reject(r request) bool {
+func (c *connection) reject(r request) {
 	if !c.fastEnabled() {
-		return false
+		panic("fast not enabled")
 	}
-	c.deleteRequest(r)
 	c.Post(r.ToMsg(pp.Reject))
-	return true
+	delete(c.PeerRequests, r)
 }
 
 func (c *connection) onReadRequest(r request) error {
 	requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
 	if r.Begin+r.Length > c.t.pieceLength(int(r.Index)) {
+		torrent.Add("bad requests received", 1)
 		return errors.New("bad request")
 	}
 	if _, ok := c.PeerRequests[r]; ok {
+		torrent.Add("duplicate requests received", 1)
 		return nil
 	}
 	if c.Choked {
-		c.reject(r)
+		torrent.Add("requests received while choking", 1)
+		if c.fastEnabled() {
+			torrent.Add("requests rejected while choking", 1)
+			c.reject(r)
+		}
 		return nil
 	}
 	if len(c.PeerRequests) >= maxRequests {
-		c.reject(r)
+		torrent.Add("requests received while queue full", 1)
+		if c.fastEnabled() {
+			c.reject(r)
+		}
+		// BEP 6 says we may close here if we choose.
 		return nil
 	}
 	if !c.t.havePiece(r.Index.Int()) {
@@ -915,6 +946,9 @@ func (c *connection) mainReadLoop() error {
 			continue
 		}
 		messageTypesReceived.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
+		if msg.Type.FastExtension() && !c.fastEnabled() {
+			return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
+		}
 		switch msg.Type {
 		case pp.Choke:
 			c.PeerChoked = true
@@ -922,9 +956,7 @@ func (c *connection) mainReadLoop() error {
 			// We can then reset our interest.
 			c.updateRequests()
 		case pp.Reject:
-			if c.deleteRequest(newRequestFromMessage(&msg)) {
-				c.updateRequests()
-			}
+			c.deleteRequest(newRequestFromMessage(&msg))
 		case pp.Unchoke:
 			c.PeerChoked = false
 			c.tickleWriter()
@@ -933,6 +965,7 @@ func (c *connection) mainReadLoop() error {
 			c.tickleWriter()
 		case pp.NotInterested:
 			c.PeerInterested = false
+			// TODO: Reject?
 			c.PeerRequests = nil
 		case pp.Have:
 			err = c.peerSentHave(int(msg.Index))
@@ -941,9 +974,7 @@ func (c *connection) mainReadLoop() error {
 			err = c.onReadRequest(r)
 		case pp.Cancel:
 			req := newRequestFromMessage(&msg)
-			if !c.PeerCancel(req) {
-				unexpectedCancels.Add(1)
-			}
+			c.onPeerSentCancel(req)
 		case pp.Bitfield:
 			err = c.peerSentBitfield(msg.Bitfield)
 		case pp.HaveAll:
@@ -969,6 +1000,15 @@ func (c *connection) mainReadLoop() error {
 				pingAddr.Port = int(msg.Port)
 			}
 			go cl.dHT.Ping(pingAddr, nil)
+		case pp.AllowedFast:
+			torrent.Add("allowed fasts received", 1)
+			log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c, debugLogValue).Log(c.t.logger)
+			c.peerAllowedFast.Add(int(msg.Index))
+			c.updateRequests()
+		case pp.Suggest:
+			torrent.Add("suggests received", 1)
+			log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index, debugLogValue).Log(c.t.logger)
+			c.updateRequests()
 		default:
 			err = fmt.Errorf("received unknown message type: %#v", msg.Type)
 		}
@@ -1117,6 +1157,13 @@ func (c *connection) receiveChunk(msg *pp.Message) {
 		unexpectedChunksReceived.Add(1)
 	}
 
+	if c.PeerChoked {
+		torrent.Add("chunks received while choked", 1)
+		if c.peerAllowedFast.Get(int(req.Index)) {
+			torrent.Add("chunks received due to allowed fast", 1)
+		}
+	}
+
 	// Do we actually want this chunk?
 	if !t.wantPiece(req) {
 		unwantedChunksReceived.Add(1)
diff --git a/global.go b/global.go
index 8012fe61..9855f8ac 100644
--- a/global.go
+++ b/global.go
@@ -17,7 +17,7 @@ const (
 )
 
 func defaultPeerExtensionBytes() peerExtensionBytes {
-	return newPeerExtensionBytes(ExtensionBitDHT, ExtensionBitExtended)
+	return newPeerExtensionBytes(ExtensionBitDHT, ExtensionBitExtended, ExtensionBitFast)
 }
 
 // I could move a lot of these counters to their own file, but I suspect they
@@ -27,10 +27,11 @@ var (
 	unexpectedChunksReceived = expvar.NewInt("chunksReceivedUnexpected")
 	chunksReceived           = expvar.NewInt("chunksReceived")
 
+	torrent = expvar.NewMap("torrent")
+
 	peersAddedBySource = expvar.NewMap("peersAddedBySource")
 
 	uploadChunksPosted = expvar.NewInt("uploadChunksPosted")
-	unexpectedCancels  = expvar.NewInt("unexpectedCancels")
 
 	pieceHashedCorrect    = expvar.NewInt("pieceHashedCorrect")
 	pieceHashedNotCorrect = expvar.NewInt("pieceHashedNotCorrect")
diff --git a/handshake_test.go b/handshake_test.go
index cc761399..8c2c6d24 100644
--- a/handshake_test.go
+++ b/handshake_test.go
@@ -3,14 +3,13 @@ package torrent
 import (
 	"testing"
 
-	"github.com/anacrolix/missinggo"
 	"github.com/stretchr/testify/assert"
 )
 
 func TestDefaultExtensionBytes(t *testing.T) {
-	var pex peerExtensionBytes
-	missinggo.CopyExact(&pex, defaultPeerExtensionBytes())
+	pex := defaultPeerExtensionBytes()
 	assert.True(t, pex.SupportsDHT())
 	assert.True(t, pex.SupportsExtended())
-	assert.False(t, pex.SupportsFast())
+	assert.False(t, pex.GetBit(63))
+	assert.Panics(t, func() { pex.GetBit(64) })
 }
diff --git a/peer_protocol/decoder.go b/peer_protocol/decoder.go
index 3f66cdc9..37758fd8 100644
--- a/peer_protocol/decoder.go
+++ b/peer_protocol/decoder.go
@@ -54,7 +54,7 @@ func (d *Decoder) Decode(msg *Message) (err error) {
 	switch msg.Type {
 	case Choke, Unchoke, Interested, NotInterested, HaveAll, HaveNone:
 		return
-	case Have:
+	case Have, AllowedFast, Suggest:
 		err = msg.Index.Read(r)
 	case Request, Cancel, Reject:
 		for _, data := range []*Integer{&msg.Index, &msg.Begin, &msg.Length} {
diff --git a/peer_protocol/protocol.go b/peer_protocol/protocol.go
index 13d76def..aa53ffdf 100644
--- a/peer_protocol/protocol.go
+++ b/peer_protocol/protocol.go
@@ -16,6 +16,10 @@ func (me MessageType) String() string {
 	return strconv.FormatInt(int64(me), 10)
 }
 
+func (mt MessageType) FastExtension() bool {
+	return mt >= Suggest && mt <= AllowedFast
+}
+
 const (
 	Choke         MessageType = iota
 	Unchoke                   // 1