$current_lei = $persist ? undef : $self; # for SIG{__WARN__}
}
+sub _delete_pkt_op { # OnDestroy callback to prevent leaks on die
+ my ($self) = @_;
+ if (my $op = delete $self->{pkt_op_c}) { # in case of die
+ $op->close; # PublicInbox::PktOp::close
+ }
+ my $unclosed_after_die = delete($self->{pkt_op_p}) or return;
+ close $unclosed_after_die;
+}
+
+sub pkt_op_pair {
+ my ($self, $ops) = @_;
+ require PublicInbox::OnDestroy;
+ require PublicInbox::PktOp;
+ my $end = PublicInbox::OnDestroy->new($$, \&_delete_pkt_op, $self);
+ @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair($ops);
+ $end;
+}
+
sub workers_start {
my ($lei, $wq, $ident, $jobs, $ops) = @_;
$ops = {
($ops ? %$ops : ()),
};
$ops->{''} //= [ \&dclose, $lei ];
- require PublicInbox::PktOp;
- ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+ my $end = $lei->pkt_op_pair($ops);
$wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
delete $lei->{pkt_op_p};
my $op = delete $lei->{pkt_op_c};
+ @$end = ();
$lei->event_step_init;
# oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op
$lei->{oneshot} ? $op : undef;
$self->SUPER::ipc_atfork_child;
}
-sub delete_pkt_op { # OnDestroy callback
- my $unclosed_after_die = delete($_[0])->{pkt_op_p} or return;
- close $unclosed_after_die;
-}
-
sub do_query {
my ($self, $lei) = @_;
my $l2m = $lei->{l2m};
'incr_start_query' => [ \&incr_start_query, $self, $l2m ],
};
$lei->{auth}->op_merge($ops, $l2m) if $l2m && $lei->{auth};
- my $od = PublicInbox::OnDestroy->new($$, \&delete_pkt_op, $lei);
- ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+ my $end = $lei->pkt_op_pair($ops);
$lei->{1}->autoflush(1);
$lei->start_pager if delete $lei->{need_pager};
$lei->{ovv}->ovv_begin($lei);
$lei->oldset, { lei => $lei });
my $op = delete $lei->{pkt_op_c};
delete $lei->{pkt_op_p};
+ @$end = ();
$self->{threads} = $lei->{opt}->{threads};
if ($l2m) {
$l2m->net_merge_complete unless $lei->{auth};