]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Fix very rare double Torrent close panic
authorMatt Joiner <anacrolix@gmail.com>
Tue, 5 Aug 2025 12:38:40 +0000 (22:38 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 5 Aug 2025 12:38:40 +0000 (22:38 +1000)
client.go
errors.go [new file with mode: 0644]
peerconn_test.go
t.go
torrent.go

index f67f2a89f6fc279ed1ba39fdc2f16a4a4ed0e1ca..473aa871b535178d2755fc336203b80c2adae1a5 100644 (file)
--- a/client.go
+++ b/client.go
@@ -518,12 +518,9 @@ func (cl *Client) Close() (errs []error) {
        var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
        cl.lock()
        for t := range cl.torrents {
-               // Can we not modify cl.torrents as we delete from it?
-               err := cl.dropTorrent(t, &closeGroup)
-               if err != nil {
-                       errs = append(errs, err)
-               }
+               cl.dropTorrent(t, &closeGroup)
        }
+       // Can we not modify cl.torrents as we delete from it?
        panicif.NotZero(len(cl.torrents))
        panicif.NotZero(len(cl.torrentsByShortHash))
        cl.clearPortMappings()
@@ -1440,6 +1437,7 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
                opts.ChunkSize = defaultChunkSize
        }
        t.setChunkSize(opts.ChunkSize)
+       cl.torrents[t] = struct{}{}
        return
 }
 
@@ -1510,7 +1508,6 @@ func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
                }
        })
        cl.torrentsByShortHash[infoHash] = t
-       cl.torrents[t] = struct{}{}
        t.setInfoBytesLocked(opts.InfoBytes)
        cl.clearAcceptLimits()
        t.updateWantPeersEvent()
@@ -1589,13 +1586,8 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
        return errors.Join(t.addPieceLayersLocked(spec.PieceLayers)...)
 }
 
-func (cl *Client) dropTorrent(t *Torrent, wg *sync.WaitGroup) (err error) {
-       t.eachShortInfohash(func(short [20]byte) {
-               delete(cl.torrentsByShortHash, short)
-       })
-       err = t.close(wg)
-       delete(cl.torrents, t)
-       return
+func (cl *Client) dropTorrent(t *Torrent, wg *sync.WaitGroup) {
+       t.close(wg)
 }
 
 func (cl *Client) allTorrentsCompleted() bool {
diff --git a/errors.go b/errors.go
new file mode 100644 (file)
index 0000000..10cbafc
--- /dev/null
+++ b/errors.go
@@ -0,0 +1 @@
+package torrent
index 7c324a1f6a748cda95fb1b529d073ef782833639..5cceb9c839216e635d429c8ace97be463736eb84 100644 (file)
@@ -267,13 +267,14 @@ func TestConnPexEvent(t *testing.T) {
 func TestHaveAllThenBitfield(t *testing.T) {
        cl := newTestingClient(t)
        tt := cl.newTorrentForTesting()
-       // cl.newConnection()
+       //pc := cl.newConnection(nil, newConnectionOpts{})
        pc := PeerConn{
                Peer: Peer{t: tt},
        }
        pc.initRequestState()
        pc.legacyPeerImpl = &pc
        tt.conns[&pc] = struct{}{}
+       g.InitNew(&pc.callbacks)
        qt.Assert(t, qt.IsNil(pc.onPeerSentHaveAll()))
        qt.Check(t, qt.DeepEquals(pc.t.connsWithAllPieces, map[*Peer]struct{}{&pc.Peer: {}}))
        pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
diff --git a/t.go b/t.go
index 16fab38e35a0a0d51369258b75562236a60a62b1..ceea328ad0a409a7d3b91e27546efd50bfc28171 100644 (file)
--- a/t.go
+++ b/t.go
@@ -96,18 +96,20 @@ func (t *Torrent) PieceBytesMissing(piece int) int64 {
        return int64(t.pieces[piece].bytesLeft())
 }
 
-// Drop the torrent from the client, and close it. It's always safe to do
-// this. No data corruption can, or should occur to either the torrent's data,
-// or connected peers.
+// Drop the torrent from the client, and close it. It's always safe to do this. No data corruption
+// can, or should occur to either the torrent's data, or connected peers.
 func (t *Torrent) Drop() {
-       var wg sync.WaitGroup
-       defer wg.Wait()
+       if t.closed.IsSet() {
+               return
+       }
        t.cl.lock()
        defer t.cl.unlock()
-       err := t.cl.dropTorrent(t, &wg)
-       if err != nil {
-               panic(err)
+       if t.closed.IsSet() {
+               return
        }
+       var wg sync.WaitGroup
+       t.close(&wg)
+       wg.Wait()
 }
 
 // Number of bytes of the entire torrent we have completed. This is the sum of
index 0bd27b57a87318dbe5cb06fd742cf6ea2372cfa0..825ac53a73142edeb5d36d2c9ef3ac53110eae99 100644 (file)
@@ -1080,11 +1080,12 @@ func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
        return pieceIndex(t._completedPieces.GetCardinality())
 }
 
-func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
-       if !t.closed.Set() {
-               err = errors.New("already closed")
-               return
-       }
+func (t *Torrent) close(wg *sync.WaitGroup) {
+       // Should only be called from the Client.
+       panicif.False(t.closed.Set())
+       t.eachShortInfohash(func(short [20]byte) {
+               delete(t.cl.torrentsByShortHash, short)
+       })
        t.closedCtxCancel(errTorrentClosed)
        t.getInfoCtxCancel(errTorrentClosed)
        for _, f := range t.onClose {
@@ -1115,6 +1116,7 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
        t.cl.event.Broadcast()
        t.pieceStateChanges.Close()
        t.updateWantPeersEvent()
+       g.MustDelete(t.cl.torrents, t)
        return
 }