]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Dynamic outbound max requests
authorMatt Joiner <anacrolix@gmail.com>
Thu, 23 Dec 2021 21:55:57 +0000 (08:55 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 23 Dec 2021 21:55:57 +0000 (08:55 +1100)
This might be one solution to https://github.com/anacrolix/torrent/issues/698.

client.go
misc.go
peer-impl.go
peer.go [new file with mode: 0644]
peerconn.go
peerconn_test.go
request-strategy/peer.go
requesting.go
torrent.go
webseed-peer.go

index a6de6c08e2222510d2691ad7f464134538a9c558..e36ec0e980834516cd0a34c338c879389da864f5 100644 (file)
--- a/client.go
+++ b/client.go
@@ -985,23 +985,25 @@ func (p *Peer) initUpdateRequestsTimer() {
                }
        }
        p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
-       p.updateRequestsTimer.Stop()
 }
 
+const peerUpdateRequestsTimerReason = "updateRequestsTimer"
+
 func (c *Peer) updateRequestsTimerFunc() {
        c.locker().Lock()
        defer c.locker().Unlock()
        if c.closed.IsSet() {
                return
        }
-       if c.needRequestUpdate != "" {
-               return
-       }
        if c.isLowOnRequests() {
                // If there are no outstanding requests, then a request update should have already run.
                return
        }
-       c.updateRequests("updateRequestsTimer")
+       if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration {
+               log.Printf("spurious timer requests update [interval=%v]", d)
+               return
+       }
+       c.updateRequests(peerUpdateRequestsTimerReason)
 }
 
 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
diff --git a/misc.go b/misc.go
index db924bbe99bf944c55594efe95ac15c69e6edaec..4041b776b3790c367db569cb87e5d902245e56cf 100644 (file)
--- a/misc.go
+++ b/misc.go
@@ -143,6 +143,16 @@ func max(as ...int64) int64 {
        return ret
 }
 
