]> Sergey Matveev's repositories - btrtrc.git/blobdiff - connection.go
Support AllowedFast and enable fast extension
[btrtrc.git] / connection.go
index a49f4fa27b0f43d16de638d2ddc377696b82869b..16888aa0ca104f96721bca889ffb2b319aededf8 100644 (file)
@@ -85,6 +85,7 @@ type connection struct {
        peerMinPieces int
        // Pieces we've accepted chunks for from the peer.
        peerTouchedPieces map[int]struct{}
+       peerAllowedFast   bitmap.Bitmap
 
        PeerMaxRequests  int // Maximum pending requests the peer allows.
        PeerExtensionIDs map[string]byte
@@ -303,27 +304,35 @@ func (cn *connection) nominalMaxRequests() (ret int) {
        return
 }
 
-// Returns true if an unsatisfied request was canceled.
-func (cn *connection) PeerCancel(r request) bool {
-       if cn.PeerRequests == nil {
-               return false
-       }
+func (cn *connection) onPeerSentCancel(r request) {
        if _, ok := cn.PeerRequests[r]; !ok {
-               return false
+               torrent.Add("unexpected cancels received", 1)
+               return
+       }
+       if cn.fastEnabled() {
+               cn.reject(r)
+       } else {
+               delete(cn.PeerRequests, r)
        }
-       delete(cn.PeerRequests, r)
-       return true
 }
 
-func (cn *connection) Choke(msg func(pp.Message) bool) bool {
+func (cn *connection) Choke(msg messageWriter) (more bool) {
        if cn.Choked {
                return true
        }
-       cn.PeerRequests = nil
        cn.Choked = true
-       return msg(pp.Message{
+       more = msg(pp.Message{
                Type: pp.Choke,
        })
+       if cn.fastEnabled() {
+               for r := range cn.PeerRequests {
+                       // TODO: Don't reject pieces in allowed fast set.
+                       cn.reject(r)
+               }
+       } else {
+               cn.PeerRequests = nil
+       }
+       return
 }
 
 func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
@@ -372,6 +381,13 @@ func (cn *connection) request(r request, mw messageWriter) bool {
        if _, ok := cn.t.conns[cn]; !ok {
                panic("requesting but not in active conns")
        }
+       if cn.PeerChoked {
+               if cn.peerAllowedFast.Get(int(r.Index)) {
+                       torrent.Add("allowed fast requests sent", 1)
+               } else {
+                       panic("requesting while choked and not allowed fast")
+               }
+       }
        cn.t.pendingRequests[r]++
        return mw(pp.Message{
                Type:   pp.Request,
@@ -505,6 +521,7 @@ func nextRequestState(
        iterPendingRequests func(f func(request) bool),
        requestsLowWater int,
        requestsHighWater int,
+       allowedFast bitmap.Bitmap,
 ) (
        cancelExisting bool, // Cancel all our pending requests
        newRequests []request, // Chunks to request that we currently aren't
@@ -519,7 +536,12 @@ func nextRequestState(
        iterPendingRequests(func(r request) bool {
                interested = true
                if peerChoking {
-                       return false
+                       if allowedFast.IsEmpty() {
+                               return false
+                       }
+                       if !allowedFast.Get(int(r.Index)) {
+                               return true
+                       }
                }
                if _, ok := currentRequests[r]; !ok {
                        if newRequests == nil {
@@ -632,6 +654,7 @@ func (cn *connection) desiredRequestState() (bool, []request, bool) {
                cn.iterPendingRequests,
                cn.requestsLowWater,
                cn.nominalMaxRequests(),
+               cn.peerAllowedFast,
        )
 }
 
@@ -840,30 +863,38 @@ func (c *connection) fastEnabled() bool {
        return c.PeerExtensionBytes.SupportsFast() && c.t.cl.extensionBytes.SupportsFast()
 }
 
-// Returns true if we were able to reject the request.
-func (c *connection) reject(r request) bool {
+func (c *connection) reject(r request) {
        if !c.fastEnabled() {
-               return false
+               panic("fast not enabled")
        }
-       c.deleteRequest(r)
        c.Post(r.ToMsg(pp.Reject))
-       return true
+       delete(c.PeerRequests, r)
 }
 
 func (c *connection) onReadRequest(r request) error {
        requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
        if r.Begin+r.Length > c.t.pieceLength(int(r.Index)) {
+               torrent.Add("bad requests received", 1)
                return errors.New("bad request")
        }
        if _, ok := c.PeerRequests[r]; ok {
+               torrent.Add("duplicate requests received", 1)
                return nil
        }
        if c.Choked {
-               c.reject(r)
+               torrent.Add("requests received while choking", 1)
+               if c.fastEnabled() {
+                       torrent.Add("requests rejected while choking", 1)
+                       c.reject(r)
+               }
                return nil
        }
        if len(c.PeerRequests) >= maxRequests {
-               c.reject(r)
+               torrent.Add("requests received while queue full", 1)
+               if c.fastEnabled() {
+                       c.reject(r)
+               }
+               // BEP 6 says we may close here if we choose.
                return nil
        }
        if !c.t.havePiece(r.Index.Int()) {
@@ -915,6 +946,9 @@ func (c *connection) mainReadLoop() error {
                        continue
                }
                messageTypesReceived.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
+               if msg.Type.FastExtension() && !c.fastEnabled() {
+                       return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
+               }
                switch msg.Type {
                case pp.Choke:
                        c.PeerChoked = true
@@ -922,9 +956,7 @@ func (c *connection) mainReadLoop() error {
                        // We can then reset our interest.
                        c.updateRequests()
                case pp.Reject:
-                       if c.deleteRequest(newRequestFromMessage(&msg)) {
-                               c.updateRequests()
-                       }
+                       c.deleteRequest(newRequestFromMessage(&msg))
                case pp.Unchoke:
                        c.PeerChoked = false
                        c.tickleWriter()
@@ -933,6 +965,7 @@ func (c *connection) mainReadLoop() error {
                        c.tickleWriter()
                case pp.NotInterested:
                        c.PeerInterested = false
+                       // TODO: Reject?
                        c.PeerRequests = nil
                case pp.Have:
                        err = c.peerSentHave(int(msg.Index))
@@ -941,9 +974,7 @@ func (c *connection) mainReadLoop() error {
                        err = c.onReadRequest(r)
                case pp.Cancel:
                        req := newRequestFromMessage(&msg)
-                       if !c.PeerCancel(req) {
-                               unexpectedCancels.Add(1)
-                       }
+                       c.onPeerSentCancel(req)
                case pp.Bitfield:
                        err = c.peerSentBitfield(msg.Bitfield)
                case pp.HaveAll:
@@ -969,6 +1000,15 @@ func (c *connection) mainReadLoop() error {
                                pingAddr.Port = int(msg.Port)
                        }
                        go cl.dHT.Ping(pingAddr, nil)
+               case pp.AllowedFast:
+                       torrent.Add("allowed fasts received", 1)
+                       log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c, debugLogValue).Log(c.t.logger)
+                       c.peerAllowedFast.Add(int(msg.Index))
+                       c.updateRequests()
+               case pp.Suggest:
+                       torrent.Add("suggests received", 1)
+                       log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index, debugLogValue).Log(c.t.logger)
+                       c.updateRequests()
                default:
                        err = fmt.Errorf("received unknown message type: %#v", msg.Type)
                }
@@ -1117,6 +1157,13 @@ func (c *connection) receiveChunk(msg *pp.Message) {
                unexpectedChunksReceived.Add(1)
        }
 
+       if c.PeerChoked {
+               torrent.Add("chunks received while choked", 1)
+               if c.peerAllowedFast.Get(int(req.Index)) {
+                       torrent.Add("chunks received due to allowed fast", 1)
+               }
+       }
+
        // Do we actually want this chunk?
        if !t.wantPiece(req) {
                unwantedChunksReceived.Add(1)