]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Support AllowedFast and enable fast extension
authorMatt Joiner <anacrolix@gmail.com>
Sun, 4 Feb 2018 13:18:38 +0000 (00:18 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 4 Feb 2018 13:18:38 +0000 (00:18 +1100)
connection.go
global.go
handshake_test.go
peer_protocol/decoder.go
peer_protocol/protocol.go

index a49f4fa27b0f43d16de638d2ddc377696b82869b..16888aa0ca104f96721bca889ffb2b319aededf8 100644 (file)
@@ -85,6 +85,7 @@ type connection struct {
        peerMinPieces int
        // Pieces we've accepted chunks for from the peer.
        peerTouchedPieces map[int]struct{}
+       peerAllowedFast   bitmap.Bitmap
 
        PeerMaxRequests  int // Maximum pending requests the peer allows.
        PeerExtensionIDs map[string]byte
@@ -303,27 +304,35 @@ func (cn *connection) nominalMaxRequests() (ret int) {
        return
 }
 
-// Returns true if an unsatisfied request was canceled.
-func (cn *connection) PeerCancel(r request) bool {
-       if cn.PeerRequests == nil {
-               return false
-       }
+func (cn *connection) onPeerSentCancel(r request) {
        if _, ok := cn.PeerRequests[r]; !ok {
-               return false
+               torrent.Add("unexpected cancels received", 1)
+               return
+       }
+       if cn.fastEnabled() {
+               cn.reject(r)
+       } else {
+               delete(cn.PeerRequests, r)
        }
-       delete(cn.PeerRequests, r)
-       return true
 }
 
-func (cn *connection) Choke(msg func(pp.Message) bool) bool {
+func (cn *connection) Choke(msg messageWriter) (more bool) {
        if cn.Choked {
                return true
        }
-       cn.PeerRequests = nil
        cn.Choked = true
-       return msg(pp.Message{
+       more = msg(pp.Message{
                Type: pp.Choke,
        })
+       if cn.fastEnabled() {
+               for r := range cn.PeerRequests {
+                       // TODO: Don't reject pieces in allowed fast set.
+                       cn.reject(r)
+               }
+       } else {
+               cn.PeerRequests = nil
+       }
+       return
 }
 
 func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
@@ -372,6 +381,13 @@ func (cn *connection) request(r request, mw messageWriter) bool {
        if _, ok := cn.t.conns[cn]; !ok {
                panic("requesting but not in active conns")
        }
+       if cn.PeerChoked {
+               if cn.peerAllowedFast.Get(int(r.Index)) {
+                       torrent.Add("allowed fast requests sent", 1)
+               } else {
+                       panic("requesting while choked and not allowed fast")
+               }
+       }
        cn.t.pendingRequests[r]++
        return mw(pp.Message{
                Type:   pp.Request,
@@ -505,6 +521,7 @@ func nextRequestState(
        iterPendingRequests func(f func(request) bool),
        requestsLowWater int,
        requestsHighWater int,
+       allowedFast bitmap.Bitmap,
 ) (
        cancelExisting bool, // Cancel all our pending requests
        newRequests []request, // Chunks to request that we currently aren't
@@ -519,7 +536,12 @@ func nextRequestState(
        iterPendingRequests(func(r request) bool {
                interested = true
                if peerChoking {
-                       return false
+                       if allowedFast.IsEmpty() {
+                               return false
+                       }
+                       if !allowedFast.Get(int(r.Index)) {
+                               return true
+                       }
                }
                if _, ok := currentRequests[r]; !ok {
                        if newRequests == nil {
@@ -632,6 +654,7 @@ func (cn *connection) desiredRequestState() (bool, []request, bool) {
                cn.iterPendingRequests,
                cn.requestsLowWater,
                cn.nominalMaxRequests(),
+               cn.peerAllowedFast,
        )
 }
 
@@ -840,30 +863,38 @@ func (c *connection) fastEnabled() bool {
        return c.PeerExtensionBytes.SupportsFast() && c.t.cl.extensionBytes.SupportsFast()
 }
 
-// Returns true if we were able to reject the request.
-func (c *connection) reject(r request) bool {
+func (c *connection) reject(r request) {
        if !c.fastEnabled() {
-               return false
+               panic("fast not enabled")
        }
-       c.deleteRequest(r)
        c.Post(r.ToMsg(pp.Reject))
-       return true
+       delete(c.PeerRequests, r)
 }
 
 func (c *connection) onReadRequest(r request) error {
        requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
        if r.Begin+r.Length > c.t.pieceLength(int(r.Index)) {
+               torrent.Add("bad requests received", 1)
                return errors.New("bad request")
        }
        if _, ok := c.PeerRequests[r]; ok {
+               torrent.Add("duplicate requests received", 1)
                return nil
        }
        if c.Choked {
-               c.reject(r)
+               torrent.Add("requests received while choking", 1)
+               if c.fastEnabled() {
+                       torrent.Add("requests rejected while choking", 1)
+                       c.reject(r)
+               }
                return nil
        }
        if len(c.PeerRequests) >= maxRequests {
-               c.reject(r)
+               torrent.Add("requests received while queue full", 1)
+               if c.fastEnabled() {
+                       c.reject(r)
+               }
+               // BEP 6 says we may close here if we choose.
                return nil
        }
        if !c.t.havePiece(r.Index.Int()) {
@@ -915,6 +946,9 @@ func (c *connection) mainReadLoop() error {
                        continue
                }
                messageTypesReceived.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
+               if msg.Type.FastExtension() && !c.fastEnabled() {
+                       return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
+               }
                switch msg.Type {
                case pp.Choke:
                        c.PeerChoked = true
@@ -922,9 +956,7 @@ func (c *connection) mainReadLoop() error {
                        // We can then reset our interest.
                        c.updateRequests()
                case pp.Reject:
-                       if c.deleteRequest(newRequestFromMessage(&msg)) {
-                               c.updateRequests()
-                       }
+                       c.deleteRequest(newRequestFromMessage(&msg))
                case pp.Unchoke:
                        c.PeerChoked = false
                        c.tickleWriter()
@@ -933,6 +965,7 @@ func (c *connection) mainReadLoop() error {
                        c.tickleWriter()
                case pp.NotInterested:
                        c.PeerInterested = false
+                       // TODO: Reject?
                        c.PeerRequests = nil
                case pp.Have:
                        err = c.peerSentHave(int(msg.Index))
@@ -941,9 +974,7 @@ func (c *connection) mainReadLoop() error {
                        err = c.onReadRequest(r)
                case pp.Cancel:
                        req := newRequestFromMessage(&msg)
-                       if !c.PeerCancel(req) {
-                               unexpectedCancels.Add(1)
-                       }
+                       c.onPeerSentCancel(req)
                case pp.Bitfield:
                        err = c.peerSentBitfield(msg.Bitfield)
                case pp.HaveAll:
@@ -969,6 +1000,15 @@ func (c *connection) mainReadLoop() error {
                                pingAddr.Port = int(msg.Port)
                        }
                        go cl.dHT.Ping(pingAddr, nil)
+               case pp.AllowedFast:
+                       torrent.Add("allowed fasts received", 1)
+                       log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c, debugLogValue).Log(c.t.logger)
+                       c.peerAllowedFast.Add(int(msg.Index))
+                       c.updateRequests()
+               case pp.Suggest:
+                       torrent.Add("suggests received", 1)
+                       log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index, debugLogValue).Log(c.t.logger)
+                       c.updateRequests()
                default:
                        err = fmt.Errorf("received unknown message type: %#v", msg.Type)
                }
@@ -1117,6 +1157,13 @@ func (c *connection) receiveChunk(msg *pp.Message) {
                unexpectedChunksReceived.Add(1)
        }
 
+       if c.PeerChoked {
+               torrent.Add("chunks received while choked", 1)
+               if c.peerAllowedFast.Get(int(req.Index)) {
+                       torrent.Add("chunks received due to allowed fast", 1)
+               }
+       }
+
        // Do we actually want this chunk?
        if !t.wantPiece(req) {
                unwantedChunksReceived.Add(1)
index 8012fe6155d1361e1b3fd975232896796ebe4f57..9855f8ac6dc99388bb4602aa5f82c61ede542684 100644 (file)
--- a/global.go
+++ b/global.go
@@ -17,7 +17,7 @@ const (
 )
 
 func defaultPeerExtensionBytes() peerExtensionBytes {
-       return newPeerExtensionBytes(ExtensionBitDHT, ExtensionBitExtended)
+       return newPeerExtensionBytes(ExtensionBitDHT, ExtensionBitExtended, ExtensionBitFast)
 }
 
 // I could move a lot of these counters to their own file, but I suspect they
@@ -27,10 +27,11 @@ var (
        unexpectedChunksReceived = expvar.NewInt("chunksReceivedUnexpected")
        chunksReceived           = expvar.NewInt("chunksReceived")
 
+       torrent = expvar.NewMap("torrent")
+
        peersAddedBySource = expvar.NewMap("peersAddedBySource")
 
        uploadChunksPosted = expvar.NewInt("uploadChunksPosted")
-       unexpectedCancels  = expvar.NewInt("unexpectedCancels")
 
        pieceHashedCorrect    = expvar.NewInt("pieceHashedCorrect")
        pieceHashedNotCorrect = expvar.NewInt("pieceHashedNotCorrect")
index cc761399bd819fd2f87e5314f3b72ddd507f5f84..8c2c6d240613db1c32fb12e33fd68617bd88baa6 100644 (file)
@@ -3,14 +3,13 @@ package torrent
 import (
        "testing"
 
-       "github.com/anacrolix/missinggo"
        "github.com/stretchr/testify/assert"
 )
 
 func TestDefaultExtensionBytes(t *testing.T) {
-       var pex peerExtensionBytes
-       missinggo.CopyExact(&pex, defaultPeerExtensionBytes())
+       pex := defaultPeerExtensionBytes()
        assert.True(t, pex.SupportsDHT())
        assert.True(t, pex.SupportsExtended())
-       assert.False(t, pex.SupportsFast())
+       assert.False(t, pex.GetBit(63))
+       assert.Panics(t, func() { pex.GetBit(64) })
 }
index 3f66cdc99fba475592dd5e49d712978db6f75657..37758fd8de495b44d3175156de6c42af535b95bc 100644 (file)
@@ -54,7 +54,7 @@ func (d *Decoder) Decode(msg *Message) (err error) {
        switch msg.Type {
        case Choke, Unchoke, Interested, NotInterested, HaveAll, HaveNone:
                return
-       case Have:
+       case Have, AllowedFast, Suggest:
                err = msg.Index.Read(r)
        case Request, Cancel, Reject:
                for _, data := range []*Integer{&msg.Index, &msg.Begin, &msg.Length} {
index 13d76def401226910ac9aa8232f9500aaabc369e..aa53ffdffdc9ab60a20155265ca2dae9c829a1a3 100644 (file)
@@ -16,6 +16,10 @@ func (me MessageType) String() string {
        return strconv.FormatInt(int64(me), 10)
 }
 
+func (mt MessageType) FastExtension() bool {
+       return mt >= Suggest && mt <= AllowedFast
+}
+
 const (
        Choke         MessageType = iota
        Unchoke                   // 1