]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei: clean up pkt_op consumer on exception, too
authorEric Wong <e@80x24.org>
Wed, 24 Mar 2021 09:23:31 +0000 (14:23 +0500)
committerEric Wong <e@80x24.org>
Wed, 24 Mar 2021 23:01:19 +0000 (23:01 +0000)
We need to consistently ensure pkt_op_c doesn't lead to a
long-lived circular reference if an exception is thrown in
pre_augment.  Maybe the API could be better, but this fixes an
FD leak when attempting to --augment a FIFO.

Followup-to: b9524082ba39e665 ("lei_xsearch: cleanup {pkt_op_p} on exceptions")
lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiXSearch.pm

index 743725322d3084f2f649c669d7a4c8cf0b81db47..878685f1ad0527390dd7d6ac71a6639ac970cd82 100644 (file)
@@ -482,6 +482,24 @@ sub _lei_atfork_child {
        $current_lei = $persist ? undef : $self; # for SIG{__WARN__}
 }
 
+sub _delete_pkt_op { # OnDestroy callback to prevent leaks on die
+       my ($self) = @_;
+       if (my $op = delete $self->{pkt_op_c}) { # in case of die
+               $op->close; # PublicInbox::PktOp::close
+       }
+       my $unclosed_after_die = delete($self->{pkt_op_p}) or return;
+       close $unclosed_after_die;
+}
+
+sub pkt_op_pair {
+       my ($self, $ops) = @_;
+       require PublicInbox::OnDestroy;
+       require PublicInbox::PktOp;
+       my $end = PublicInbox::OnDestroy->new($$, \&_delete_pkt_op, $self);
+       @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair($ops);
+       $end;
+}
+
 sub workers_start {
        my ($lei, $wq, $ident, $jobs, $ops) = @_;
        $ops = {
@@ -492,11 +510,11 @@ sub workers_start {
                ($ops ? %$ops : ()),
        };
        $ops->{''} //= [ \&dclose, $lei ];
-       require PublicInbox::PktOp;
-       ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+       my $end = $lei->pkt_op_pair($ops);
        $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
        delete $lei->{pkt_op_p};
        my $op = delete $lei->{pkt_op_c};
+       @$end = ();
        $lei->event_step_init;
        # oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op
        $lei->{oneshot} ? $op : undef;
index c6b82eeb5bcf31cf285c451f4b163b730e950bd2..58b6cfc01709f933ec273df8b5790eadda9a9f78 100644 (file)
@@ -406,11 +406,6 @@ sub ipc_atfork_child {
        $self->SUPER::ipc_atfork_child;
 }
 
-sub delete_pkt_op { # OnDestroy callback
-       my $unclosed_after_die = delete($_[0])->{pkt_op_p} or return;
-       close $unclosed_after_die;
-}
-
 sub do_query {
        my ($self, $lei) = @_;
        my $l2m = $lei->{l2m};
@@ -426,8 +421,7 @@ sub do_query {
                'incr_start_query' => [ \&incr_start_query, $self, $l2m ],
        };
        $lei->{auth}->op_merge($ops, $l2m) if $l2m && $lei->{auth};
-       my $od = PublicInbox::OnDestroy->new($$, \&delete_pkt_op, $lei);
-       ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+       my $end = $lei->pkt_op_pair($ops);
        $lei->{1}->autoflush(1);
        $lei->start_pager if delete $lei->{need_pager};
        $lei->{ovv}->ovv_begin($lei);
@@ -446,6 +440,7 @@ sub do_query {
                                $lei->oldset, { lei => $lei });
        my $op = delete $lei->{pkt_op_c};
        delete $lei->{pkt_op_p};
+       @$end = ();
        $self->{threads} = $lei->{opt}->{threads};
        if ($l2m) {
                $l2m->net_merge_complete unless $lei->{auth};