From 16c571b58b9cccd34fdbe50b723945e22d2767b8 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 8 Oct 2021 13:53:36 +1100 Subject: [PATCH] Add pprof labels for request updates and remove Client-wide requester --- client.go | 4 --- peer-impl.go | 2 +- peerconn.go | 43 ++++++++++++++++---------------- requesting.go | 65 ++++++++++++++++--------------------------------- torrent.go | 17 +++++++------ webseed-peer.go | 2 +- 6 files changed, 54 insertions(+), 79 deletions(-) diff --git a/client.go b/client.go index c55eb1a7..db93e569 100644 --- a/client.go +++ b/client.go @@ -306,10 +306,6 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { }, } - if !peerRequesting { - go cl.requester() - } - return } diff --git a/peer-impl.go b/peer-impl.go index 23c0fbb9..1dc154f7 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -9,7 +9,7 @@ import ( // legacy PeerConn methods. type peerImpl interface { onNextRequestStateChanged() - updateRequests() + updateRequests(reason string) writeInterested(interested bool) bool // Neither of these return buffer room anymore, because they're currently both posted. There's diff --git a/peerconn.go b/peerconn.go index c1c02a82..dc4ce8d1 100644 --- a/peerconn.go +++ b/peerconn.go @@ -82,7 +82,7 @@ type Peer struct { lastChunkSent time.Time // Stuff controlled by the local peer. - nextRequestState requestState + needRequestUpdate string actualRequestState requestState lastBecameInterested time.Time priorInterest time.Duration @@ -274,7 +274,7 @@ func (cn *PeerConn) onGotInfo(info *metainfo.Info) { // receiving badly sized BITFIELD, or invalid HAVE messages. func (cn *PeerConn) setNumPieces(num pieceIndex) { cn._peerPieces.RemoveRange(bitmap.BitRange(num), bitmap.ToEnd) - cn.peerPiecesChanged() + cn.peerPiecesChanged("got info") } func eventAgeString(t time.Time) string { @@ -654,12 +654,12 @@ func (cn *PeerConn) postBitfield() { cn.sentHaves = bitmap.Bitmap{cn.t._completedPieces.Clone()} } -func (cn *PeerConn) updateRequests() { - if peerRequesting { - cn.tickleWriter() +func (cn *PeerConn) updateRequests(reason string) { + if cn.needRequestUpdate != "" { return } - cn.t.cl.tickleRequester() + cn.needRequestUpdate = reason + cn.tickleWriter() } // Emits the indices in the Bitmaps bms in order, never repeating any index. @@ -685,8 +685,8 @@ func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func { } } -func (cn *Peer) peerPiecesChanged() { - cn.updateRequests() +func (cn *Peer) peerPiecesChanged(reason string) { + cn.updateRequests(reason) cn.t.maybeDropMutuallyCompletePeer(cn) } @@ -708,7 +708,7 @@ func (cn *PeerConn) peerSentHave(piece pieceIndex) error { cn.t.incPieceAvailability(piece) } cn._peerPieces.Add(uint32(piece)) - cn.peerPiecesChanged() + cn.peerPiecesChanged("have") return nil } @@ -741,7 +741,7 @@ func (cn *PeerConn) peerSentBitfield(bf []bool) error { cn._peerPieces.Remove(uint32(i)) } } - cn.peerPiecesChanged() + cn.peerPiecesChanged("bitfield") return nil } @@ -757,7 +757,7 @@ func (cn *Peer) onPeerHasAllPieces() { } cn.peerSentHaveAll = true cn._peerPieces.Clear() - cn.peerPiecesChanged() + cn.peerPiecesChanged("have all") } func (cn *PeerConn) onPeerSentHaveAll() error { @@ -769,7 +769,7 @@ func (cn *PeerConn) peerSentHaveNone() error { cn.t.decPeerPieceAvailability(&cn.Peer) cn._peerPieces.Clear() cn.peerSentHaveAll = false - cn.peerPiecesChanged() + cn.peerPiecesChanged("have none") return nil } @@ -1029,11 +1029,11 @@ func (c *PeerConn) mainReadLoop() (err error) { c.deleteAllRequests() } // We can then reset our interest. - c.updateRequests() + c.updateRequests("choked") c.updateExpectingChunks() case pp.Unchoke: c.peerChoking = false - c.updateRequests() + c.updateRequests("unchoked") c.updateExpectingChunks() case pp.Interested: c.peerInterested = true @@ -1080,7 +1080,7 @@ func (c *PeerConn) mainReadLoop() (err error) { case pp.Suggest: torrent.Add("suggests received", 1) log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index).SetLevel(log.Debug).Log(c.t.logger) - c.updateRequests() + c.updateRequests("suggested") case pp.HaveAll: err = c.onPeerSentHaveAll() case pp.HaveNone: @@ -1091,7 +1091,7 @@ func (c *PeerConn) mainReadLoop() (err error) { torrent.Add("allowed fasts received", 1) log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger) c.peerAllowedFast.Add(bitmap.BitIndex(msg.Index)) - c.updateRequests() + c.updateRequests("allowed fast") case pp.Extended: err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload) default: @@ -1262,8 +1262,8 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) if deletedRequest { c.piecesReceivedSinceLastRequestUpdate++ - if c.nextRequestState.Requests.GetCardinality() == 0 { - c.updateRequests() + if c.actualRequestState.Requests.GetCardinality() == 0 { + c.updateRequests("piece") } c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData })) } @@ -1305,7 +1305,10 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { if err != nil { c.logger.WithDefaultLevel(log.Error).Printf("writing received chunk %v: %v", req, err) t.pendRequest(req) - //t.updatePieceCompletion(pieceIndex(msg.Index)) + // Necessary to pass TestReceiveChunkStorageFailureSeederFastExtensionDisabled. I think a + // request update runs while we're writing the chunk that just failed. Then we never do a + // fresh update after pending the failed request. + c.updateRequests("write chunk error") t.onWriteChunkErr(err) return nil } @@ -1421,7 +1424,6 @@ func (c *Peer) peerHasWantedPieces() bool { } func (c *Peer) deleteRequest(r RequestIndex) bool { - c.nextRequestState.Requests.Remove(r) if !c.actualRequestState.Requests.CheckedRemove(r) { return false } @@ -1449,7 +1451,6 @@ func (c *Peer) deleteAllRequests() { if !c.actualRequestState.Requests.IsEmpty() { panic(c.actualRequestState.Requests.GetCardinality()) } - c.nextRequestState.Requests.Clear() // for c := range c.t.conns { // c.tickleWriter() // } diff --git a/requesting.go b/requesting.go index c6dd6415..f4813516 100644 --- a/requesting.go +++ b/requesting.go @@ -2,13 +2,14 @@ package torrent import ( "container/heap" + "context" "encoding/gob" "reflect" + "runtime/pprof" "time" "unsafe" "github.com/RoaringBitmap/roaring" - "github.com/anacrolix/chansync/events" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/multiless" @@ -16,34 +17,6 @@ import ( request_strategy "github.com/anacrolix/torrent/request-strategy" ) -// Calculate requests individually for each peer. -const peerRequesting = true - -func (cl *Client) requester() { - for { - update := func() events.Signaled { - cl.lock() - defer cl.unlock() - cl.doRequests() - return cl.updateRequests.Signaled() - }() - minWait := time.After(100 * time.Millisecond) - maxWait := time.After(1000 * time.Millisecond) - select { - case <-cl.closed.Done(): - return - case <-minWait: - case <-maxWait: - } - select { - case <-cl.closed.Done(): - return - case <-update: - case <-maxWait: - } - } -} - func (cl *Client) tickleRequester() { cl.updateRequests.Broadcast() } @@ -107,14 +80,6 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input { } } -func (cl *Client) doRequests() { - input := cl.getRequestStrategyInput() - nextPeerStates := request_strategy.Run(input) - for p, state := range nextPeerStates { - setPeerNextRequestState(p, state) - } -} - func init() { gob.Register(peerId{}) } @@ -153,12 +118,6 @@ func (p *peerId) GobDecode(b []byte) error { return nil } -func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) { - p := _p.(peerId).Peer - p.nextRequestState = rp - p.onNextRequestStateChanged() -} - type RequestIndex = request_strategy.RequestIndex type chunkIndexType = request_strategy.ChunkIndex @@ -262,7 +221,22 @@ func (p *Peer) getDesiredRequestState() (desired requestState) { } func (p *Peer) applyNextRequestState() bool { - next := p.getDesiredRequestState() + if p.needRequestUpdate == "" { + return true + } + var more bool + pprof.Do( + context.Background(), + pprof.Labels("update request", p.needRequestUpdate), + func(_ context.Context) { + next := p.getDesiredRequestState() + more = p.applyRequestState(next) + }, + ) + return more +} + +func (p *Peer) applyRequestState(next requestState) bool { current := p.actualRequestState if !p.setInterested(next.Interested) { return false @@ -291,5 +265,8 @@ func (p *Peer) applyNextRequestState() bool { } */ return more }) + if more { + p.needRequestUpdate = "" + } return more } diff --git a/torrent.go b/torrent.go index b99e4d18..a89a114e 100644 --- a/torrent.go +++ b/torrent.go @@ -428,9 +428,6 @@ func (t *Torrent) setInfo(info *metainfo.Info) error { // This seems to be all the follow-up tasks after info is set, that can't fail. func (t *Torrent) onSetInfo() { - t.iterPeers(func(p *Peer) { - p.onGotInfo(t.info) - }) for i := range t.pieces { p := &t.pieces[i] // Need to add availability before updating piece completion, as that may result in conns @@ -450,6 +447,10 @@ func (t *Torrent) onSetInfo() { t.updateWantPeersEvent() t.pendingRequests = make(map[RequestIndex]int) t.tryCreateMorePieceHashers() + t.iterPeers(func(p *Peer) { + p.onGotInfo(t.info) + p.updateRequests("onSetInfo") + }) } // Called when metadata for a torrent becomes available. @@ -1097,9 +1098,9 @@ func (t *Torrent) maybeNewConns() { } func (t *Torrent) piecePriorityChanged(piece pieceIndex) { - if true || t._pendingPieces.Contains(piece) { + if t._pendingPieces.Contains(piece) { t.iterPeers(func(c *Peer) { - c.updateRequests() + c.updateRequests("piece priority changed") }) } t.maybeNewConns() @@ -2007,7 +2008,7 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) { // } t.iterPeers(func(conn *Peer) { if conn.peerHasPiece(piece) { - conn.updateRequests() + conn.updateRequests("piece incomplete") } }) } @@ -2183,7 +2184,7 @@ func (t *Torrent) AllowDataUpload() { defer t.cl.unlock() t.dataUploadDisallowed = false for c := range t.conns { - c.updateRequests() + c.updateRequests("allow data upload") } } @@ -2193,7 +2194,7 @@ func (t *Torrent) DisallowDataUpload() { defer t.cl.unlock() t.dataUploadDisallowed = true for c := range t.conns { - c.updateRequests() + c.updateRequests("disallow data upload") } } diff --git a/webseed-peer.go b/webseed-peer.go index 9263aa46..ebdd8dba 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -103,7 +103,7 @@ func (ws *webseedPeer) connectionFlags() string { // return bool if this is even possible, and if it isn't, skip to the next drop candidate. func (ws *webseedPeer) drop() {} -func (ws *webseedPeer) updateRequests() { +func (ws *webseedPeer) updateRequests(reason string) { } func (ws *webseedPeer) onClose() { -- 2.44.0