cumulativeExpectedToReceiveChunks time.Duration
choking bool
- // Chunks that we might reasonably expect to receive from the peer. Due to latency, buffering,
- // and implementation differences, we may receive chunks that are no longer in the set of
- // requests actually want. This could use a roaring.BSI if the memory use becomes noticeable.
- validReceiveChunks map[RequestIndex]int
- // Indexed by metadata piece, set to true if posted and pending a
- // response.
- metadataRequests []bool
- sentHaves bitmap.Bitmap
// Stuff controlled by the remote peer.
peerInterested bool
return nil
}
-func (cn *Peer) mustRequest(r RequestIndex) bool {
+func (cn *PeerConn) mustRequest(r RequestIndex) bool {
more, err := cn.request(r)
if err != nil {
panic(err)
return more
}
-func (cn *Peer) request(r RequestIndex) (more bool, err error) {
+func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
if err := cn.shouldRequest(r); err != nil {
panic(err)
}
}
cn.validReceiveChunks[r]++
cn.t.requestState[r] = requestState{
- peer: cn,
+ peer: cn.peerPtr(),
when: time.Now(),
}
cn.updateExpectingChunks()
ppReq := cn.t.requestIndexToRequest(r)
for _, f := range cn.callbacks.SentRequest {
- f(PeerRequestEvent{cn, ppReq})
+ f(PeerRequestEvent{cn.peerPtr(), ppReq})
}
return cn.legacyPeerImpl._request(ppReq), nil
}
}
// Returns true if it was valid to reject the request.
-func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
+func (c *PeerConn) remoteRejectedRequest(r RequestIndex) bool {
if c.deleteRequest(r) {
c.decPeakRequests()
} else if !c.requestState.Cancelled.CheckedRemove(r) {
return true
}
-func (c *Peer) decExpectedChunkReceive(r RequestIndex) {
+func (c *PeerConn) decExpectedChunkReceive(r RequestIndex) {
count := c.validReceiveChunks[r]
if count == 1 {
delete(c.validReceiveChunks, r)
c.recordBlockForSmartBan(req, msg.Piece)
})
// This needs to occur before we return, but we try to do it when the client is unlocked. It
- // can't be done before checking if chunks are valid because they won't be deallocated by piece
- // hashing if they're out of bounds.
+ // can't be done before checking if chunks are valid because they won't be deallocated from the
+ // smart ban cache by piece hashing if they're out of bounds.
defer recordBlockForSmartBan()
if c.peerChoking {
ChunksReceived.Add("while choked", 1)
}
- if c.validReceiveChunks[req] <= 0 {
- ChunksReceived.Add("unexpected", 1)
- return errors.New("received unexpected chunk")
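+ // Whether the chunk was expected is now decided by the peer implementation
+ // (see checkReceivedChunk): PeerConn consults validReceiveChunks, webseeds accept anything.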
+ err = c.peerImpl.checkReceivedChunk(req)
+ if err != nil {
+ return err
}
- c.decExpectedChunkReceive(req)
if c.peerChoking && c.peerAllowedFast.Contains(pieceIndex(ppReq.Index)) {
ChunksReceived.Add("due to allowed fast", 1)
type PeerConn struct {
Peer
+ // Indexed by metadata piece, set to true if posted and pending a response.
+ metadataRequests []bool
+ sentHaves bitmap.Bitmap
+ // Chunks that we might reasonably expect to receive from the peer. Due to latency, buffering,
+ // and implementation differences, we may receive chunks that are no longer in the set of
+ // requests we actually want. This could use a roaring.BSI if the memory use becomes noticeable.
+ validReceiveChunks map[RequestIndex]int
+
// Move to PeerConn?
protocolLogger log.Logger
me.Peer.slogger = s.With(fmt.Sprintf("%T", me), fmt.Sprintf("%p", me))
me.protocolLogger = me.logger.WithNames(protocolLoggingName)
}
+
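+// checkReceivedChunk errors if no chunk is currently expected for this request index, and
+// otherwise consumes one expected receive for it.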
+func (c *PeerConn) checkReceivedChunk(req RequestIndex) error {
+ if c.validReceiveChunks[req] <= 0 {
+ ChunksReceived.Add("unexpected", 1)
+ return errors.New("received unexpected chunk")
+ }
+ c.decExpectedChunkReceive(req)
+ return nil
+}
// Transmit/action the request state to the peer. This includes work-stealing from other peers and
// some piece order randomization within the preferred state calculated earlier in next. Cancels are
// not done here, those are handled synchronously. We only track pending cancel acknowledgements.
-func (p *Peer) applyRequestState(next desiredRequestState) {
+func (p *PeerConn) applyRequestState(next desiredRequestState) {
current := &p.requestState
// Make interest sticky
if !next.Interested && p.requestState.Interested {
}
existing := t.requestingPeer(req)
- if existing != nil && existing != p {
+ if existing != nil && existing != p.peerPtr() {
// don't steal on cancel - because this is triggered by t.cancelRequest below
// which means that the cancelled can immediately try to steal back a request
// it has lost which can lead to circular cancel/add processing
hostKey webseedHostKeyHandle
}
+func (me *webseedPeer) checkReceivedChunk(ri RequestIndex) error {
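+ // Webseed responses aren't tracked in validReceiveChunks, so any received chunk is accepted.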
+ return nil
+}
+
func (me *webseedPeer) nominalMaxRequests() maxRequests {
// TODO: Implement an algorithm that assigns this based on sharing chunks across peers. For now
// we just allow 2 MiB worth of requests.
func (me *webseedPeer) updateRequests() {
return
- if !me.shouldUpdateRequests() {
- return
- }
- p := &me.peer
- next := p.getDesiredRequestState()
- p.applyRequestState(next)
- p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes)
- // Run this after all requests applied to the peer, so they can be batched up.
- me.spawnRequests()
}
func (me *webseedPeer) lastWriteUploadRate() float64 {