// handleCancel initiates cancellation of a request
handleCancel(ri RequestIndex)
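+ // cancelAllRequests cancels every request currently outstanding to the peer.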
+ cancelAllRequests()
connectionFlags() string
onClose()
onGotInfo(info *metainfo.Info)
lastChunkSent time.Time
// Stuff controlled by the local peer.
- needRequestUpdate updateRequestReason
- // TODO: How are pending cancels handled for webseed peers?
- requestState requestStrategy.PeerRequestState
+ needRequestUpdate updateRequestReason
updateRequestsTimer *time.Timer
lastRequestUpdate time.Time
peakRequests maxRequests
return
}
-func (p *Peer) initRequestState() {
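+// initRequestState allocates the connection's outstanding-request set.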
+func (p *PeerConn) initRequestState() {
p.requestState.Requests = &peerRequests{}
}
return haveAllowedFastRequests
}
-func (cn *Peer) cumInterest() time.Duration {
+func (cn *PeerConn) cumInterest() time.Duration {
ret := cn.priorInterest
if cn.requestState.Interested {
ret += time.Since(cn.lastBecameInterested)
}
// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
-func (cn *Peer) statusFlags() (ret string) {
+func (cn *PeerConn) statusFlags() (ret string) {
c := func(b byte) {
ret += string([]byte{b})
}
// This function seems to be used only by Peer.request. It's all logic checks, so maybe we can no-op it
// when we want to go fast.
-func (cn *Peer) shouldRequest(r RequestIndex) error {
+func (cn *PeerConn) shouldRequest(r RequestIndex) error {
err := cn.t.checkValidReceiveChunk(cn.t.requestIndexToRequest(r))
if err != nil {
return err
if !cn.peerHasPiece(pi) {
return errors.New("requesting piece peer doesn't have")
}
- if !cn.t.peerIsActive(cn) {
+ if !cn.t.peerIsActive(cn.peerPtr()) {
panic("requesting but not in active conns")
}
if cn.closed.IsSet() {
}
cn.validReceiveChunks[r]++
cn.t.requestState[r] = requestState{
- peer: cn.peerPtr(),
+ peer: cn,
when: time.Now(),
}
cn.updateExpectingChunks()
return cn._request(ppReq), nil
}
-func (me *Peer) cancel(r RequestIndex) {
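+// cancel cancels outstanding request r. Panics if r is not actually pending.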
+func (me *PeerConn) cancel(r RequestIndex) {
if !me.deleteRequest(r) {
panic("request not existing should have been guarded")
}
// Cancel pending requests for this chunk from *other* peers.
if p := t.requestingPeer(req); p != nil {
- if p == c {
+ if p.peerPtr() == c {
p.logger.Slogger().Error("received chunk but still pending request", "peer", p, "req", req)
panic("should not be pending request from conn that just received it")
}
// Returns true if an outstanding request is removed. Cancelled requests should be handled
// separately.
-func (c *Peer) deleteRequest(r RequestIndex) bool {
+func (c *PeerConn) deleteRequest(r RequestIndex) bool {
if !c.requestState.Requests.CheckedRemove(r) {
return false
}
for _, f := range c.callbacks.DeletedRequest {
- f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
+ f(PeerRequestEvent{c.peerPtr(), c.t.requestIndexToRequest(r)})
}
c.updateExpectingChunks()
// TODO: Can't this happen if a request is stolen?
return true
}
-func (c *Peer) deleteAllRequests(reason updateRequestReason) {
+func (c *PeerConn) deleteAllRequests(reason updateRequestReason) {
if c.requestState.Requests.IsEmpty() {
return
}
})
}
-func (c *Peer) assertNoRequests() {
+func (c *PeerConn) assertNoRequests() {
if !c.requestState.Requests.IsEmpty() {
panic(c.requestState.Requests.GetCardinality())
}
}
-func (c *Peer) cancelAllRequests() {
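+// cancelAllRequests cancels every request currently outstanding on this connection.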
+func (c *PeerConn) cancelAllRequests() {
c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
c.cancel(x)
return true
return pc, ok
}
-func (p *Peer) uncancelledRequests() uint64 {
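+// uncancelledRequests returns the number of requests still outstanding on this connection.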
+func (p *PeerConn) uncancelledRequests() uint64 {
return p.requestState.Requests.GetCardinality()
}
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/multiless"
+ requestStrategy "github.com/anacrolix/torrent/internal/request-strategy"
"golang.org/x/time/rate"
"github.com/anacrolix/torrent/bencode"
// we may not even know the number of pieces in the torrent yet.
peerSentHaveAll bool
+ // TODO: How are pending cancels handled for webseed peers?
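+ // Outstanding requests and interest state for this connection.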
+ requestState requestStrategy.PeerRequestState
+
peerRequestDataAllocLimiter alloclim.Limiter
outstandingHolepunchingRendezvous map[netip.AddrPort]struct{}
type desiredPeerRequests struct {
requestIndexes []RequestIndex
- peer *Peer
+ peer *PeerConn
pieceStates []g.Option[requestStrategy.PieceRequestOrderState]
}
// This gets the best-case request state. That means handling pieces limited by capacity, preferring
// earlier pieces, low availability etc. It pays no attention to existing requests on the peer or
// other peers. Those are handled later.
-func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
+func (p *PeerConn) getDesiredRequestState() (desired desiredRequestState) {
t := p.t
if !t.haveInfo() {
return
}
existing := t.requestingPeer(req)
- if existing != nil && existing != p.peerPtr() {
+ if existing != nil && existing != p {
// Don't steal on cancel, because this is triggered by t.cancelRequest below,
// which means the cancelled peer could immediately try to steal back a request
// it has lost, which can lead to circular cancel/add processing.
// Non-pending pieces are usually cancelled more synchronously.
return
}
- t.iterPeers(func(c *Peer) {
- if !c.isLowOnRequests() {
- return
- }
- if !c.peerHasPiece(piece) {
- return
- }
- if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
- return
- }
- c.onNeedUpdateRequests(reason)
- })
+ for c := range t.conns {
+ // This is a lot of effort to avoid using continue...
+ func() {
+ if !c.isLowOnRequests() {
+ return
+ }
+ if !c.peerHasPiece(piece) {
+ return
+ }
+ if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
+ return
+ }
+ c.onNeedUpdateRequests(reason)
+ }()
+ }
}
// Stuff we don't want to run when the pending pieces change while benchmarking.
t.complete.SetBool(t.haveAllPieces())
}
-func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
+func (t *Torrent) cancelRequest(r RequestIndex) *PeerConn {
p := t.requestingPeer(r)
if p != nil {
p.cancel(r)
return p
}
-func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
+func (t *Torrent) requestingPeer(r RequestIndex) *PeerConn {
return t.requestState[r].peer
}
}
type requestState struct {
- peer *Peer
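+ // The connection that currently has this request outstanding.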
+ peer *PeerConn
when time.Time
}
hostKey webseedHostKeyHandle
}
+func (me *webseedPeer) cancelAllRequests() {
+ // Is there any point to this? Won't we fail to receive a chunk and cancel anyway? Should we
+ // Close requests instead?
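+ // Signal cancellation on every in-flight webseed request.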
+ for req := range me.activeRequests {
+ req.Cancel()
+ }
+}
+
func (me *webseedPeer) peerImplWriteStatus(w io.Writer) {}
func (me *webseedPeer) isLowOnRequests() bool {
ws.peer.updateExpectingChunks()
}
-func (ws *webseedPeer) iterConsecutiveRequests(begin RequestIndex) iter.Seq[RequestIndex] {
- return func(yield func(RequestIndex) bool) {
- for {
- if !ws.peer.requestState.Requests.Contains(begin) {
- return
- }
- if !yield(begin) {
- return
- }
- begin++
- }
- }
-}
-
-func (ws *webseedPeer) iterConsecutiveInactiveRequests(begin RequestIndex) iter.Seq[RequestIndex] {
- return func(yield func(RequestIndex) bool) {
- for req := range ws.iterConsecutiveRequests(begin) {
- if !ws.inactiveRequestIndex(req) {
- return
- }
- if !yield(req) {
- return
- }
- }
- }
-}
-
func (ws *webseedPeer) inactiveRequestIndex(index RequestIndex) bool {
for range ws.activeRequestsForIndex(index) {
return false
return true
}
-func (ws *webseedPeer) inactiveRequests() iter.Seq[RequestIndex] {
- return func(yield func(RequestIndex) bool) {
- // This is used to determine contiguity of requests.
- //sorted := slices.Sorted(ws.peer.requestState.Requests.Iterator())
- //if len(sorted) != 0 {
- // fmt.Println("inactiveRequests", sorted)
- //}
- for reqIndex := range ws.peer.requestState.Requests.Iterator() {
- if !ws.inactiveRequestIndex(reqIndex) {
- continue
- }
- if !yield(reqIndex) {
- return
- }
- }
- }
-}
-
func (ws *webseedPeer) connectionFlags() string {
return "WS"
}