From 5e9b848e1a9b9543d389cf4e648e52907260eb6c Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 5 Aug 2025 22:38:40 +1000 Subject: [PATCH] Fix very rare double Torrent close panic --- client.go | 18 +++++------------- errors.go | 1 + peerconn_test.go | 3 ++- t.go | 18 ++++++++++-------- torrent.go | 12 +++++++----- 5 files changed, 25 insertions(+), 27 deletions(-) create mode 100644 errors.go diff --git a/client.go b/client.go index f67f2a89..473aa871 100644 --- a/client.go +++ b/client.go @@ -518,12 +518,9 @@ func (cl *Client) Close() (errs []error) { var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning cl.lock() for t := range cl.torrents { - // Can we not modify cl.torrents as we delete from it? - err := cl.dropTorrent(t, &closeGroup) - if err != nil { - errs = append(errs, err) - } + cl.dropTorrent(t, &closeGroup) } + // Can we not modify cl.torrents as we delete from it? panicif.NotZero(len(cl.torrents)) panicif.NotZero(len(cl.torrentsByShortHash)) cl.clearPortMappings() @@ -1440,6 +1437,7 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) { opts.ChunkSize = defaultChunkSize } t.setChunkSize(opts.ChunkSize) + cl.torrents[t] = struct{}{} return } @@ -1510,7 +1508,6 @@ func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) { } }) cl.torrentsByShortHash[infoHash] = t - cl.torrents[t] = struct{}{} t.setInfoBytesLocked(opts.InfoBytes) cl.clearAcceptLimits() t.updateWantPeersEvent() @@ -1589,13 +1586,8 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error { return errors.Join(t.addPieceLayersLocked(spec.PieceLayers)...) } -func (cl *Client) dropTorrent(t *Torrent, wg *sync.WaitGroup) (err error) { - t.eachShortInfohash(func(short [20]byte) { - delete(cl.torrentsByShortHash, short) - }) - err = t.close(wg) - delete(cl.torrents, t) - return +func (cl *Client) dropTorrent(t *Torrent, wg *sync.WaitGroup) { + t.close(wg) } func (cl *Client) allTorrentsCompleted() bool { diff --git a/errors.go b/errors.go new file mode 100644 index 00000000..10cbafc7 --- /dev/null +++ b/errors.go @@ -0,0 +1 @@ +package torrent diff --git a/peerconn_test.go b/peerconn_test.go index 7c324a1f..5cceb9c8 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -267,13 +267,14 @@ func TestConnPexEvent(t *testing.T) { func TestHaveAllThenBitfield(t *testing.T) { cl := newTestingClient(t) tt := cl.newTorrentForTesting() - // cl.newConnection() + //pc := cl.newConnection(nil, newConnectionOpts{}) pc := PeerConn{ Peer: Peer{t: tt}, } pc.initRequestState() pc.legacyPeerImpl = &pc tt.conns[&pc] = struct{}{} + g.InitNew(&pc.callbacks) qt.Assert(t, qt.IsNil(pc.onPeerSentHaveAll())) qt.Check(t, qt.DeepEquals(pc.t.connsWithAllPieces, map[*Peer]struct{}{&pc.Peer: {}})) pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false}) diff --git a/t.go b/t.go index 16fab38e..ceea328a 100644 --- a/t.go +++ b/t.go @@ -96,18 +96,20 @@ func (t *Torrent) PieceBytesMissing(piece int) int64 { return int64(t.pieces[piece].bytesLeft()) } -// Drop the torrent from the client, and close it. It's always safe to do -// this. No data corruption can, or should occur to either the torrent's data, -// or connected peers. +// Drop the torrent from the client, and close it. It's always safe to do this. No data corruption +// can, or should occur to either the torrent's data, or connected peers. func (t *Torrent) Drop() { - var wg sync.WaitGroup - defer wg.Wait() + if t.closed.IsSet() { + return + } t.cl.lock() defer t.cl.unlock() - err := t.cl.dropTorrent(t, &wg) - if err != nil { - panic(err) + if t.closed.IsSet() { + return } + var wg sync.WaitGroup + t.close(&wg) + wg.Wait() } // Number of bytes of the entire torrent we have completed. This is the sum of diff --git a/torrent.go b/torrent.go index 0bd27b57..825ac53a 100644 --- a/torrent.go +++ b/torrent.go @@ -1080,11 +1080,12 @@ func (t *Torrent) numPiecesCompleted() (num pieceIndex) { return pieceIndex(t._completedPieces.GetCardinality()) } -func (t *Torrent) close(wg *sync.WaitGroup) (err error) { - if !t.closed.Set() { - err = errors.New("already closed") - return - } +func (t *Torrent) close(wg *sync.WaitGroup) { + // Should only be called from the Client. + panicif.False(t.closed.Set()) + t.eachShortInfohash(func(short [20]byte) { + delete(t.cl.torrentsByShortHash, short) + }) t.closedCtxCancel(errTorrentClosed) t.getInfoCtxCancel(errTorrentClosed) for _, f := range t.onClose { @@ -1115,6 +1116,7 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) { t.cl.event.Broadcast() t.pieceStateChanges.Close() t.updateWantPeersEvent() + g.MustDelete(t.cl.torrents, t) return } -- 2.51.0