}
sub query_done { # EOF callback
- my ($self, $lei) = @_;
- my $l2m = delete $lei->{l2m};
- $l2m->wq_wait_old if $l2m;
- $self->wq_wait_old;
+ my ($lei) = @_;
+ my $has_l2m = exists $lei->{l2m};
+ for my $f (qw(lxs l2m)) {
+ my $wq = delete $lei->{$f} or next;
+ $wq->wq_wait_old;
+ }
$lei->{ovv}->ovv_end($lei);
- if ($l2m) { # close() calls LeiToMail reap_compress
+ if ($has_l2m) { # close() calls LeiToMail reap_compress
close(delete($lei->{1})) if $lei->{1};
$lei->start_mua;
}
syswrite($lei->{0}, '.') == 1 or die "do_post_augment trigger: $!";
}
-sub sigpipe_handler { # handles SIGPIPE from wq workers
- my ($self, $lei_orig) = @_;
- if ($self->wq_kill_old) {
+sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
+ my ($lei) = @_;
+ my $lxs = delete $lei->{lxs};
+ if ($lxs && $lxs->wq_kill_old) {
kill 'PIPE', $$;
- $self->wq_wait_old;
- } else {
- $self->wq_kill;
- $self->wq_close;
+ $lxs->wq_wait_old;
}
- close(delete $lei_orig->{1}) if $lei_orig->{1};
+ close(delete $lei->{1}) if $lei->{1};
}
sub do_query {
$lei_orig->event_step_init; # wait for shutdowns
my $done_op = {
- '' => [ \&query_done, $self, $lei_orig ],
- '!' => [ \&sigpipe_handler, $self, $lei_orig ]
+ '' => [ \&query_done, $lei_orig ],
+ '!' => [ \&sigpipe_handler, $lei_orig ]
};
my $in_loop = exists $lei_orig->{sock};
$done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);