torrent.Add("spurious timer requests updates", 1)
return
}
- c.updateRequests(peerUpdateRequestsTimerReason)
+ c.onNeedUpdateRequests(peerUpdateRequestsTimerReason)
}
// Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
// newHotPeerImpl.
type legacyPeerImpl interface {
// Trigger the request state to be updated at some point. The actual work happens in updateRequests.
- handleUpdateRequests()
+ handleOnNeedUpdateRequests()
writeInterested(interested bool) bool
+ // Actually go ahead and modify the pending requests.
+ updateRequests()
// _cancel initiates cancellation of a request and returns acked if it expects the cancel to be
// handled by a follow-up event.
return nil
}
-func (cn *Peer) mustRequest(r RequestIndex) bool {
+func (cn *PeerConn) mustRequest(r RequestIndex) bool {
more, err := cn.request(r)
if err != nil {
panic(err)
}
return more
}
-func (cn *Peer) request(r RequestIndex) (more bool, err error) {
+func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
if err := cn.shouldRequest(r); err != nil {
panic(err)
}
}
cn.validReceiveChunks[r]++
cn.t.requestState[r] = requestState{
- peer: cn,
+ peer: &cn.Peer,
when: time.Now(),
}
cn.updateExpectingChunks()
ppReq := cn.t.requestIndexToRequest(r)
for _, f := range cn.callbacks.SentRequest {
- f(PeerRequestEvent{cn, ppReq})
+ f(PeerRequestEvent{&cn.Peer, ppReq})
}
return cn.legacyPeerImpl._request(ppReq), nil
}
}
me.decPeakRequests()
if me.isLowOnRequests() {
- me.updateRequests(peerUpdateRequestsPeerCancelReason)
+ me.onNeedUpdateRequests(peerUpdateRequestsPeerCancelReason)
}
}
// Sets a reason to update requests, and if there wasn't already one, handles it.
-func (cn *Peer) updateRequests(reason updateRequestReason) {
+func (cn *Peer) onNeedUpdateRequests(reason updateRequestReason) {
if cn.needRequestUpdate != "" {
return
}
cn.needRequestUpdate = reason
- cn.handleUpdateRequests()
+ cn.handleOnNeedUpdateRequests()
}
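// Illustrative sketch, not part of this change: the rename separates noting that requests need
// updating (onNeedUpdateRequests records a reason at most once) from deciding when to act on it
// (handleOnNeedUpdateRequests) and from actually recomputing requests (the new updateRequests
// interface method). The type and field names below are invented for illustration and assume the
// code sits in this package so updateRequestReason is visible.
type exampleDeferredPeer struct {
    pendingReason updateRequestReason
    wake          chan struct{}
}

func (p *exampleDeferredPeer) onNeedUpdateRequests(reason updateRequestReason) {
    if p.pendingReason != "" {
        // A burst of triggers collapses into a single update; the first reason sticks until it
        // is cleared after the update runs.
        return
    }
    p.pendingReason = reason
    p.handleOnNeedUpdateRequests()
}

func (p *exampleDeferredPeer) handleOnNeedUpdateRequests() {
    // Deferred handling: wake whichever goroutine owns the request state instead of recomputing
    // inline. PeerConn does the equivalent via tickleWriter below.
    select {
    case p.wake <- struct{}{}:
    default:
    }
}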
// Emits the indices in the Bitmaps bms in order, never repeating any index.
return false
}
if c.isLowOnRequests() {
- c.updateRequests(peerUpdateRequestsRemoteRejectReason)
+ c.onNeedUpdateRequests(peerUpdateRequestsRemoteRejectReason)
}
c.decExpectedChunkReceive(r)
return true
c._chunksReceivedWhileExpecting++
}
if c.isLowOnRequests() {
- c.updateRequests("Peer.receiveChunk deleted request")
+ c.onNeedUpdateRequests("Peer.receiveChunk deleted request")
}
} else {
ChunksReceived.Add("unintended", 1)
// Necessary to pass TestReceiveChunkStorageFailureSeederFastExtensionDisabled. I think a
// request update runs while we're writing the chunk that just failed. Then we never do a
// fresh update after pending the failed request.
- c.updateRequests("Peer.receiveChunk error writing chunk")
+ c.onNeedUpdateRequests("Peer.receiveChunk error writing chunk")
t.onWriteChunkErr(err)
return nil
}
delete(c.t.requestState, r)
// c.t.iterPeers(func(p *Peer) {
// if p.isLowOnRequests() {
- // p.updateRequests("Peer.deleteRequest")
+ // p.onNeedUpdateRequests("Peer.deleteRequest")
// }
// })
return true
c.assertNoRequests()
c.t.iterPeers(func(p *Peer) {
if p.isLowOnRequests() {
- p.updateRequests(reason)
+ p.onNeedUpdateRequests(reason)
}
})
}
cn.sentHaves = bitmap.Bitmap{cn.t._completedPieces.Clone()}
}
-func (cn *PeerConn) handleUpdateRequests() {
+func (cn *PeerConn) handleOnNeedUpdateRequests() {
// The writer determines the request state as needed when it can write.
cn.tickleWriter()
}
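// Illustrative sketch, not part of this change: the consuming side of the deferred update. When
// the writer can actually write, it performs the update and only then clears the pending reason,
// mirroring the PeerConn caller further down in this diff that resets needRequestUpdate after
// p.updateRequests() returns. The loop and its parameter are invented for illustration,
// continuing the exampleDeferredPeer sketch above.
func (p *exampleDeferredPeer) exampleWriterLoop(updateRequests func()) {
    for range p.wake {
        if p.pendingReason == "" {
            continue
        }
        // The actual recomputation of pending requests, i.e. legacyPeerImpl.updateRequests.
        updateRequests()
        // Clearing the reason re-arms onNeedUpdateRequests for the next trigger.
        p.pendingReason = ""
    }
}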
}
cn._peerPieces.Add(uint32(piece))
if cn.t.wantPieceIndex(piece) {
- cn.updateRequests("have")
+ cn.onNeedUpdateRequests("have")
}
cn.peerPiecesChanged()
return nil
// as or.
cn._peerPieces.Xor(&bm)
if shouldUpdateRequests {
- cn.updateRequests("bitfield")
+ cn.onNeedUpdateRequests("bitfield")
}
// We didn't guard this before, and I see no reason to do it now.
cn.peerPiecesChanged()
func (cn *PeerConn) peerHasAllPiecesTriggers() {
if !cn.t._pendingPieces.IsEmpty() {
- cn.updateRequests("Peer.onPeerHasAllPieces")
+ cn.onNeedUpdateRequests("Peer.onPeerHasAllPieces")
}
cn.peerPiecesChanged()
}
torrent.Add("requestsPreservedThroughChoking", int64(preservedCount))
}
if !c.t._pendingPieces.IsEmpty() {
- c.updateRequests("unchoked")
+ c.onNeedUpdateRequests("unchoked")
}
c.updateExpectingChunks()
case pp.Interested:
case pp.Suggest:
torrent.Add("suggests received", 1)
log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index).LogLevel(log.Debug, c.t.logger)
- c.updateRequests("suggested")
+ c.onNeedUpdateRequests("suggested")
case pp.HaveAll:
err = c.onPeerSentHaveAll()
case pp.HaveNone:
case pp.AllowedFast:
torrent.Add("allowed fasts received", 1)
log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).LogLevel(log.Debug, c.t.logger)
- c.updateRequests("PeerConn.mainReadLoop allowed fast")
+ c.onNeedUpdateRequests("PeerConn.mainReadLoop allowed fast")
case pp.Extended:
err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
case pp.Hashes:
// Calls f with requestable pieces in order.
func GetRequestablePieces(
input Input, pro *PieceRequestOrder,
- // Returns true if the piece should be considered against the unverified bytes limit.
+ // Returns true if the piece should be considered against the unverified bytes limit. This is
+ // based on whether the callee intends to request from the piece.
requestPiece func(ih metainfo.Hash, pieceIndex int, orderState PieceRequestOrderState) bool,
) {
// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
Interested bool
}
-func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
+// This gets the best-case request state: it handles pieces limited by capacity, and prefers
+// earlier and lower-availability pieces, etc. It pays no attention to existing requests on the
+// peer or other peers. Those are handled later.
+func (p *PeerConn) getDesiredRequestState() (desired desiredRequestState) {
t := p.t
if !t.haveInfo() {
return
}
input := t.getRequestStrategyInput()
requestHeap := desiredPeerRequests{
- peer: p,
+ peer: &p.Peer,
pieceStates: t.requestPieceStates,
requestIndexes: t.requestIndexes,
}
context.Background(),
pprof.Labels("update request", string(p.needRequestUpdate)),
func(_ context.Context) {
- next := p.getDesiredRequestState()
- p.applyRequestState(next)
- p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes)
+ p.updateRequests()
},
)
+ p.needRequestUpdate = ""
+ p.lastRequestUpdate = time.Now()
+ if enableUpdateRequestsTimer {
+ p.updateRequestsTimer.Reset(updateRequestsTimerDuration)
+ }
+}
+
+func (p *PeerConn) updateRequests() {
+ next := p.getDesiredRequestState()
+ p.applyRequestState(next)
+ p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes)
}
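// Illustrative sketch, not part of this change: updateRequests is a thin composition. The desired
// state is computed first, ignoring what is currently requested and by whom; applyRequestState
// then reconciles it with reality; finally the scratch slice of request indexes is handed back to
// the torrent for reuse. The body of cacheNextRequestIndexesForReuse is not shown in this diff; a
// minimal sketch of that kind of reuse, assuming it only keeps the larger backing array around,
// might look like:
func exampleCacheIndexesForReuse(t *Torrent, slice []RequestIndex) {
    // Keep the bigger backing array so the next update can append into it without reallocating.
    if cap(slice) > cap(t.requestIndexes) {
        t.requestIndexes = slice
    }
}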
func (t *Torrent) cacheNextRequestIndexesForReuse(slice []RequestIndex) {
return roaring.AndNot(p.peerPieces(), &p.t._completedPieces).IsEmpty()
}
-// Transmit/action the request state to the peer.
-func (p *Peer) applyRequestState(next desiredRequestState) {
+// Transmit/action the request state to the peer. This includes work-stealing from other peers and
+// some piece order randomization within the preferred state calculated earlier in next.
+func (p *PeerConn) applyRequestState(next desiredRequestState) {
current := &p.requestState
// Make interest sticky
if !next.Interested && p.requestState.Interested {
}
existing := t.requestingPeer(req)
- if existing != nil && existing != p {
+ if existing != nil && existing != &p.Peer {
// don't steal on cancel - because this is triggered by t.cancelRequest below
// which means that the cancelled can immediately try to steal back a request
// it has lost which can lead to circular cancel/add processing
// "requests %v->%v (peak %v->%v) reason %q (peer %v)",
// originalRequestCount, current.Requests.GetCardinality(), p.peakRequests, newPeakRequests, p.needRequestUpdate, p)
p.peakRequests = newPeakRequests
- p.needRequestUpdate = ""
- p.lastRequestUpdate = time.Now()
- if enableUpdateRequestsTimer {
- p.updateRequestsTimer.Reset(updateRequestsTimerDuration)
- }
}
// This could be set to 10s to match the unchoke/request update interval recommended by some
t.tryCreateMorePieceHashers()
t.iterPeers(func(p *Peer) {
p.onGotInfo(t.info)
- p.updateRequests("onSetInfo")
+ p.onNeedUpdateRequests("onSetInfo")
})
}
if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
return
}
- c.updateRequests(reason)
+ c.onNeedUpdateRequests(reason)
})
}
// }
t.iterPeers(func(conn *Peer) {
if conn.peerHasPiece(piece) {
- conn.updateRequests("piece incomplete")
+ conn.onNeedUpdateRequests("piece incomplete")
}
})
}
t.dataDownloadDisallowed.Set()
t.iterPeers(func(p *Peer) {
// Could check if peer request state is empty/not interested?
- p.updateRequests("disallow data download")
+ p.onNeedUpdateRequests("disallow data download")
p.cancelAllRequests()
})
}
defer t.cl.unlock()
t.dataDownloadDisallowed.Clear()
t.iterPeers(func(p *Peer) {
- p.updateRequests("allow data download")
+ p.onNeedUpdateRequests("allow data download")
})
}
defer t.cl.unlock()
t.dataUploadDisallowed = false
t.iterPeers(func(p *Peer) {
- p.updateRequests("allow data upload")
+ p.onNeedUpdateRequests("allow data upload")
})
}
t.dataUploadDisallowed = true
for c := range t.conns {
// TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
- c.updateRequests("disallow data upload")
+ c.onNeedUpdateRequests("disallow data upload")
}
}
ws.onGotInfo(t.info)
}
t.webSeeds[url] = &ws.peer
- ws.peer.updateRequests("Torrent.addWebSeed")
+ ws.peer.onNeedUpdateRequests("Torrent.addWebSeed")
return true
}
lastUnhandledErr time.Time
}
+func (me *webseedPeer) updateRequests() {
+ // TODO: implement me
+ panic("implement me")
+}
+
func (me *webseedPeer) lastWriteUploadRate() float64 {
// We never upload to webseeds.
return 0
// Delete this entry after waiting above on an error, to prevent more requests.
delete(ws.activeRequests, r)
if err != nil {
- ws.peer.updateRequests("webseedPeer request errored")
+ ws.peer.onNeedUpdateRequests("webseedPeer request errored")
}
ws.spawnRequests()
locker.Unlock()
cn.peer.close()
}
-func (ws *webseedPeer) handleUpdateRequests() {
+func (ws *webseedPeer) handleOnNeedUpdateRequests() {
// Because this is synchronous, webseed peers seem to get first dibs on newly prioritized
// pieces.
go func() {
ws.peer.cancelAllRequests()
ws.peer.t.iterPeers(func(p *Peer) {
if p.isLowOnRequests() {
- p.updateRequests("webseedPeer.onClose")
+ p.onNeedUpdateRequests("webseedPeer.onClose")
}
})
}