]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Fix peer request handling concurrency
authorMatt Joiner <anacrolix@gmail.com>
Tue, 22 Jul 2025 13:17:41 +0000 (23:17 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 22 Jul 2025 13:17:41 +0000 (23:17 +1000)
peer.go
peerconn.go
torrent.go

diff --git a/peer.go b/peer.go
index 6bcddd495b06e8914268e324009a9c465c5905c8..7dcf6503918e8a095ba0c40d3efc86446ecc677a 100644 (file)
--- a/peer.go
+++ b/peer.go
@@ -1,6 +1,7 @@
 package torrent
 
 import (
+       "context"
        "fmt"
        "io"
        "log/slog"
@@ -15,6 +16,7 @@ import (
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/iter"
        "github.com/anacrolix/missinggo/v2/bitmap"
+       "github.com/anacrolix/missinggo/v2/panicif"
        "github.com/anacrolix/multiless"
 
        "github.com/anacrolix/torrent/internal/alloclim"
@@ -39,6 +41,8 @@ type (
                Discovery               PeerSource
                trusted                 bool
                closed                  chansync.SetOnce
+               closedCtx               context.Context
+               closedCtxCancel         context.CancelFunc
                lastUsefulChunkReceived time.Time
 
                lastStartedExpectingToReceiveChunks time.Time
@@ -223,6 +227,10 @@ func (p *Peer) close() {
        if !p.closed.Set() {
                return
        }
+       // Not set until Torrent is known.
+       if p.closedCtx != nil {
+               p.closedCtxCancel()
+       }
        if p.updateRequestsTimer != nil {
                p.updateRequestsTimer.Stop()
        }
@@ -566,3 +574,8 @@ func (p *Peer) recordBlockForSmartBan(req RequestIndex, blockData []byte) {
                p.t.smartBanCache.RecordBlock(p.bannableAddr.Value, req, blockData)
        }
 }
+
+func (p *Peer) initClosedCtx() {
+       panicif.NotNil(p.closedCtx)
+       p.closedCtx, p.closedCtxCancel = context.WithCancel(p.t.closedCtx)
+}
index a8d466ea970e04eca717f41ae69fb1853b6fefdb..1191093bd6f27c18a3cca40d2929978355061252 100644 (file)
@@ -3,7 +3,6 @@ package torrent
 import (
        "bufio"
        "bytes"
-       "context"
        "errors"
        "fmt"
        "io"
@@ -21,6 +20,7 @@ import (
        . "github.com/anacrolix/generics"
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/v2/bitmap"
+       "github.com/anacrolix/missinggo/v2/panicif"
        "github.com/anacrolix/multiless"
 
        "golang.org/x/time/rate"
@@ -112,6 +112,8 @@ type PeerConn struct {
        // we can verify all the pieces for a file when they're all arrived before submitting them to
        // the torrent.
        receivedHashPieces map[[32]byte][][32]byte
+
+       peerRequestServerRunning bool
 }
 
 func (cn *PeerConn) lastWriteUploadRate() float64 {
@@ -670,17 +672,42 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
        }
        c.peerRequests[r] = value
        if startFetch {
-               // TODO: Limit peer request data read concurrency.
-               go c.peerRequestDataReader(r, value)
+               c.startPeerRequestServer()
        }
        return nil
 }
 
-func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
+func (c *PeerConn) startPeerRequestServer() {
+       if !c.peerRequestServerRunning {
+               go c.peerRequestServer()
+               c.peerRequestServerRunning = true
+       }
+}
+
+func (c *PeerConn) peerRequestServer() {
+again:
+       c.locker().Lock()
+       if !c.closed.IsSet() {
+               for r, state := range c.peerRequests {
+                       if state.data == nil {
+                               c.locker().Unlock()
+                               c.servePeerRequest(r, state)
+                               goto again
+                       }
+               }
+       }
+       panicif.False(c.peerRequestServerRunning)
+       c.peerRequestServerRunning = false
+       c.locker().Unlock()
+       return
+}
+
+// TODO: Return an error then let caller filter on conditions.
+func (c *PeerConn) servePeerRequest(r Request, prs *peerRequestState) {
        // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere,
        // or fail to read and then cleanup. Also, we used to hang here if the reservation was never
        // dropped, that was fixed.
-       ctx := context.Background()
+       ctx := c.closedCtx
        err := prs.allocReservation.Wait(ctx)
        if err != nil {
                c.logger.WithDefaultLevel(log.Debug).Levelf(log.ErrorLevel(err), "waiting for alloc limit reservation: %v", err)
@@ -689,6 +716,10 @@ func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
        b, err := c.readPeerRequestData(r)
        c.locker().Lock()
        defer c.locker().Unlock()
+       // This function should remove work from peerRequests so peerRequestServer does not stall.
+       defer func() {
+               panicif.True(MapContains(c.peerRequests, r) && prs.data == nil)
+       }()
        if err != nil {
                c.peerRequestDataReadFailed(err, r)
        } else {
@@ -1150,10 +1181,9 @@ func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRe
 }
 
 func (c *PeerConn) setTorrent(t *Torrent) {
-       if c.t != nil {
-               panic("connection already associated with a torrent")
-       }
+       panicif.NotNil(c.t)
        c.t = t
+       c.initClosedCtx()
        c.logger.WithDefaultLevel(log.Debug).Printf("set torrent=%v", t)
        c.setPeerLoggers(t.logger, t.slogger())
        t.reconcileHandshakeStats(c.peerPtr())
index 3c0ac74550364218a6443c3b6d68dd64e7ce9473..ff6a4c9315b12f0b32a50459d2055308f06da0f6 100644 (file)
@@ -3078,6 +3078,7 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool {
                },
                hostKey: t.deriveWebSeedHostKey(url),
        }
+       ws.peer.initClosedCtx()
        for _, opt := range opts {
                opt(&ws.client)
        }