]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add pprof labels for request updates and remove Client-wide requester
authorMatt Joiner <anacrolix@gmail.com>
Fri, 8 Oct 2021 02:53:36 +0000 (13:53 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 19 Oct 2021 03:08:13 +0000 (14:08 +1100)
client.go
peer-impl.go
peerconn.go
requesting.go
torrent.go
webseed-peer.go

index c55eb1a7a0e3342fecf0e6f5c4278fa1a4353d11..db93e5695c80f12de7356935e3758c7101e5261d 100644 (file)
--- a/client.go
+++ b/client.go
@@ -306,10 +306,6 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
                },
        }
 
-       if !peerRequesting {
-               go cl.requester()
-       }
-
        return
 }
 
index 23c0fbb922413298e5ac548a245d01df26bcbb58..1dc154f705632eaa77da418aef825b962b48647a 100644 (file)
@@ -9,7 +9,7 @@ import (
 // legacy PeerConn methods.
 type peerImpl interface {
        onNextRequestStateChanged()
-       updateRequests()
+       updateRequests(reason string)
        writeInterested(interested bool) bool
 
        // Neither of these return buffer room anymore, because they're currently both posted. There's
index c1c02a8289237347ff58e53ea1044d90c0302799..dc4ce8d13714d4add7bb2461d8fa11a3e93d39ea 100644 (file)
@@ -82,7 +82,7 @@ type Peer struct {
        lastChunkSent           time.Time
 
        // Stuff controlled by the local peer.
-       nextRequestState     requestState
+       needRequestUpdate    string
        actualRequestState   requestState
        lastBecameInterested time.Time
        priorInterest        time.Duration
@@ -274,7 +274,7 @@ func (cn *PeerConn) onGotInfo(info *metainfo.Info) {
 // receiving badly sized BITFIELD, or invalid HAVE messages.
 func (cn *PeerConn) setNumPieces(num pieceIndex) {
        cn._peerPieces.RemoveRange(bitmap.BitRange(num), bitmap.ToEnd)
-       cn.peerPiecesChanged()
+       cn.peerPiecesChanged("got info")
 }
 
 func eventAgeString(t time.Time) string {
@@ -654,12 +654,12 @@ func (cn *PeerConn) postBitfield() {
        cn.sentHaves = bitmap.Bitmap{cn.t._completedPieces.Clone()}
 }
 
-func (cn *PeerConn) updateRequests() {
-       if peerRequesting {
-               cn.tickleWriter()
+func (cn *PeerConn) updateRequests(reason string) {
+       if cn.needRequestUpdate != "" {
                return
        }
-       cn.t.cl.tickleRequester()
+       cn.needRequestUpdate = reason
+       cn.tickleWriter()
 }
 
 // Emits the indices in the Bitmaps bms in order, never repeating any index.
@@ -685,8 +685,8 @@ func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
        }
 }
 
-func (cn *Peer) peerPiecesChanged() {
-       cn.updateRequests()
+func (cn *Peer) peerPiecesChanged(reason string) {
+       cn.updateRequests(reason)
        cn.t.maybeDropMutuallyCompletePeer(cn)
 }
 
@@ -708,7 +708,7 @@ func (cn *PeerConn) peerSentHave(piece pieceIndex) error {
                cn.t.incPieceAvailability(piece)
        }
        cn._peerPieces.Add(uint32(piece))
-       cn.peerPiecesChanged()
+       cn.peerPiecesChanged("have")
        return nil
 }
 
@@ -741,7 +741,7 @@ func (cn *PeerConn) peerSentBitfield(bf []bool) error {
                        cn._peerPieces.Remove(uint32(i))
                }
        }
-       cn.peerPiecesChanged()
+       cn.peerPiecesChanged("bitfield")
        return nil
 }
 
@@ -757,7 +757,7 @@ func (cn *Peer) onPeerHasAllPieces() {
        }
        cn.peerSentHaveAll = true
        cn._peerPieces.Clear()
-       cn.peerPiecesChanged()
+       cn.peerPiecesChanged("have all")
 }
 
 func (cn *PeerConn) onPeerSentHaveAll() error {
@@ -769,7 +769,7 @@ func (cn *PeerConn) peerSentHaveNone() error {
        cn.t.decPeerPieceAvailability(&cn.Peer)
        cn._peerPieces.Clear()
        cn.peerSentHaveAll = false
-       cn.peerPiecesChanged()
+       cn.peerPiecesChanged("have none")
        return nil
 }
 
@@ -1029,11 +1029,11 @@ func (c *PeerConn) mainReadLoop() (err error) {
                                c.deleteAllRequests()
                        }
                        // We can then reset our interest.
-                       c.updateRequests()
+                       c.updateRequests("choked")
                        c.updateExpectingChunks()
                case pp.Unchoke:
                        c.peerChoking = false
-                       c.updateRequests()
+                       c.updateRequests("unchoked")
                        c.updateExpectingChunks()
                case pp.Interested:
                        c.peerInterested = true
@@ -1080,7 +1080,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                case pp.Suggest:
                        torrent.Add("suggests received", 1)
                        log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index).SetLevel(log.Debug).Log(c.t.logger)
-                       c.updateRequests()
+                       c.updateRequests("suggested")
                case pp.HaveAll:
                        err = c.onPeerSentHaveAll()
                case pp.HaveNone:
@@ -1091,7 +1091,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                        torrent.Add("allowed fasts received", 1)
                        log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
                        c.peerAllowedFast.Add(bitmap.BitIndex(msg.Index))
-                       c.updateRequests()
+                       c.updateRequests("allowed fast")
                case pp.Extended:
                        err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
                default:
@@ -1262,8 +1262,8 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
        if deletedRequest {
                c.piecesReceivedSinceLastRequestUpdate++
-               if c.nextRequestState.Requests.GetCardinality() == 0 {
-                       c.updateRequests()
+               if c.actualRequestState.Requests.GetCardinality() == 0 {
+                       c.updateRequests("piece")
                }
                c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
        }
@@ -1305,7 +1305,10 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        if err != nil {
                c.logger.WithDefaultLevel(log.Error).Printf("writing received chunk %v: %v", req, err)
                t.pendRequest(req)
-               //t.updatePieceCompletion(pieceIndex(msg.Index))
+               // Necessary to pass TestReceiveChunkStorageFailureSeederFastExtensionDisabled. I think a
+               // request update runs while we're writing the chunk that just failed. Then we never do a
+               // fresh update after pending the failed request.
+               c.updateRequests("write chunk error")
                t.onWriteChunkErr(err)
                return nil
        }
@@ -1421,7 +1424,6 @@ func (c *Peer) peerHasWantedPieces() bool {
 }
 
 func (c *Peer) deleteRequest(r RequestIndex) bool {
-       c.nextRequestState.Requests.Remove(r)
        if !c.actualRequestState.Requests.CheckedRemove(r) {
                return false
        }
@@ -1449,7 +1451,6 @@ func (c *Peer) deleteAllRequests() {
        if !c.actualRequestState.Requests.IsEmpty() {
                panic(c.actualRequestState.Requests.GetCardinality())
        }
-       c.nextRequestState.Requests.Clear()
        // for c := range c.t.conns {
        //      c.tickleWriter()
        // }
index c6dd6415de1a91b235e34faf1c20cfaa72333ef7..f481351673a1daedf8c8830fe61c08bdf7640016 100644 (file)
@@ -2,13 +2,14 @@ package torrent
 
 import (
        "container/heap"
+       "context"
        "encoding/gob"
        "reflect"
+       "runtime/pprof"
        "time"
        "unsafe"
 
        "github.com/RoaringBitmap/roaring"
-       "github.com/anacrolix/chansync/events"
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/multiless"
@@ -16,34 +17,6 @@ import (
        request_strategy "github.com/anacrolix/torrent/request-strategy"
 )
 
-// Calculate requests individually for each peer.
-const peerRequesting = true
-
-func (cl *Client) requester() {
-       for {
-               update := func() events.Signaled {
-                       cl.lock()
-                       defer cl.unlock()
-                       cl.doRequests()
-                       return cl.updateRequests.Signaled()
-               }()
-               minWait := time.After(100 * time.Millisecond)
-               maxWait := time.After(1000 * time.Millisecond)
-               select {
-               case <-cl.closed.Done():
-                       return
-               case <-minWait:
-               case <-maxWait:
-               }
-               select {
-               case <-cl.closed.Done():
-                       return
-               case <-update:
-               case <-maxWait:
-               }
-       }
-}
-
 func (cl *Client) tickleRequester() {
        cl.updateRequests.Broadcast()
 }
@@ -107,14 +80,6 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input {
        }
 }
 
-func (cl *Client) doRequests() {
-       input := cl.getRequestStrategyInput()
-       nextPeerStates := request_strategy.Run(input)
-       for p, state := range nextPeerStates {
-               setPeerNextRequestState(p, state)
-       }
-}
-
 func init() {
        gob.Register(peerId{})
 }
@@ -153,12 +118,6 @@ func (p *peerId) GobDecode(b []byte) error {
        return nil
 }
 
-func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
-       p := _p.(peerId).Peer
-       p.nextRequestState = rp
-       p.onNextRequestStateChanged()
-}
-
 type RequestIndex = request_strategy.RequestIndex
 type chunkIndexType = request_strategy.ChunkIndex
 
@@ -262,7 +221,22 @@ func (p *Peer) getDesiredRequestState() (desired requestState) {
 }
 
 func (p *Peer) applyNextRequestState() bool {
-       next := p.getDesiredRequestState()
+       if p.needRequestUpdate == "" {
+               return true
+       }
+       var more bool
+       pprof.Do(
+               context.Background(),
+               pprof.Labels("update request", p.needRequestUpdate),
+               func(_ context.Context) {
+                       next := p.getDesiredRequestState()
+                       more = p.applyRequestState(next)
+               },
+       )
+       return more
+}
+
+func (p *Peer) applyRequestState(next requestState) bool {
        current := p.actualRequestState
        if !p.setInterested(next.Interested) {
                return false
@@ -291,5 +265,8 @@ func (p *Peer) applyNextRequestState() bool {
                } */
                return more
        })
+       if more {
+               p.needRequestUpdate = ""
+       }
        return more
 }
index b99e4d18afe080522ee98bd7629cf755a9239934..a89a114e17dba6c5376bc4f54e810e5e23c073ed 100644 (file)
@@ -428,9 +428,6 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
 
 // This seems to be all the follow-up tasks after info is set, that can't fail.
 func (t *Torrent) onSetInfo() {
-       t.iterPeers(func(p *Peer) {
-               p.onGotInfo(t.info)
-       })
        for i := range t.pieces {
                p := &t.pieces[i]
                // Need to add availability before updating piece completion, as that may result in conns
@@ -450,6 +447,10 @@ func (t *Torrent) onSetInfo() {
        t.updateWantPeersEvent()
        t.pendingRequests = make(map[RequestIndex]int)
        t.tryCreateMorePieceHashers()
+       t.iterPeers(func(p *Peer) {
+               p.onGotInfo(t.info)
+               p.updateRequests("onSetInfo")
+       })
 }
 
 // Called when metadata for a torrent becomes available.
@@ -1097,9 +1098,9 @@ func (t *Torrent) maybeNewConns() {
 }
 
 func (t *Torrent) piecePriorityChanged(piece pieceIndex) {
-       if true || t._pendingPieces.Contains(piece) {
+       if t._pendingPieces.Contains(piece) {
                t.iterPeers(func(c *Peer) {
-                       c.updateRequests()
+                       c.updateRequests("piece priority changed")
                })
        }
        t.maybeNewConns()
@@ -2007,7 +2008,7 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) {
        // }
        t.iterPeers(func(conn *Peer) {
                if conn.peerHasPiece(piece) {
-                       conn.updateRequests()
+                       conn.updateRequests("piece incomplete")
                }
        })
 }
@@ -2183,7 +2184,7 @@ func (t *Torrent) AllowDataUpload() {
        defer t.cl.unlock()
        t.dataUploadDisallowed = false
        for c := range t.conns {
-               c.updateRequests()
+               c.updateRequests("allow data upload")
        }
 }
 
@@ -2193,7 +2194,7 @@ func (t *Torrent) DisallowDataUpload() {
        defer t.cl.unlock()
        t.dataUploadDisallowed = true
        for c := range t.conns {
-               c.updateRequests()
+               c.updateRequests("disallow data upload")
        }
 }
 
index 9263aa46227f7f356717da7e1f5e36d889fff73c..ebdd8dba9ef562a50e2a59b591d596916a9860c8 100644 (file)
@@ -103,7 +103,7 @@ func (ws *webseedPeer) connectionFlags() string {
 // return bool if this is even possible, and if it isn't, skip to the next drop candidate.
 func (ws *webseedPeer) drop() {}
 
-func (ws *webseedPeer) updateRequests() {
+func (ws *webseedPeer) updateRequests(reason string) {
 }
 
 func (ws *webseedPeer) onClose() {