]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LEI.pm
lei: clean up pkt_op consumer on exception, too
[public-inbox.git] / lib / PublicInbox / LEI.pm
index 743725322d3084f2f649c669d7a4c8cf0b81db47..878685f1ad0527390dd7d6ac71a6639ac970cd82 100644 (file)
@@ -482,6 +482,24 @@ sub _lei_atfork_child {
        $current_lei = $persist ? undef : $self; # for SIG{__WARN__}
 }
 
+sub _delete_pkt_op { # OnDestroy callback to prevent leaks on die
+       my ($self) = @_;
+       if (my $op = delete $self->{pkt_op_c}) { # in case of die
+               $op->close; # PublicInbox::PktOp::close
+       }
+       my $unclosed_after_die = delete($self->{pkt_op_p}) or return;
+       close $unclosed_after_die;
+}
+
+sub pkt_op_pair {
+       my ($self, $ops) = @_;
+       require PublicInbox::OnDestroy;
+       require PublicInbox::PktOp;
+       my $end = PublicInbox::OnDestroy->new($$, \&_delete_pkt_op, $self);
+       @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair($ops);
+       $end;
+}
+
 sub workers_start {
        my ($lei, $wq, $ident, $jobs, $ops) = @_;
        $ops = {
@@ -492,11 +510,11 @@ sub workers_start {
                ($ops ? %$ops : ()),
        };
        $ops->{''} //= [ \&dclose, $lei ];
-       require PublicInbox::PktOp;
-       ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+       my $end = $lei->pkt_op_pair($ops);
        $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
        delete $lei->{pkt_op_p};
        my $op = delete $lei->{pkt_op_c};
+       @$end = ();
        $lei->event_step_init;
        # oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op
        $lei->{oneshot} ? $op : undef;