]> Sergey Matveev's repositories - btrtrc.git/blobdiff - peerconn.go
Reject peer requests on data read failures
[btrtrc.git] / peerconn.go
index 4ec0944ba7090f3a9ede320a3873488e0610728e..6866d450c42be68fef857e93b89dcac84bebab8a 100644 (file)
@@ -86,6 +86,8 @@ type Peer struct {
        needRequestUpdate    string
        requestState         requestState
        updateRequestsTimer  *time.Timer
+       lastRequestUpdate    time.Time
+       peakRequests         maxRequests
        lastBecameInterested time.Time
        priorInterest        time.Duration
 
@@ -235,7 +237,7 @@ func (cn *Peer) cumInterest() time.Duration {
        return ret
 }
 
-func (cn *PeerConn) peerHasAllPieces() (all bool, known bool) {
+func (cn *PeerConn) peerHasAllPieces() (all, known bool) {
        if cn.peerSentHaveAll {
                return true, true
        }
@@ -445,7 +447,10 @@ func (cn *Peer) peerHasPiece(piece pieceIndex) bool {
 
 // 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update when
 // https://github.com/pion/datachannel/issues/59 is fixed.
-const writeBufferHighWaterLen = 1 << 15
+const (
+       writeBufferHighWaterLen = 1 << 15
+       writeBufferLowWaterLen  = writeBufferHighWaterLen / 2
+)
 
 // Writes a message into the write buffer. Returns whether it's okay to keep writing. Writing is
 // done asynchronously, so it may be that we're not able to honour backpressure from this method.
@@ -481,9 +486,17 @@ func (cn *PeerConn) requestedMetadataPiece(index int) bool {
        return index < len(cn.metadataRequests) && cn.metadataRequests[index]
 }
 
+var (
+       interestedMsgLen = len(pp.Message{Type: pp.Interested}.MustMarshalBinary())
+       requestMsgLen    = len(pp.Message{Type: pp.Request}.MustMarshalBinary())
+       // This is the maximum request count that could fit in the write buffer if it's at or below the
+       // low water mark when we run maybeUpdateActualRequestState.
+       maxLocalToRemoteRequests = (writeBufferHighWaterLen - writeBufferLowWaterLen - interestedMsgLen) / requestMsgLen
+)
+
 // The actual value to use as the maximum outbound requests.
-func (cn *Peer) nominalMaxRequests() (ret maxRequests) {
-       return maxRequests(clamp(1, int64(cn.PeerMaxRequests), 2048))
+func (cn *Peer) nominalMaxRequests() maxRequests {
+       return maxRequests(maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests)))
 }
 
 func (cn *Peer) totalExpectingTime() (ret time.Duration) {
@@ -514,12 +527,7 @@ func (cn *PeerConn) choke(msg messageWriter) (more bool) {
        more = msg(pp.Message{
                Type: pp.Choke,
        })
-       if cn.fastEnabled() {
-               for r := range cn.peerRequests {
-                       // TODO: Don't reject pieces in allowed fast set.
-                       cn.reject(r)
-               }
-       } else {
+       if !cn.fastEnabled() {
                cn.peerRequests = nil
        }
        return
@@ -649,6 +657,7 @@ func (me *Peer) cancel(r RequestIndex) {
                        panic("request already cancelled")
                }
        }
+       me.decPeakRequests()
        if me.isLowOnRequests() {
                me.updateRequests("Peer.cancel")
        }
@@ -662,9 +671,15 @@ func (me *PeerConn) _cancel(r RequestIndex) bool {
 }
 
 func (cn *PeerConn) fillWriteBuffer() {
-       if !cn.maybeUpdateActualRequestState() {
-               return
-       }
+       if cn.messageWriter.writeBuffer.Len() > writeBufferLowWaterLen {
+               // Fully committing to our max requests requires sufficient space (see
+               // maxLocalToRemoteRequests). Flush what we have instead. We also prefer always to make
+               // requests than to do PEX or upload, so we short-circuit before handling those. Any update
+               // request reason will not be cleared, so we'll come right back here when there's space. We
+               // can't do this in maybeUpdateActualRequestState because it's a method on Peer and has no
+               // knowledge of write buffers.
+       }
+       cn.maybeUpdateActualRequestState()
        if cn.pex.IsEnabled() {
                if flow := cn.pex.Share(cn.write); !flow {
                        return
@@ -703,6 +718,9 @@ func (cn *Peer) updateRequests(reason string) {
        if cn.needRequestUpdate != "" {
                return
        }
+       if reason != peerUpdateRequestsTimerReason && !cn.isLowOnRequests() {
+               return
+       }
        cn.needRequestUpdate = reason
        cn.handleUpdateRequests()
 }
@@ -774,28 +792,49 @@ func (cn *PeerConn) peerSentBitfield(bf []bool) error {
                // Ignore known excess pieces.
                bf = bf[:cn.t.numPieces()]
        }
-       pp := cn.newPeerPieces()
+       bm := boolSliceToBitmap(bf)
+       if cn.t.haveInfo() && pieceIndex(bm.GetCardinality()) == cn.t.numPieces() {
+               cn.onPeerHasAllPieces()
+               return nil
+       }
+       if !bm.IsEmpty() {
+               cn.raisePeerMinPieces(pieceIndex(bm.Maximum()) + 1)
+       }
+       shouldUpdateRequests := false
+       if cn.peerSentHaveAll {
+               if !cn.t.deleteConnWithAllPieces(&cn.Peer) {
+                       panic(cn)
+               }
+               cn.peerSentHaveAll = false
+               if !cn._peerPieces.IsEmpty() {
+                       panic("if peer has all, we expect no individual peer pieces to be set")
+               }
+       } else {
+               bm.Xor(&cn._peerPieces)
+       }
        cn.peerSentHaveAll = false
-       for i, have := range bf {
-               if have {
-                       cn.raisePeerMinPieces(pieceIndex(i) + 1)
-                       if !pp.Contains(bitmap.BitIndex(i)) {
-                               cn.t.incPieceAvailability(i)
-                       }
+       // bm is now 'on' for pieces that are changing
+       bm.Iterate(func(x uint32) bool {
+               pi := pieceIndex(x)
+               if cn._peerPieces.Contains(x) {
+                       // Then we must be losing this piece
+                       cn.t.decPieceAvailability(pi)
                } else {
-                       if pp.Contains(bitmap.BitIndex(i)) {
-                               cn.t.decPieceAvailability(i)
+                       if !shouldUpdateRequests && cn.t.wantPieceIndex(pieceIndex(x)) {
+                               shouldUpdateRequests = true
                        }
+                       // We must be gaining this piece
+                       cn.t.incPieceAvailability(pieceIndex(x))
                }
-               if have {
-                       cn._peerPieces.Add(uint32(i))
-                       if cn.t.wantPieceIndex(i) {
-                               cn.updateRequests("bitfield")
-                       }
-               } else {
-                       cn._peerPieces.Remove(uint32(i))
-               }
+               return true
+       })
+       // Apply the changes. If we had everything previously, this should be empty, so xor is the same
+       // as or.
+       cn._peerPieces.Xor(&bm)
+       if shouldUpdateRequests {
+               cn.updateRequests("bitfield")
        }
+       // We didn't guard this before, I see no reason to do it now.
        cn.peerPiecesChanged()
        return nil
 }
@@ -803,13 +842,12 @@ func (cn *PeerConn) peerSentBitfield(bf []bool) error {
 func (cn *PeerConn) onPeerHasAllPieces() {
        t := cn.t
        if t.haveInfo() {
-               npp, pc := cn.newPeerPieces(), t.numPieces()
-               for i := 0; i < pc; i += 1 {
-                       if !npp.Contains(bitmap.BitIndex(i)) {
-                               t.incPieceAvailability(i)
-                       }
-               }
+               cn._peerPieces.Iterate(func(x uint32) bool {
+                       t.decPieceAvailability(pieceIndex(x))
+                       return true
+               })
        }
+       t.addConnWithAllPieces(&cn.Peer)
        cn.peerSentHaveAll = true
        cn._peerPieces.Clear()
        if !cn.t._pendingPieces.IsEmpty() {
@@ -824,7 +862,9 @@ func (cn *PeerConn) onPeerSentHaveAll() error {
 }
 
 func (cn *PeerConn) peerSentHaveNone() error {
-       cn.t.decPeerPieceAvailability(&cn.Peer)
+       if cn.peerSentHaveAll {
+               cn.t.decPeerPieceAvailability(&cn.Peer)
+       }
        cn._peerPieces.Clear()
        cn.peerSentHaveAll = false
        cn.peerPiecesChanged()
@@ -971,7 +1011,6 @@ func (c *PeerConn) onReadRequest(r Request) error {
        value := &peerRequestState{}
        c.peerRequests[r] = value
        go c.peerRequestDataReader(r, value)
-       // c.tickleWriter()
        return nil
 }
 
@@ -985,6 +1024,7 @@ func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
                if b == nil {
                        panic("data must be non-nil to trigger send")
                }
+               torrent.Add("peer request data read successes", 1)
                prs.data = b
                c.tickleWriter()
        }
@@ -993,7 +1033,14 @@ func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
 // If this is maintained correctly, we might be able to support optional synchronous reading for
 // chunk sending, the way it used to work.
 func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) {
-       c.logger.WithDefaultLevel(log.Warning).Printf("error reading chunk for peer Request %v: %v", r, err)
+       torrent.Add("peer request data read failures", 1)
+       logLevel := log.Warning
+       if c.t.hasStorageCap() {
+               // It's expected that pieces might drop. See
+               // https://github.com/anacrolix/torrent/issues/702#issuecomment-1000953313.
+               logLevel = log.Debug
+       }
+       c.logger.WithDefaultLevel(logLevel).Printf("error reading chunk for peer Request %v: %v", r, err)
        if c.t.closed.IsSet() {
                return
        }
@@ -1005,14 +1052,23 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) {
                // here.
                c.t.updatePieceCompletion(i)
        }
-       // If we failed to send a chunk, choke the peer to ensure they flush all their requests. We've
-       // probably dropped a piece from storage, but there's no way to communicate this to the peer. If
-       // they ask for it again, we'll kick them to allow us to send them an updated bitfield on the
-       // next connect. TODO: Support rejecting here too.
-       if c.choking {
-               c.logger.WithDefaultLevel(log.Warning).Printf("already choking peer, requests might not be rejected correctly")
+       // We've probably dropped a piece from storage, but there's no way to communicate this to the
+       // peer. If they ask for it again, we kick them allowing us to send them updated piece states if
+       // we reconnect. TODO: Instead, we could just try to update them with Bitfield or HaveNone and
+       // if they kick us for breaking protocol, on reconnect we will be compliant again (at least
+       // initially).
+       if c.fastEnabled() {
+               c.reject(r)
+       } else {
+               if c.choking {
+                       // If fast isn't enabled, I think we would have wiped all peer requests when we last
+                       // choked, and requests while we're choking would be ignored. It could be possible that
+                       // a peer request data read completed concurrently to it being deleted elsewhere.
+                       c.logger.WithDefaultLevel(log.Warning).Printf("already choking peer, requests might not be rejected correctly")
+               }
+               // Choking a non-fast peer should cause them to flush all their requests.
+               c.choke(c.write)
        }
-       c.choke(c.write)
 }
 
 func readPeerRequestData(r Request, c *PeerConn) ([]byte, error) {
@@ -1095,7 +1151,13 @@ func (c *PeerConn) mainReadLoop() (err error) {
                                break
                        }
                        if !c.fastEnabled() {
-                               c.deleteAllRequests()
+                               if !c.deleteAllRequests().IsEmpty() {
+                                       c.t.iterPeers(func(p *Peer) {
+                                               if p.isLowOnRequests() {
+                                                       p.updateRequests("choked by non-fast PeerConn")
+                                               }
+                                       })
+                               }
                        } else {
                                // We don't decrement pending requests here, let's wait for the peer to either
                                // reject or satisfy the outstanding requests. Additionally, some peers may unchoke
@@ -1207,7 +1269,9 @@ func (c *PeerConn) mainReadLoop() (err error) {
 
 // Returns true if it was valid to reject the request.
 func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
-       if !c.deleteRequest(r) && !c.requestState.Cancelled.CheckedRemove(r) {
+       if c.deleteRequest(r) {
+               c.decPeakRequests()
+       } else if !c.requestState.Cancelled.CheckedRemove(r) {
                return false
        }
        if c.isLowOnRequests() {
@@ -1554,21 +1618,42 @@ func (c *Peer) deleteRequest(r RequestIndex) bool {
        }
        delete(c.t.pendingRequests, r)
        delete(c.t.lastRequested, r)
+       // c.t.iterPeers(func(p *Peer) {
+       //      if p.isLowOnRequests() {
+       //              p.updateRequests("Peer.deleteRequest")
+       //      }
+       // })
        return true
 }
 
-func (c *Peer) deleteAllRequests() {
-       c.requestState.Requests.Clone().Iterate(func(x uint32) bool {
+func (c *Peer) deleteAllRequests() (deleted *roaring.Bitmap) {
+       deleted = c.requestState.Requests.Clone()
+       deleted.Iterate(func(x uint32) bool {
                if !c.deleteRequest(x) {
                        panic("request should exist")
                }
                return true
        })
+       c.assertNoRequests()
+       return
+}
+
+func (c *Peer) assertNoRequests() {
        if !c.requestState.Requests.IsEmpty() {
                panic(c.requestState.Requests.GetCardinality())
        }
 }
 
+func (c *Peer) cancelAllRequests() (cancelled *roaring.Bitmap) {
+       cancelled = c.requestState.Requests.Clone()
+       cancelled.Iterate(func(x uint32) bool {
+               c.cancel(x)
+               return true
+       })
+       c.assertNoRequests()
+       return
+}
+
 // This is called when something has changed that should wake the writer, such as putting stuff into
 // the writeBuffer, or changing some state that the writer can act on.
 func (c *PeerConn) tickleWriter() {
@@ -1694,10 +1779,6 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
        return pc, ok
 }
 
-func (pc *PeerConn) isLowOnRequests() bool {
-       return pc.requestState.Requests.IsEmpty() && pc.requestState.Cancelled.IsEmpty()
-}
-
 func (p *Peer) uncancelledRequests() uint64 {
        return p.requestState.Requests.GetCardinality()
 }