}
sub wq_wait_old {
- my ($self, @args) = @_;
- my $cb = ref($args[0]) eq 'CODE' ? shift(@args) : \&ipc_worker_reap;
+ my ($self, $cb, @args) = @_;
my $pids = delete $self->{"-wq_old_pids.$$"} or return;
- dwaitpid($_, $cb, [$self, @args]) for @$pids;
+ dwaitpid($_, $cb // \&ipc_worker_reap, [$self, @args]) for @$pids;
}
# for base class, override in sub classes
sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
my ($self) = @_;
return unless wq_workers($self);
- my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2';
- $self->wq_io_do('wq_exit', [ $s2, $s2, $s2 ]);
+ $self->wq_io_do('wq_exit');
# caller must call wq_worker_decr_wait in main loop
}
sub wq_worker_decr_wait {
- my ($self, $timeout) = @_;
+ my ($self, $timeout, $cb, @args) = @_;
return if $self->{-wq_ppid} != $$; # can't reap siblings or parents
my $s1 = $self->{-wq_s1} // croak 'BUG: no wq_s1';
vec(my $rin = '', fileno($s1), 1) = 1;
recv($s1, my $pid, 64, 0) // croak "recv: $!";
my $workers = $self->{-wq_workers} // croak 'BUG: no wq_workers';
delete $workers->{$pid} // croak "BUG: PID:$pid invalid";
- dwaitpid($pid, \&ipc_worker_reap, $self);
+ dwaitpid($pid, $cb // \&ipc_worker_reap, [ $self, @args ]);
}
# set or retrieve number of workers
sub wq_workers {
- my ($self, $nr) = @_;
+ my ($self, $nr, $cb, @args) = @_;
my $cur = $self->{-wq_workers} or return;
if (defined $nr) {
while (scalar(keys(%$cur)) > $nr) {
$self->wq_worker_decr;
- $self->wq_worker_decr_wait;
+ $self->wq_worker_decr_wait(undef, $cb, @args);
}
$self->wq_worker_incr while scalar(keys(%$cur)) < $nr;
}
}
sub wq_close {
- my ($self, $nohang) = @_;
+ my ($self, $nohang, $cb, @args) = @_;
delete @$self{qw(-wq_s1 -wq_s2)} or return;
my $ppid = delete $self->{-wq_ppid} or return;
my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
if ($nohang) {
push @{$self->{"-wq_old_pids.$$"}}, @pids;
} else {
- dwaitpid($_, \&ipc_worker_reap, $self) for @pids;
+ $cb //= \&ipc_worker_reap;
+ unshift @args, $self;
+ dwaitpid($_, $cb, \@args) for @pids;
}
}
my ($lei, $code, $io) = @_;
for my $f (@WQ_KEYS) {
my $wq = delete $lei->{$f} or next;
- $wq->wq_wait_old($lei) if $wq->wq_kill_old; # lei-daemon
+ $wq->wq_wait_old(undef, $lei) if $wq->wq_kill_old; # lei-daemon
}
close($io) if $io; # needed to avoid warnings on SIGPIPE
$lei->x_it($code // (1 >> 8));
for my $f (@WQ_KEYS) {
my $wq = delete $self->{$f} or next;
if ($wq->wq_kill) {
- $wq->wq_close
+ $wq->wq_close(0, undef, $self);
} elsif ($wq->wq_kill_old) {
- $wq->wq_wait_old($self);
+ $wq->wq_wait_old(undef, $self);
}
}
close(delete $self->{1}) if $self->{1}; # may reap_compress
$sto->ipc_do('set_eml', $eml, $set_kw ? $sto->mbox_keywords($eml) : ());
}
+sub import_done_wait { # dwaitpid callback
+ my ($arg, $pid) = @_;
+ my ($imp, $lei) = @$arg;
+ $lei->child_error($?, 'non-fatal errors during import') if $?;
+ my $ign = $lei->{sto}->ipc_do('done'); # PublicInbox::LeiStore::done
+ $lei->dclose;
+}
+
sub import_done { # EOF callback for main daemon
my ($lei) = @_;
- my $imp = delete $lei->{imp};
- $imp->wq_wait_old($lei) if $imp;
- my $wait = $lei->{sto}->ipc_do('done');
- $lei->dclose;
+ my $imp = delete $lei->{imp} or return;
+ $imp->wq_wait_old(\&import_done_wait, $lei);
}
sub call { # the main "lei import" method
$git;
}
+sub xsearch_done_wait { # dwaitpid callback
+ my ($arg, $pid) = @_;
+ my ($wq, $lei) = @$arg;
+ $lei->child_error($?, 'non-fatal error from '.ref($wq)) if $?;
+}
+
sub query_done { # EOF callback for main daemon
my ($lei) = @_;
my $l2m = delete $lei->{l2m};
- $l2m->wq_wait_old($lei) if $l2m;
+ $l2m->wq_wait_old(\&xsearch_done_wait, $lei) if $l2m;
if (my $lxs = delete $lei->{lxs}) {
- $lxs->wq_wait_old($lei);
+ $lxs->wq_wait_old(\&xsearch_done_wait, $lei);
}
$lei->{ovv}->ovv_end($lei);
if ($l2m) { # close() calls LeiToMail reap_compress
if (my $err = $@) {
if (my $lxs = delete $lei->{lxs}) {
$lxs->wq_kill;
- $lxs->wq_close;
+ $lxs->wq_close(0, undef, $lei);
}
$lei->fail("$err");
}