sub mset_progress {
my $lei = shift;
return unless $lei->{-progress};
- if ($lei->{pkt_op}) { # called via pkt_op/pkt_do from workers
- pkt_do($lei->{pkt_op}, 'mset_progress', @_);
+ if ($lei->{pkt_op_p}) {
+ pkt_do($lei->{pkt_op_p}, 'mset_progress', @_);
} else { # single lei-daemon consumer
my ($desc, $mset_size, $mset_total_est) = @_;
$lei->{-mset_total} += $mset_size;
}
sub query_thread_mset { # for --thread
- my ($self, $lei, $ibxish) = @_;
+ my ($self, $ibxish) = @_;
local $0 = "$0 query_thread_mset";
- $lei->atfork_child_wq($self);
+ my $lei = $self->{lei};
my $startq = delete $lei->{startq};
-
my ($srch, $over) = ($ibxish->search, $ibxish->over);
my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
return warn("$desc not indexed by Xapian\n") unless ($srch && $over);
}
sub query_mset { # non-parallel for non-"--thread" users
- my ($self, $lei) = @_;
+ my ($self) = @_;
local $0 = "$0 query_mset";
- $lei->atfork_child_wq($self);
+ my $lei = $self->{lei};
my $startq = delete $lei->{startq};
my $mo = { %{$lei->{mset_opt}} };
my $mset;
}
sub query_remote_mboxrd {
- my ($self, $lei, $uris) = @_;
+ my ($self, $uris) = @_;
local $0 = "$0 query_remote_mboxrd";
- $lei->atfork_child_wq($self);
local $SIG{TERM} = sub { exit(0) }; # for DESTROY (File::Temp, $reap)
+ my $lei = $self->{lei};
my ($opt, $env) = @$lei{qw(opt env)};
my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm');
push(@qform, t => 1) if $opt->{thread};
$git;
}
-sub query_done { # EOF callback
+sub query_done { # EOF callback for main daemon
my ($lei) = @_;
my $has_l2m = exists $lei->{l2m};
for my $f (qw(lxs l2m)) {
}
sub do_post_augment {
- my ($lei, $zpipe, $au_done) = @_;
- my $l2m = $lei->{l2m} or die 'BUG: no {l2m}';
- eval { $l2m->post_augment($lei, $zpipe) };
+ my ($lei) = @_;
+ eval { $lei->{l2m}->post_augment($lei) };
if (my $err = $@) {
if (my $lxs = delete $lei->{lxs}) {
$lxs->wq_kill;
}
$lei->fail("$err");
}
- close $au_done; # triggers wait_startq
+ close(delete $lei->{au_done}); # triggers wait_startq
}
my $MAX_PER_HOST = 4;
}
sub start_query { # always runs in main (lei-daemon) process
- my ($self, $io, $lei) = @_;
+ my ($self, $lei) = @_;
if ($lei->{opt}->{thread}) {
for my $ibxish (locals($self)) {
- $self->wq_do('query_thread_mset', $io, $lei, $ibxish);
+ $self->wq_do('query_thread_mset', [], $ibxish);
}
} elsif (locals($self)) {
- $self->wq_do('query_mset', $io, $lei);
+ $self->wq_do('query_mset', []);
}
my $i = 0;
my $q = [];
push @{$q->[$i++ % $MAX_PER_HOST]}, $uri;
}
for my $uris (@$q) {
- $self->wq_do('query_remote_mboxrd', $io, $lei, $uris);
+ $self->wq_do('query_remote_mboxrd', [], $uris);
}
- @$io = ();
+}
+
+sub ipc_atfork_child {
+ my ($self) = @_;
+ $self->{lei}->lei_atfork_child;
+ $self->SUPER::ipc_atfork_child;
}
sub query_prepare { # called by wq_do
- my ($self, $lei) = @_;
+ my ($self) = @_;
local $0 = "$0 query_prepare";
- $lei->atfork_child_wq($self);
- delete $lei->{l2m}->{-wq_s1};
+ my $lei = $self->{lei};
eval { $lei->{l2m}->do_augment($lei) };
$lei->fail($@) if $@;
- pkt_do($lei->{pkt_op}, '.') == 1 or die "do_post_augment trigger: $!"
+ pkt_do($lei->{pkt_op_p}, '.') == 1 or die "do_post_augment trigger: $!"
}
sub fail_handler ($;$$) {
sub do_query {
my ($self, $lei) = @_;
- $lei->{1}->autoflush(1);
- $lei->start_pager if -t $lei->{1};
- $lei->{ovv}->ovv_begin($lei);
- my ($au_done, $zpipe);
- my $l2m = $lei->{l2m};
- $lei->atfork_prepare_wq($self);
- $self->wq_workers_start('lei_xsearch', $self->{jobs}, $lei->oldset);
- delete $self->{-ipc_atfork_child_close};
- if ($l2m) {
- $lei->atfork_prepare_wq($l2m);
- $l2m->wq_workers_start('lei2mail', $l2m->{jobs}, $lei->oldset);
- delete $l2m->{-ipc_atfork_child_close};
- pipe($lei->{startq}, $au_done) or die "pipe: $!";
- # 1031: F_SETPIPE_SZ
- fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
- $zpipe = $l2m->pre_augment($lei);
- }
my $ops = {
'|' => [ \&sigpipe_handler, $lei ],
'!' => [ \&fail_handler, $lei ],
- '.' => [ \&do_post_augment, $lei, $zpipe, $au_done ],
+ '.' => [ \&do_post_augment, $lei ],
'' => [ \&query_done, $lei ],
'mset_progress' => [ \&mset_progress, $lei ],
'x_it' => [ $lei->can('x_it'), $lei ],
'child_error' => [ $lei->can('child_error'), $lei ],
};
- (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops);
- my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);
- delete($lei->{pkt_op});
-
- $lei->event_step_init; # wait for shutdowns
+ ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+ $lei->{1}->autoflush(1);
+ $lei->start_pager if -t $lei->{1};
+ $lei->{ovv}->ovv_begin($lei);
+ my $l2m = $lei->{l2m};
if ($l2m) {
- $self->wq_do('query_prepare', \@io, $lei_ipc);
- $io[1] = $zpipe->[1] if $zpipe;
+ $l2m->pre_augment($lei);
+ $l2m->wq_workers_start('lei2mail', $l2m->{jobs},
+ $lei->oldset, { lei => $lei });
+ pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
+ # 1031: F_SETPIPE_SZ
+ fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
}
- start_query($self, \@io, $lei_ipc);
- $self->wq_close(1);
+ $self->wq_workers_start('lei_xsearch', $self->{jobs},
+ $lei->oldset, { lei => $lei });
+ my $op = delete $lei->{pkt_op_c};
+ delete $lei->{pkt_op_p};
+ $l2m->wq_close(1) if $l2m;
+ $lei->event_step_init; # wait for shutdowns
+ $self->wq_do('query_prepare', []) if $l2m;
+ start_query($self, $lei);
+ $self->wq_close(1); # lei_xsearch workers stop when done
if ($lei->{oneshot}) {
- # for the $lei_ipc->atfork_child_wq PIPE handler:
while ($op->{sock}) { $op->event_step }
}
}