$current_lei = $persist ? undef : $self; # for SIG{__WARN__}
}
+sub _delete_pkt_op { # OnDestroy callback to prevent leaks on die
+ my ($self) = @_;
+ if (my $op = delete $self->{pkt_op_c}) { # in case of die
+ $op->close; # PublicInbox::PktOp::close
+ }
+ my $unclosed_after_die = delete($self->{pkt_op_p}) or return;
+ close $unclosed_after_die;
+}
+
+sub pkt_op_pair {
+ my ($self, $ops) = @_;
+ require PublicInbox::OnDestroy;
+ require PublicInbox::PktOp;
+ my $end = PublicInbox::OnDestroy->new($$, \&_delete_pkt_op, $self);
+ @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair($ops);
+ $end;
+}
+
sub workers_start {
my ($lei, $wq, $ident, $jobs, $ops) = @_;
$ops = {
($ops ? %$ops : ()),
};
$ops->{''} //= [ \&dclose, $lei ];
- require PublicInbox::PktOp;
- ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+ my $end = $lei->pkt_op_pair($ops);
$wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
delete $lei->{pkt_op_p};
my $op = delete $lei->{pkt_op_c};
+ @$end = ();
$lei->event_step_init;
# oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op
$lei->{oneshot} ? $op : undef;