+func maxInt(as ...int) int {
+       ret := as[0]
+       for _, a := range as[1:] {
+               if a > ret {
+                       ret = a
+               }
+       }
+       return ret
+}
+
 func min(as ...int64) int64 {
        ret := as[0]
        for _, a := range as[1:] {
index 47b4345ae628f629003fde3da6848090edaccf23..e29fb43967cb255086dc3fa73dead6b077749df4 100644 (file)
@@ -11,8 +11,6 @@ import (
 type peerImpl interface {
        // Trigger the actual request state to get updated
        handleUpdateRequests()
-       // Whether the outstanding local request cardinality is low enough to warrant an update.
-       isLowOnRequests() bool
        writeInterested(interested bool) bool
 
        // _cancel initiates cancellation of a request and returns acked if it expects the cancel to be
diff --git a/peer.go b/peer.go
new file mode 100644 (file)
index 0000000..e1bab18
--- /dev/null
+++ b/peer.go
@@ -0,0 +1,15 @@
+package torrent
+
+func (p *Peer) isLowOnRequests() bool {
+       return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty()
+}
+
+func (p *Peer) decPeakRequests() {
+       // // This can occur when peak requests are altered by the update request timer to be lower than
+       // // the actual number of outstanding requests. Let's let it go negative and see what happens. I
+       // // wonder what happens if maxRequests is not signed.
+       // if p.peakRequests < 1 {
+       //      panic(p.peakRequests)
+       // }
+       p.peakRequests--
+}
index 406922254a4ec10ab40fbfc55d27dc887315dee4..fb28dc2275b3388c45ae1c25e66d68652c67e565 100644 (file)
@@ -86,6 +86,8 @@ type Peer struct {
        needRequestUpdate    string
        requestState         requestState
        updateRequestsTimer  *time.Timer
+       lastRequestUpdate    time.Time
+       peakRequests         maxRequests
        lastBecameInterested time.Time
        priorInterest        time.Duration
 
@@ -445,7 +447,10 @@ func (cn *Peer) peerHasPiece(piece pieceIndex) bool {
 
 // 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update when
 // https://github.com/pion/datachannel/issues/59 is fixed.
-const writeBufferHighWaterLen = 1 << 15
+const (
+       writeBufferHighWaterLen = 1 << 15
+       writeBufferLowWaterLen  = writeBufferHighWaterLen / 2
+)
 
 // Writes a message into the write buffer. Returns whether it's okay to keep writing. Writing is
 // done asynchronously, so it may be that we're not able to honour backpressure from this method.
@@ -481,9 +486,17 @@ func (cn *PeerConn) requestedMetadataPiece(index int) bool {
        return index < len(cn.metadataRequests) && cn.metadataRequests[index]
 }
 
+var (
+       interestedMsgLen = len(pp.Message{Type: pp.Interested}.MustMarshalBinary())
+       requestMsgLen    = len(pp.Message{Type: pp.Request}.MustMarshalBinary())
+       // This is the maximum request count that could fit in the write buffer if it's at or below the
+       // low water mark when we run maybeUpdateActualRequestState.
+       maxLocalToRemoteRequests = (writeBufferHighWaterLen - writeBufferLowWaterLen - interestedMsgLen) / requestMsgLen
+)
+
 // The actual value to use as the maximum outbound requests.
-func (cn *Peer) nominalMaxRequests() (ret maxRequests) {
-       return maxRequests(clamp(1, int64(cn.PeerMaxRequests), 2048))
+func (cn *Peer) nominalMaxRequests() maxRequests {
+       return maxRequests(maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests)))
 }
 
 func (cn *Peer) totalExpectingTime() (ret time.Duration) {
@@ -649,6 +662,7 @@ func (me *Peer) cancel(r RequestIndex) {
                        panic("request already cancelled")
                }
        }
+       me.decPeakRequests()
        if me.isLowOnRequests() {
                me.updateRequests("Peer.cancel")
        }
@@ -662,9 +676,15 @@ func (me *PeerConn) _cancel(r RequestIndex) bool {
 }
 
 func (cn *PeerConn) fillWriteBuffer() {
-       if !cn.maybeUpdateActualRequestState() {
-               return
-       }
+       if cn.messageWriter.writeBuffer.Len() > writeBufferLowWaterLen {
+               // Fully committing to our max requests requires sufficient space (see
+               // maxLocalToRemoteRequests). Flush what we have instead. We also prefer always to make
+               // requests than to do PEX or upload, so we short-circuit before handling those. Any update
+               // request reason will not be cleared, so we'll come right back here when there's space. We
+               // can't do this in maybeUpdateActualRequestState because it's a method on Peer and has no
+               // knowledge of write buffers.
+       }
+       cn.maybeUpdateActualRequestState()
        if cn.pex.IsEnabled() {
                if flow := cn.pex.Share(cn.write); !flow {
                        return
@@ -703,6 +723,9 @@ func (cn *Peer) updateRequests(reason string) {
        if cn.needRequestUpdate != "" {
                return
        }
+       if reason != peerUpdateRequestsTimerReason && !cn.isLowOnRequests() {
+               return
+       }
        cn.needRequestUpdate = reason
        cn.handleUpdateRequests()
 }
@@ -1235,7 +1258,9 @@ func (c *PeerConn) mainReadLoop() (err error) {
 
 // Returns true if it was valid to reject the request.
 func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
-       if !c.deleteRequest(r) && !c.requestState.Cancelled.CheckedRemove(r) {
+       if c.deleteRequest(r) {
+               c.decPeakRequests()
+       } else if !c.requestState.Cancelled.CheckedRemove(r) {
                return false
        }
        if c.isLowOnRequests() {
@@ -1743,10 +1768,6 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
        return pc, ok
 }
 
-func (pc *PeerConn) isLowOnRequests() bool {
-       return pc.requestState.Requests.IsEmpty() && pc.requestState.Cancelled.IsEmpty()
-}
-
 func (p *Peer) uncancelledRequests() uint64 {
        return p.requestState.Requests.GetCardinality()
 }
index 7ecb6933357460ea77c244d7c1622d3c3ada1cb5..2d5aa681c9462b10ea750b15bbac835c99eeeb4c 100644 (file)
@@ -238,3 +238,11 @@ func TestHaveAllThenBitfield(t *testing.T) {
                {2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
        })
 }
+
+func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
+       c := qt.New(t)
+       c.Check(interestedMsgLen, qt.Equals, 5)
+       c.Check(requestMsgLen, qt.Equals, 17)
+       c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
+       c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
+}
index 6a69535f955c70fd90f9721051e2971de7cb5298..d5366d7603fb65baab9629f37ef180132b596ae0 100644 (file)
@@ -6,7 +6,8 @@ import (
 
 type PeerRequestState struct {
        Interested bool
-       // Expecting
+       // Expecting. TODO: This should be ordered so webseed requesters initiate in the same order they
+       // were assigned.
        Requests roaring.Bitmap
        // Cancelled and waiting response
        Cancelled roaring.Bitmap
index 3b7359da6aa7e66679bf163e8c931a736c9b0cac..f97f82b524ee7190b8a49a173c0d61082651f52f 100644 (file)
@@ -4,6 +4,7 @@ import (
        "container/heap"
        "context"
        "encoding/gob"
+       "fmt"
        "reflect"
        "runtime/pprof"
        "time"
@@ -201,31 +202,46 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
        return
 }
 
-func (p *Peer) maybeUpdateActualRequestState() bool {
+func (p *Peer) maybeUpdateActualRequestState() {
+       if p.closed.IsSet() {
+               return
+       }
        if p.needRequestUpdate == "" {
-               return true
+               return
+       }
+       if p.needRequestUpdate == peerUpdateRequestsTimerReason {
+               since := time.Since(p.lastRequestUpdate)
+               if since < updateRequestsTimerDuration {
+                       panic(since)
+               }
        }
-       var more bool
        pprof.Do(
                context.Background(),
                pprof.Labels("update request", p.needRequestUpdate),
                func(_ context.Context) {
                        next := p.getDesiredRequestState()
-                       more = p.applyRequestState(next)
+                       p.applyRequestState(next)
                },
        )
-       return more
 }
 
 // Transmit/action the request state to the peer.
-func (p *Peer) applyRequestState(next desiredRequestState) bool {
+func (p *Peer) applyRequestState(next desiredRequestState) {
        current := &p.requestState
        if !p.setInterested(next.Interested) {
-               return false
+               panic("insufficient write buffer")
        }
        more := true
        requestHeap := &next.Requests
        t := p.t
+       originalRequestCount := current.Requests.GetCardinality()
+       // We're either here on a timer, or because we ran out of requests. Both are valid reasons to
+       // alter peakRequests.
+       if originalRequestCount != 0 && p.needRequestUpdate != peerUpdateRequestsTimerReason {
+               panic(fmt.Sprintf(
+                       "expected zero existing requests (%v) for update reason %q",
+                       originalRequestCount, p.needRequestUpdate))
+       }
        heap.Init(requestHeap)
        for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()+current.Cancelled.GetCardinality()) < p.nominalMaxRequests() {
                req := heap.Pop(requestHeap).(RequestIndex)
@@ -245,14 +261,24 @@ func (p *Peer) applyRequestState(next desiredRequestState) bool {
                        break
                }
        }
-       // TODO: This may need to change, we might want to update even if there were no requests due to
-       // filtering them for being recently requested already.
-       p.updateRequestsTimer.Stop()
-       if more {
-               p.needRequestUpdate = ""
-               if current.Interested {
-                       p.updateRequestsTimer.Reset(3 * time.Second)
-               }
+       if !more {
+               // This might fail if we incorrectly determine that we can fit up to the maximum allowed
+               // requests into the available write buffer space. We don't want that to happen because it
+               // makes our peak requests dependent on how much was already in the buffer.
+               panic(fmt.Sprintf(
+                       "couldn't fill apply entire request state [newRequests=%v]",
+                       current.Requests.GetCardinality()-originalRequestCount))
        }
-       return more
+       newPeakRequests := maxRequests(current.Requests.GetCardinality() - originalRequestCount)
+       // log.Printf(
+       //      "requests %v->%v (peak %v->%v) reason %q (peer %v)",
+       //      originalRequestCount, current.Requests.GetCardinality(), p.peakRequests, newPeakRequests, p.needRequestUpdate, p)
+       p.peakRequests = newPeakRequests
+       p.needRequestUpdate = ""
+       p.lastRequestUpdate = time.Now()
+       p.updateRequestsTimer.Reset(updateRequestsTimerDuration)
 }
+
+// This could be set to 10s to match the unchoke/request update interval recommended by some
+// specifications. I've set it shorter to trigger it more often for testing for now.
+const updateRequestsTimerDuration = 3 * time.Second
index c0c57564c86cd2e97c2febd1ec0288b07ba55124..2e6fb6ef9e1115ca2b55082677cca1520477d23e 100644 (file)
@@ -2241,6 +2241,7 @@ func (t *Torrent) DisallowDataUpload() {
        defer t.cl.unlock()
        t.dataUploadDisallowed = true
        for c := range t.conns {
+               // TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
                c.updateRequests("disallow data upload")
        }
 }
index f2ef7a81b39e0072dfe2733e7e1b0707160dc101..2486aba05882f722b38953f9371a16c9057543df 100644 (file)
@@ -187,10 +187,6 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
        return err
 }
 
-func (me *webseedPeer) isLowOnRequests() bool {
-       return me.peer.requestState.Requests.GetCardinality() < uint64(me.maxRequests)
-}
-
 func (me *webseedPeer) peerPieces() *roaring.Bitmap {
        return &me.client.Pieces
 }