},
}
- if !peerRequesting {
- go cl.requester()
- }
-
return
}
// legacy PeerConn methods.
type peerImpl interface {
onNextRequestStateChanged()
- updateRequests()
+ updateRequests(reason string)
writeInterested(interested bool) bool
// Neither of these return buffer room anymore, because they're currently both posted. There's
lastChunkSent time.Time
// Stuff controlled by the local peer.
- nextRequestState requestState
+ needRequestUpdate string
actualRequestState requestState
lastBecameInterested time.Time
priorInterest time.Duration
// receiving badly sized BITFIELD, or invalid HAVE messages.
func (cn *PeerConn) setNumPieces(num pieceIndex) {
cn._peerPieces.RemoveRange(bitmap.BitRange(num), bitmap.ToEnd)
- cn.peerPiecesChanged()
+ cn.peerPiecesChanged("got info")
}
func eventAgeString(t time.Time) string {
cn.sentHaves = bitmap.Bitmap{cn.t._completedPieces.Clone()}
}
-func (cn *PeerConn) updateRequests() {
- if peerRequesting {
- cn.tickleWriter()
+func (cn *PeerConn) updateRequests(reason string) {
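+ // An update is already pending; keep its reason and don't tickle the writer again.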
+ if cn.needRequestUpdate != "" {
return
}
- cn.t.cl.tickleRequester()
+ cn.needRequestUpdate = reason
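+ // Wake the writer so it recomputes and applies the desired request state.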
+ cn.tickleWriter()
}
// Emits the indices in the Bitmaps bms in order, never repeating any index.
}
}
-func (cn *Peer) peerPiecesChanged() {
- cn.updateRequests()
+func (cn *Peer) peerPiecesChanged(reason string) {
+ cn.updateRequests(reason)
cn.t.maybeDropMutuallyCompletePeer(cn)
}
cn.t.incPieceAvailability(piece)
}
cn._peerPieces.Add(uint32(piece))
- cn.peerPiecesChanged()
+ cn.peerPiecesChanged("have")
return nil
}
cn._peerPieces.Remove(uint32(i))
}
}
- cn.peerPiecesChanged()
+ cn.peerPiecesChanged("bitfield")
return nil
}
}
cn.peerSentHaveAll = true
cn._peerPieces.Clear()
- cn.peerPiecesChanged()
+ cn.peerPiecesChanged("have all")
}
func (cn *PeerConn) onPeerSentHaveAll() error {
cn.t.decPeerPieceAvailability(&cn.Peer)
cn._peerPieces.Clear()
cn.peerSentHaveAll = false
- cn.peerPiecesChanged()
+ cn.peerPiecesChanged("have none")
return nil
}
c.deleteAllRequests()
}
// We can then reset our interest.
- c.updateRequests()
+ c.updateRequests("choked")
c.updateExpectingChunks()
case pp.Unchoke:
c.peerChoking = false
- c.updateRequests()
+ c.updateRequests("unchoked")
c.updateExpectingChunks()
case pp.Interested:
c.peerInterested = true
case pp.Suggest:
torrent.Add("suggests received", 1)
log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index).SetLevel(log.Debug).Log(c.t.logger)
- c.updateRequests()
+ c.updateRequests("suggested")
case pp.HaveAll:
err = c.onPeerSentHaveAll()
case pp.HaveNone:
torrent.Add("allowed fasts received", 1)
log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
c.peerAllowedFast.Add(bitmap.BitIndex(msg.Index))
- c.updateRequests()
+ c.updateRequests("allowed fast")
case pp.Extended:
err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
default:
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
if deletedRequest {
c.piecesReceivedSinceLastRequestUpdate++
- if c.nextRequestState.Requests.GetCardinality() == 0 {
- c.updateRequests()
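+ // Only schedule a fresh update once no requests remain outstanding.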
+ if c.actualRequestState.Requests.GetCardinality() == 0 {
+ c.updateRequests("piece")
}
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
}
if err != nil {
c.logger.WithDefaultLevel(log.Error).Printf("writing received chunk %v: %v", req, err)
t.pendRequest(req)
- //t.updatePieceCompletion(pieceIndex(msg.Index))
+ // Necessary to pass TestReceiveChunkStorageFailureSeederFastExtensionDisabled. I think a
+ // request update runs while we're writing the chunk that just failed. Then we never do a
+ // fresh update after pending the failed request.
+ c.updateRequests("write chunk error")
t.onWriteChunkErr(err)
return nil
}
}
func (c *Peer) deleteRequest(r RequestIndex) bool {
- c.nextRequestState.Requests.Remove(r)
if !c.actualRequestState.Requests.CheckedRemove(r) {
return false
}
if !c.actualRequestState.Requests.IsEmpty() {
panic(c.actualRequestState.Requests.GetCardinality())
}
- c.nextRequestState.Requests.Clear()
// for c := range c.t.conns {
// c.tickleWriter()
// }
import (
"container/heap"
+ "context"
"encoding/gob"
"reflect"
+ "runtime/pprof"
"time"
"unsafe"
"github.com/RoaringBitmap/roaring"
- "github.com/anacrolix/chansync/events"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/multiless"
request_strategy "github.com/anacrolix/torrent/request-strategy"
)
-// Calculate requests individually for each peer.
-const peerRequesting = true
-
-func (cl *Client) requester() {
- for {
- update := func() events.Signaled {
- cl.lock()
- defer cl.unlock()
- cl.doRequests()
- return cl.updateRequests.Signaled()
- }()
- minWait := time.After(100 * time.Millisecond)
- maxWait := time.After(1000 * time.Millisecond)
- select {
- case <-cl.closed.Done():
- return
- case <-minWait:
- case <-maxWait:
- }
- select {
- case <-cl.closed.Done():
- return
- case <-update:
- case <-maxWait:
- }
- }
-}
-
func (cl *Client) tickleRequester() {
cl.updateRequests.Broadcast()
}
}
}
-func (cl *Client) doRequests() {
- input := cl.getRequestStrategyInput()
- nextPeerStates := request_strategy.Run(input)
- for p, state := range nextPeerStates {
- setPeerNextRequestState(p, state)
- }
-}
-
func init() {
gob.Register(peerId{})
}
return nil
}
-func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
- p := _p.(peerId).Peer
- p.nextRequestState = rp
- p.onNextRequestStateChanged()
-}
-
type RequestIndex = request_strategy.RequestIndex
type chunkIndexType = request_strategy.ChunkIndex
}
func (p *Peer) applyNextRequestState() bool {
- next := p.getDesiredRequestState()
+ if p.needRequestUpdate == "" {
+ return true
+ }
+ var more bool
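+ // Attribute the request-state computation to the update reason so it shows up as a label in pprof profiles.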
+ pprof.Do(
+ context.Background(),
+ pprof.Labels("update request", p.needRequestUpdate),
+ func(_ context.Context) {
+ next := p.getDesiredRequestState()
+ more = p.applyRequestState(next)
+ },
+ )
+ return more
+}
+
+func (p *Peer) applyRequestState(next requestState) bool {
current := p.actualRequestState
if !p.setInterested(next.Interested) {
return false
} */
return more
})
+ if more {
+ p.needRequestUpdate = ""
+ }
return more
}
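The pprof.Do label above tags the request-state computation with the reason string, so a CPU profile can break down which update reasons dominate. A minimal sketch of inspecting that, assuming you expose a net/http/pprof listener yourself (the port and endpoint below are illustrative and not part of this change):

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* handlers on http.DefaultServeMux
)

func main() {
	// With the torrent client running in this process, collect a CPU profile
	// and group samples by label:
	//
	//   go tool pprof -tags 'http://localhost:6060/debug/pprof/profile?seconds=10'
	//
	// The "update request" tag buckets time by the reason strings passed to
	// updateRequests ("have", "bitfield", "choked", "piece priority changed", ...).
	log.Fatal(http.ListenAndServe("localhost:6060", nil))
}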
// These seem to be all the follow-up tasks after info is set that can't fail.
func (t *Torrent) onSetInfo() {
- t.iterPeers(func(p *Peer) {
- p.onGotInfo(t.info)
- })
for i := range t.pieces {
p := &t.pieces[i]
// Need to add availability before updating piece completion, as that may result in conns
t.updateWantPeersEvent()
t.pendingRequests = make(map[RequestIndex]int)
t.tryCreateMorePieceHashers()
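+ // Let each connected peer know the info is available and queue a request update for it.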
+ t.iterPeers(func(p *Peer) {
+ p.onGotInfo(t.info)
+ p.updateRequests("onSetInfo")
+ })
}
// Called when metadata for a torrent becomes available.
}
func (t *Torrent) piecePriorityChanged(piece pieceIndex) {
- if true || t._pendingPieces.Contains(piece) {
+ if t._pendingPieces.Contains(piece) {
t.iterPeers(func(c *Peer) {
- c.updateRequests()
+ c.updateRequests("piece priority changed")
})
}
t.maybeNewConns()
// }
t.iterPeers(func(conn *Peer) {
if conn.peerHasPiece(piece) {
- conn.updateRequests()
+ conn.updateRequests("piece incomplete")
}
})
}
defer t.cl.unlock()
t.dataUploadDisallowed = false
for c := range t.conns {
- c.updateRequests()
+ c.updateRequests("allow data upload")
}
}
defer t.cl.unlock()
t.dataUploadDisallowed = true
for c := range t.conns {
- c.updateRequests()
+ c.updateRequests("disallow data upload")
}
}
// Return a bool indicating whether this is even possible; if it isn't, skip to the next drop candidate.
func (ws *webseedPeer) drop() {}
-func (ws *webseedPeer) updateRequests() {
+func (ws *webseedPeer) updateRequests(reason string) {
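+ // Webseed peers don't act on reason-based request updates here.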
}
func (ws *webseedPeer) onClose() {