From a74bebfc14c7bedb67493465b97642a9a7b91c42 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 22 Jul 2025 23:17:41 +1000 Subject: [PATCH] Fix peer request handling concurrency --- peer.go | 13 +++++++++++++ peerconn.go | 46 ++++++++++++++++++++++++++++++++++++++-------- torrent.go | 1 + 3 files changed, 52 insertions(+), 8 deletions(-) diff --git a/peer.go b/peer.go index 6bcddd49..7dcf6503 100644 --- a/peer.go +++ b/peer.go @@ -1,6 +1,7 @@ package torrent import ( + "context" "fmt" "io" "log/slog" @@ -15,6 +16,7 @@ import ( "github.com/anacrolix/log" "github.com/anacrolix/missinggo/iter" "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/missinggo/v2/panicif" "github.com/anacrolix/multiless" "github.com/anacrolix/torrent/internal/alloclim" @@ -39,6 +41,8 @@ type ( Discovery PeerSource trusted bool closed chansync.SetOnce + closedCtx context.Context + closedCtxCancel context.CancelFunc lastUsefulChunkReceived time.Time lastStartedExpectingToReceiveChunks time.Time @@ -223,6 +227,10 @@ func (p *Peer) close() { if !p.closed.Set() { return } + // Not set until Torrent is known. + if p.closedCtx != nil { + p.closedCtxCancel() + } if p.updateRequestsTimer != nil { p.updateRequestsTimer.Stop() } @@ -566,3 +574,8 @@ func (p *Peer) recordBlockForSmartBan(req RequestIndex, blockData []byte) { p.t.smartBanCache.RecordBlock(p.bannableAddr.Value, req, blockData) } } + +func (p *Peer) initClosedCtx() { + panicif.NotNil(p.closedCtx) + p.closedCtx, p.closedCtxCancel = context.WithCancel(p.t.closedCtx) +} diff --git a/peerconn.go b/peerconn.go index a8d466ea..1191093b 100644 --- a/peerconn.go +++ b/peerconn.go @@ -3,7 +3,6 @@ package torrent import ( "bufio" "bytes" - "context" "errors" "fmt" "io" @@ -21,6 +20,7 @@ import ( . "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/missinggo/v2/panicif" "github.com/anacrolix/multiless" "golang.org/x/time/rate" @@ -112,6 +112,8 @@ type PeerConn struct { // we can verify all the pieces for a file when they're all arrived before submitting them to // the torrent. receivedHashPieces map[[32]byte][][32]byte + + peerRequestServerRunning bool } func (cn *PeerConn) lastWriteUploadRate() float64 { @@ -670,17 +672,42 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error { } c.peerRequests[r] = value if startFetch { - // TODO: Limit peer request data read concurrency. - go c.peerRequestDataReader(r, value) + c.startPeerRequestServer() } return nil } -func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) { +func (c *PeerConn) startPeerRequestServer() { + if !c.peerRequestServerRunning { + go c.peerRequestServer() + c.peerRequestServerRunning = true + } +} + +func (c *PeerConn) peerRequestServer() { +again: + c.locker().Lock() + if !c.closed.IsSet() { + for r, state := range c.peerRequests { + if state.data == nil { + c.locker().Unlock() + c.servePeerRequest(r, state) + goto again + } + } + } + panicif.False(c.peerRequestServerRunning) + c.peerRequestServerRunning = false + c.locker().Unlock() + return +} + +// TODO: Return an error then let caller filter on conditions. +func (c *PeerConn) servePeerRequest(r Request, prs *peerRequestState) { // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere, // or fail to read and then cleanup. Also, we used to hang here if the reservation was never // dropped, that was fixed. - ctx := context.Background() + ctx := c.closedCtx err := prs.allocReservation.Wait(ctx) if err != nil { c.logger.WithDefaultLevel(log.Debug).Levelf(log.ErrorLevel(err), "waiting for alloc limit reservation: %v", err) @@ -689,6 +716,10 @@ func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) { b, err := c.readPeerRequestData(r) c.locker().Lock() defer c.locker().Unlock() + // This function should remove work from peerRequests so peerRequestServer does not stall. + defer func() { + panicif.True(MapContains(c.peerRequests, r) && prs.data == nil) + }() if err != nil { c.peerRequestDataReadFailed(err, r) } else { @@ -1150,10 +1181,9 @@ func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRe } func (c *PeerConn) setTorrent(t *Torrent) { - if c.t != nil { - panic("connection already associated with a torrent") - } + panicif.NotNil(c.t) c.t = t + c.initClosedCtx() c.logger.WithDefaultLevel(log.Debug).Printf("set torrent=%v", t) c.setPeerLoggers(t.logger, t.slogger()) t.reconcileHandshakeStats(c.peerPtr()) diff --git a/torrent.go b/torrent.go index 3c0ac745..ff6a4c93 100644 --- a/torrent.go +++ b/torrent.go @@ -3078,6 +3078,7 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool { }, hostKey: t.deriveWebSeedHostKey(url), } + ws.peer.initClosedCtx() for _, opt := range opts { opt(&ws.client) } -- 2.51.0