]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei: more consistent IPC exit and error handling
authorEric Wong <e@80x24.org>
Sun, 7 Feb 2021 08:51:54 +0000 (08:51 +0000)
committerEric Wong <e@80x24.org>
Sun, 7 Feb 2021 22:57:11 +0000 (22:57 +0000)
We're able to propagate $? from wq_workers in a consistent
manner, now.

lib/PublicInbox/IPC.pm
lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiImport.pm
lib/PublicInbox/LeiXSearch.pm

index 728f726c4adb99a47891536ebe3d959c4d75f7fc..c8673e26832fd960601903aa532e8b5ac290cc46 100644 (file)
@@ -140,10 +140,9 @@ sub ipc_worker_reap { # dwaitpid callback
 }
 
 sub wq_wait_old {
-       my ($self, @args) = @_;
-       my $cb = ref($args[0]) eq 'CODE' ? shift(@args) : \&ipc_worker_reap;
+       my ($self, $cb, @args) = @_;
        my $pids = delete $self->{"-wq_old_pids.$$"} or return;
-       dwaitpid($_, $cb, [$self, @args]) for @$pids;
+       dwaitpid($_, $cb // \&ipc_worker_reap, [$self, @args]) for @$pids;
 }
 
 # for base class, override in sub classes
@@ -348,13 +347,12 @@ sub wq_exit { # wakes up wq_worker_decr_wait
 sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
        my ($self) = @_;
        return unless wq_workers($self);
-       my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2';
-       $self->wq_io_do('wq_exit', [ $s2, $s2, $s2 ]);
+       $self->wq_io_do('wq_exit');
        # caller must call wq_worker_decr_wait in main loop
 }
 
 sub wq_worker_decr_wait {
-       my ($self, $timeout) = @_;
+       my ($self, $timeout, $cb, @args) = @_;
        return if $self->{-wq_ppid} != $$; # can't reap siblings or parents
        my $s1 = $self->{-wq_s1} // croak 'BUG: no wq_s1';
        vec(my $rin = '', fileno($s1), 1) = 1;
@@ -363,17 +361,17 @@ sub wq_worker_decr_wait {
        recv($s1, my $pid, 64, 0) // croak "recv: $!";
        my $workers = $self->{-wq_workers} // croak 'BUG: no wq_workers';
        delete $workers->{$pid} // croak "BUG: PID:$pid invalid";
-       dwaitpid($pid, \&ipc_worker_reap, $self);
+       dwaitpid($pid, $cb // \&ipc_worker_reap, [ $self, @args ]);
 }
 
 # set or retrieve number of workers
 sub wq_workers {
-       my ($self, $nr) = @_;
+       my ($self, $nr, $cb, @args) = @_;
        my $cur = $self->{-wq_workers} or return;
        if (defined $nr) {
                while (scalar(keys(%$cur)) > $nr) {
                        $self->wq_worker_decr;
-                       $self->wq_worker_decr_wait;
+                       $self->wq_worker_decr_wait(undef, $cb, @args);
                }
                $self->wq_worker_incr while scalar(keys(%$cur)) < $nr;
        }
@@ -381,7 +379,7 @@ sub wq_workers {
 }
 
 sub wq_close {
-       my ($self, $nohang) = @_;
+       my ($self, $nohang, $cb, @args) = @_;
        delete @$self{qw(-wq_s1 -wq_s2)} or return;
        my $ppid = delete $self->{-wq_ppid} or return;
        my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
@@ -390,7 +388,9 @@ sub wq_close {
        if ($nohang) {
                push @{$self->{"-wq_old_pids.$$"}}, @pids;
        } else {
-               dwaitpid($_, \&ipc_worker_reap, $self) for @pids;
+               $cb //= \&ipc_worker_reap;
+               unshift @args, $self;
+               dwaitpid($_, $cb, \@args) for @pids;
        }
 }
 
index 515bc2a3e121cf31f0d9bb8ea2774672484d5a74..21862488fabb484fad491fbc59447c2bd8deca59 100644 (file)
@@ -360,7 +360,7 @@ sub fail_handler ($;$$) {
        my ($lei, $code, $io) = @_;
        for my $f (@WQ_KEYS) {
                my $wq = delete $lei->{$f} or next;
-               $wq->wq_wait_old($lei) if $wq->wq_kill_old; # lei-daemon
+               $wq->wq_wait_old(undef, $lei) if $wq->wq_kill_old; # lei-daemon
        }
        close($io) if $io; # needed to avoid warnings on SIGPIPE
        $lei->x_it($code // (1 >> 8));
@@ -827,9 +827,9 @@ sub dclose {
        for my $f (@WQ_KEYS) {
                my $wq = delete $self->{$f} or next;
                if ($wq->wq_kill) {
-                       $wq->wq_close
+                       $wq->wq_close(0, undef, $self);
                } elsif ($wq->wq_kill_old) {
-                       $wq->wq_wait_old($self);
+                       $wq->wq_wait_old(undef, $self);
                }
        }
        close(delete $self->{1}) if $self->{1}; # may reap_compress
index 3a99570e3767c9b0ec9261f81d66fb2c16b0227a..2b2dc2f7ecfe5134c5abe605dae08285cda32b46 100644 (file)
@@ -14,12 +14,18 @@ sub _import_eml { # MboxReader callback
        $sto->ipc_do('set_eml', $eml, $set_kw ? $sto->mbox_keywords($eml) : ());
 }
 
+sub import_done_wait { # dwaitpid callback
+       my ($arg, $pid) = @_;
+       my ($imp, $lei) = @$arg;
+       $lei->child_error($?, 'non-fatal errors during import') if $?;
+       my $ign = $lei->{sto}->ipc_do('done'); # PublicInbox::LeiStore::done
+       $lei->dclose;
+}
+
 sub import_done { # EOF callback for main daemon
        my ($lei) = @_;
-       my $imp = delete $lei->{imp};
-       $imp->wq_wait_old($lei) if $imp;
-       my $wait = $lei->{sto}->ipc_do('done');
-       $lei->dclose;
+       my $imp = delete $lei->{imp} or return;
+       $imp->wq_wait_old(\&import_done_wait, $lei);
 }
 
 sub call { # the main "lei import" method
index 1ba767c1df4fc2b2cded116949e1d94a85f9afb8..1024b0209da05a124b7cdd4a01b34d04fcc8b272 100644 (file)
@@ -279,12 +279,18 @@ sub git_tmp ($) {
        $git;
 }
 
+sub xsearch_done_wait { # dwaitpid callback
+       my ($arg, $pid) = @_;
+       my ($wq, $lei) = @$arg;
+       $lei->child_error($?, 'non-fatal error from '.ref($wq)) if $?;
+}
+
 sub query_done { # EOF callback for main daemon
        my ($lei) = @_;
        my $l2m = delete $lei->{l2m};
-       $l2m->wq_wait_old($lei) if $l2m;
+       $l2m->wq_wait_old(\&xsearch_done_wait, $lei) if $l2m;
        if (my $lxs = delete $lei->{lxs}) {
-               $lxs->wq_wait_old($lei);
+               $lxs->wq_wait_old(\&xsearch_done_wait, $lei);
        }
        $lei->{ovv}->ovv_end($lei);
        if ($l2m) { # close() calls LeiToMail reap_compress
@@ -309,7 +315,7 @@ sub do_post_augment {
        if (my $err = $@) {
                if (my $lxs = delete $lei->{lxs}) {
                        $lxs->wq_kill;
-                       $lxs->wq_close;
+                       $lxs->wq_close(0, undef, $lei);
                }
                $lei->fail("$err");
        }