client.go | 12 +++++++----- misc.go | 10 ++++++++++ peer-impl.go | 2 -- peer.go | 15 +++++++++++++++ peerconn.go | 41 +++++++++++++++++++++++++++++++---------- peerconn_test.go | 8 ++++++++ request-strategy/peer.go | 3 ++- requesting.go | 58 ++++++++++++++++++++++++++++++++++++++--------------- torrent.go | 1 + webseed-peer.go | 4 ---- diff --git a/client.go b/client.go index a6de6c08e2222510d2691ad7f464134538a9c558..e36ec0e980834516cd0a34c338c879389da864f5 100644 --- a/client.go +++ b/client.go @@ -985,23 +985,25 @@ panic(p.updateRequestsTimer) } } p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc) - p.updateRequestsTimer.Stop() } + +const peerUpdateRequestsTimerReason = "updateRequestsTimer" func (c *Peer) updateRequestsTimerFunc() { c.locker().Lock() defer c.locker().Unlock() if c.closed.IsSet() { - return - } - if c.needRequestUpdate != "" { return } if c.isLowOnRequests() { // If there are no outstanding requests, then a request update should have already run. return } - c.updateRequests("updateRequestsTimer") + if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration { + log.Printf("spurious timer requests update [interval=%v]", d) + return + } + c.updateRequests(peerUpdateRequestsTimerReason) } // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this diff --git a/misc.go b/misc.go index db924bbe99bf944c55594efe95ac15c69e6edaec..4041b776b3790c367db569cb87e5d902245e56cf 100644 --- a/misc.go +++ b/misc.go @@ -143,6 +143,16 @@ } return ret } +func maxInt(as ...int) int { + ret := as[0] + for _, a := range as[1:] { + if a > ret { + ret = a + } + } + return ret +} + func min(as ...int64) int64 { ret := as[0] for _, a := range as[1:] { diff --git a/peer-impl.go b/peer-impl.go index 47b4345ae628f629003fde3da6848090edaccf23..e29fb43967cb255086dc3fa73dead6b077749df4 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -11,8 +11,6 @@ // legacy PeerConn methods. type peerImpl interface { // Trigger the actual request state to get updated handleUpdateRequests() - // Whether the outstanding local request cardinality is low enough to warrant an update. - isLowOnRequests() bool writeInterested(interested bool) bool // _cancel initiates cancellation of a request and returns acked if it expects the cancel to be diff --git a/peer.go b/peer.go new file mode 100644 index 0000000000000000000000000000000000000000..e1bab1840d970c1a97292da2773297a520e3820e --- /dev/null +++ b/peer.go @@ -0,0 +1,15 @@ +package torrent + +func (p *Peer) isLowOnRequests() bool { + return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty() +} + +func (p *Peer) decPeakRequests() { + // // This can occur when peak requests are altered by the update request timer to be lower than + // // the actual number of outstanding requests. Let's let it go negative and see what happens. I + // // wonder what happens if maxRequests is not signed. + // if p.peakRequests < 1 { + // panic(p.peakRequests) + // } + p.peakRequests-- +} diff --git a/peerconn.go b/peerconn.go index 406922254a4ec10ab40fbfc55d27dc887315dee4..fb28dc2275b3388c45ae1c25e66d68652c67e565 100644 --- a/peerconn.go +++ b/peerconn.go @@ -86,6 +86,8 @@ // Stuff controlled by the local peer. needRequestUpdate string requestState requestState updateRequestsTimer *time.Timer + lastRequestUpdate time.Time + peakRequests maxRequests lastBecameInterested time.Time priorInterest time.Duration @@ -445,7 +447,10 @@ } // 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update when // https://github.com/pion/datachannel/issues/59 is fixed. -const writeBufferHighWaterLen = 1 << 15 +const ( + writeBufferHighWaterLen = 1 << 15 + writeBufferLowWaterLen = writeBufferHighWaterLen / 2 +) // Writes a message into the write buffer. Returns whether it's okay to keep writing. Writing is // done asynchronously, so it may be that we're not able to honour backpressure from this method. @@ -481,9 +486,17 @@ func (cn *PeerConn) requestedMetadataPiece(index int) bool { return index < len(cn.metadataRequests) && cn.metadataRequests[index] } +var ( + interestedMsgLen = len(pp.Message{Type: pp.Interested}.MustMarshalBinary()) + requestMsgLen = len(pp.Message{Type: pp.Request}.MustMarshalBinary()) + // This is the maximum request count that could fit in the write buffer if it's at or below the + // low water mark when we run maybeUpdateActualRequestState. + maxLocalToRemoteRequests = (writeBufferHighWaterLen - writeBufferLowWaterLen - interestedMsgLen) / requestMsgLen +) + // The actual value to use as the maximum outbound requests. -func (cn *Peer) nominalMaxRequests() (ret maxRequests) { - return maxRequests(clamp(1, int64(cn.PeerMaxRequests), 2048)) +func (cn *Peer) nominalMaxRequests() maxRequests { + return maxRequests(maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests))) } func (cn *Peer) totalExpectingTime() (ret time.Duration) { @@ -649,6 +662,7 @@ if !me.requestState.Cancelled.CheckedAdd(r) { panic("request already cancelled") } } + me.decPeakRequests() if me.isLowOnRequests() { me.updateRequests("Peer.cancel") } @@ -662,9 +676,15 @@ return me.fastEnabled() && !me.remoteIsTransmission() } func (cn *PeerConn) fillWriteBuffer() { - if !cn.maybeUpdateActualRequestState() { - return + if cn.messageWriter.writeBuffer.Len() > writeBufferLowWaterLen { + // Fully committing to our max requests requires sufficient space (see + // maxLocalToRemoteRequests). Flush what we have instead. We also prefer always to make + // requests than to do PEX or upload, so we short-circuit before handling those. Any update + // request reason will not be cleared, so we'll come right back here when there's space. We + // can't do this in maybeUpdateActualRequestState because it's a method on Peer and has no + // knowledge of write buffers. } + cn.maybeUpdateActualRequestState() if cn.pex.IsEnabled() { if flow := cn.pex.Share(cn.write); !flow { return @@ -703,6 +723,9 @@ func (cn *Peer) updateRequests(reason string) { if cn.needRequestUpdate != "" { return } + if reason != peerUpdateRequestsTimerReason && !cn.isLowOnRequests() { + return + } cn.needRequestUpdate = reason cn.handleUpdateRequests() } @@ -1235,7 +1258,9 @@ } // Returns true if it was valid to reject the request. func (c *Peer) remoteRejectedRequest(r RequestIndex) bool { - if !c.deleteRequest(r) && !c.requestState.Cancelled.CheckedRemove(r) { + if c.deleteRequest(r) { + c.decPeakRequests() + } else if !c.requestState.Cancelled.CheckedRemove(r) { return false } if c.isLowOnRequests() { @@ -1741,10 +1766,6 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) { pc, ok := p.peerImpl.(*PeerConn) return pc, ok -} - -func (pc *PeerConn) isLowOnRequests() bool { - return pc.requestState.Requests.IsEmpty() && pc.requestState.Cancelled.IsEmpty() } func (p *Peer) uncancelledRequests() uint64 { diff --git a/peerconn_test.go b/peerconn_test.go index 7ecb6933357460ea77c244d7c1622d3c3ada1cb5..2d5aa681c9462b10ea750b15bbac835c99eeeb4c 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -238,3 +238,11 @@ // pieces. {2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0}, }) } + +func TestApplyRequestStateWriteBufferConstraints(t *testing.T) { + c := qt.New(t) + c.Check(interestedMsgLen, qt.Equals, 5) + c.Check(requestMsgLen, qt.Equals, 17) + c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue) + c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests) +} diff --git a/request-strategy/peer.go b/request-strategy/peer.go index 6a69535f955c70fd90f9721051e2971de7cb5298..d5366d7603fb65baab9629f37ef180132b596ae0 100644 --- a/request-strategy/peer.go +++ b/request-strategy/peer.go @@ -6,7 +6,8 @@ ) type PeerRequestState struct { Interested bool - // Expecting + // Expecting. TODO: This should be ordered so webseed requesters initiate in the same order they + // were assigned. Requests roaring.Bitmap // Cancelled and waiting response Cancelled roaring.Bitmap diff --git a/requesting.go b/requesting.go index 3b7359da6aa7e66679bf163e8c931a736c9b0cac..f97f82b524ee7190b8a49a173c0d61082651f52f 100644 --- a/requesting.go +++ b/requesting.go @@ -4,6 +4,7 @@ import ( "container/heap" "context" "encoding/gob" + "fmt" "reflect" "runtime/pprof" "time" @@ -201,31 +202,46 @@ desired.Requests = requestHeap return } -func (p *Peer) maybeUpdateActualRequestState() bool { +func (p *Peer) maybeUpdateActualRequestState() { + if p.closed.IsSet() { + return + } if p.needRequestUpdate == "" { - return true + return + } + if p.needRequestUpdate == peerUpdateRequestsTimerReason { + since := time.Since(p.lastRequestUpdate) + if since < updateRequestsTimerDuration { + panic(since) + } } - var more bool pprof.Do( context.Background(), pprof.Labels("update request", p.needRequestUpdate), func(_ context.Context) { next := p.getDesiredRequestState() - more = p.applyRequestState(next) + p.applyRequestState(next) }, ) - return more } // Transmit/action the request state to the peer. -func (p *Peer) applyRequestState(next desiredRequestState) bool { +func (p *Peer) applyRequestState(next desiredRequestState) { current := &p.requestState if !p.setInterested(next.Interested) { - return false + panic("insufficient write buffer") } more := true requestHeap := &next.Requests t := p.t + originalRequestCount := current.Requests.GetCardinality() + // We're either here on a timer, or because we ran out of requests. Both are valid reasons to + // alter peakRequests. + if originalRequestCount != 0 && p.needRequestUpdate != peerUpdateRequestsTimerReason { + panic(fmt.Sprintf( + "expected zero existing requests (%v) for update reason %q", + originalRequestCount, p.needRequestUpdate)) + } heap.Init(requestHeap) for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()+current.Cancelled.GetCardinality()) < p.nominalMaxRequests() { req := heap.Pop(requestHeap).(RequestIndex) @@ -245,14 +261,24 @@ if !more { break } } - // TODO: This may need to change, we might want to update even if there were no requests due to - // filtering them for being recently requested already. - p.updateRequestsTimer.Stop() - if more { - p.needRequestUpdate = "" - if current.Interested { - p.updateRequestsTimer.Reset(3 * time.Second) - } + if !more { + // This might fail if we incorrectly determine that we can fit up to the maximum allowed + // requests into the available write buffer space. We don't want that to happen because it + // makes our peak requests dependent on how much was already in the buffer. + panic(fmt.Sprintf( + "couldn't fill apply entire request state [newRequests=%v]", + current.Requests.GetCardinality()-originalRequestCount)) } - return more + newPeakRequests := maxRequests(current.Requests.GetCardinality() - originalRequestCount) + // log.Printf( + // "requests %v->%v (peak %v->%v) reason %q (peer %v)", + // originalRequestCount, current.Requests.GetCardinality(), p.peakRequests, newPeakRequests, p.needRequestUpdate, p) + p.peakRequests = newPeakRequests + p.needRequestUpdate = "" + p.lastRequestUpdate = time.Now() + p.updateRequestsTimer.Reset(updateRequestsTimerDuration) } + +// This could be set to 10s to match the unchoke/request update interval recommended by some +// specifications. I've set it shorter to trigger it more often for testing for now. +const updateRequestsTimerDuration = 3 * time.Second diff --git a/torrent.go b/torrent.go index c0c57564c86cd2e97c2febd1ec0288b07ba55124..2e6fb6ef9e1115ca2b55082677cca1520477d23e 100644 --- a/torrent.go +++ b/torrent.go @@ -2241,6 +2241,7 @@ t.cl.lock() defer t.cl.unlock() t.dataUploadDisallowed = true for c := range t.conns { + // TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead? c.updateRequests("disallow data upload") } } diff --git a/webseed-peer.go b/webseed-peer.go index f2ef7a81b39e0072dfe2733e7e1b0707160dc101..2486aba05882f722b38953f9371a16c9057543df 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -187,10 +187,6 @@ } return err } -func (me *webseedPeer) isLowOnRequests() bool { - return me.peer.requestState.Requests.GetCardinality() < uint64(me.maxRequests) -} - func (me *webseedPeer) peerPieces() *roaring.Bitmap { return &me.client.Pieces }