]> Sergey Matveev's repositories - public-inbox.git/commitdiff
improve error handling on import fork / lock failures
authorEric Wong <e@yhbt.net>
Fri, 31 Jul 2020 21:36:18 +0000 (21:36 +0000)
committerEric Wong <e@yhbt.net>
Sat, 1 Aug 2020 08:07:39 +0000 (08:07 +0000)
v?fork failures seems to be the cause of locks not getting
released in -watch.  Ensure lock release doesn't get skipped
in ->done for both v1 and v2 inboxes.  We also need to do
everything we can to ensure DB handles, pipes and processes
get released even in the face of failure.

While we're at it, make failures around `git update-server-info'
non-fatal, since smart HTTP seems more popular anyways.

v2 changes:
- spawn: show failing command
- ensure waitpid is synchronous for inotify events
- teardown all fast-import processes on exception,
  not just the failing one
- beef up lock_release error handling
- release lock on fast-import spawn failure

lib/PublicInbox/DirIdle.pm
lib/PublicInbox/Import.pm
lib/PublicInbox/Lock.pm
lib/PublicInbox/Spawn.pm
lib/PublicInbox/V2Writable.pm
lib/PublicInbox/WatchMaildir.pm
t/psgi_search.t

index 89cce305f872816d4048558859d5954b2b4c00cc..daa2212b4c0f5050eaf38f61b93c2f202ce1af0f 100644 (file)
@@ -44,6 +44,7 @@ sub new {
 sub event_step {
        my ($self) = @_;
        my $cb = $self->{cb};
+       local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously
        eval {
                my @events = $self->{inot}->read; # Linux::Inotify2->read
                $cb->($_) for @events;
index b50c662c70b22ce73f000910e4b297482b1d0cfb..07a4951871375674b442c1a0c5bd7710013e608f 100644 (file)
@@ -48,32 +48,35 @@ sub gfi_start {
 
        return ($self->{in}, $self->{out}) if $self->{pid};
 
-       my ($out_r, $out_w);
+       my (@ret, $out_r, $out_w);
        pipe($out_r, $out_w) or die "pipe failed: $!";
-       my $git = $self->{git};
 
        $self->lock_acquire;
-
-       local $/ = "\n";
-       my $ref = $self->{ref};
-       chomp($self->{tip} = $git->qx(qw(rev-parse --revs-only), $ref));
-       if ($self->{path_type} ne '2/38' && $self->{tip}) {
-               local $/ = "\0";
-               my @tree = $git->qx(qw(ls-tree -r -z --name-only), $ref);
-               chomp @tree;
-               $self->{-tree} = { map { $_ => 1 } @tree };
+       eval {
+               my ($git, $ref) = @$self{qw(git ref)};
+               local $/ = "\n";
+               chomp($self->{tip} = $git->qx(qw(rev-parse --revs-only), $ref));
+               if ($self->{path_type} ne '2/38' && $self->{tip}) {
+                       local $/ = "\0";
+                       my @t = $git->qx(qw(ls-tree -r -z --name-only), $ref);
+                       chomp @t;
+                       $self->{-tree} = { map { $_ => 1 } @t };
+               }
+               my @cmd = ('git', "--git-dir=$git->{git_dir}",
+                       qw(fast-import --quiet --done --date-format=raw));
+               my ($in_r, $pid) = popen_rd(\@cmd, undef, { 0 => $out_r });
+               $out_w->autoflush(1);
+               $self->{in} = $in_r;
+               $self->{out} = $out_w;
+               $self->{pid} = $pid;
+               $self->{nchg} = 0;
+               @ret = ($in_r, $out_w);
+       };
+       if ($@) {
+               $self->lock_release;
+               die $@;
        }
-
-       my $git_dir = $git->{git_dir};
-       my @cmd = ('git', "--git-dir=$git_dir", qw(fast-import
-                       --quiet --done --date-format=raw));
-       my ($in_r, $pid) = popen_rd(\@cmd, undef, { 0 => $out_r });
-       $out_w->autoflush(1);
-       $self->{in} = $in_r;
-       $self->{out} = $out_w;
-       $self->{pid} = $pid;
-       $self->{nchg} = 0;
-       ($in_r, $out_w);
+       @ret;
 }
 
 sub wfail () { die "write to fast-import failed: $!" }
@@ -175,13 +178,16 @@ sub _update_git_info ($$) {
                my $env = { GIT_INDEX_FILE => $index };
                run_die([@cmd, qw(read-tree -m -v -i), $self->{ref}], $env);
        }
-       run_die([@cmd, 'update-server-info']);
+       eval { run_die([@cmd, 'update-server-info']) };
        my $ibx = $self->{ibx};
-       ($ibx && $self->{path_type} eq '2/38') and eval {
-               require PublicInbox::SearchIdx;
-               my $s = PublicInbox::SearchIdx->new($ibx);
-               $s->index_sync({ ref => $self->{ref} });
-       };
+       if ($ibx && $ibx->version == 1 && -d "$ibx->{inboxdir}/public-inbox" &&
+                               eval { require PublicInbox::SearchIdx }) {
+               eval {
+                       my $s = PublicInbox::SearchIdx->new($ibx);
+                       $s->index_sync({ ref => $self->{ref} });
+               };
+               warn "$ibx->{inboxdir} index failed: $@\n" if $@;
+       }
        eval { run_die([@cmd, qw(gc --auto)]) } if $do_gc;
 }
 
@@ -460,17 +466,23 @@ sub init_bare {
 sub done {
        my ($self) = @_;
        my $w = delete $self->{out} or return;
-       my $r = delete $self->{in} or die 'BUG: missing {in} when done';
-       print $w "done\n" or wfail;
-       my $pid = delete $self->{pid} or die 'BUG: missing {pid} when done';
-       waitpid($pid, 0) == $pid or die 'fast-import did not finish';
-       $? == 0 or die "fast-import failed: $?";
-
+       eval {
+               my $r = delete $self->{in} or die 'BUG: missing {in} when done';
+               print $w "done\n" or wfail;
+               my $pid = delete $self->{pid} or
+                               die 'BUG: missing {pid} when done';
+               waitpid($pid, 0) == $pid or die 'fast-import did not finish';
+               $? == 0 or die "fast-import failed: $?";
+       };
+       my $wait_err = $@;
        my $nchg = delete $self->{nchg};
-       _update_git_info($self, 1) if $nchg;
+       if ($nchg && !$wait_err) {
+               eval { _update_git_info($self, 1) };
+               warn "E: $self->{git}->{git_dir} update info: $@\n" if $@;
+       }
        $self->lock_release(!!$nchg);
-
        $self->{git}->cleanup;
+       die $wait_err if $wait_err;
 }
 
 sub atfork_child {
index ca43682f87adae1fc38b1bab60478f08d69ce064..b2c8227f0a66c8940d9be8e60afb8571015f0fc0 100644 (file)
@@ -16,20 +16,20 @@ sub lock_acquire {
        croak 'already locked '.($lock_path // '(undef)') if $self->{lockfh};
        return unless defined($lock_path);
        sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or
-               die "failed to open lock $lock_path: $!\n";
-       flock($lockfh, LOCK_EX) or die "lock failed: $!\n";
+               croak "failed to open $lock_path: $!\n";
+       flock($lockfh, LOCK_EX) or croak "lock $lock_path failed: $!\n";
        $self->{lockfh} = $lockfh;
 }
 
 sub lock_release {
        my ($self, $wake) = @_;
-       return unless $self->{lock_path};
-       my $lockfh = delete $self->{lockfh} or croak 'not locked';
+       defined(my $lock_path = $self->{lock_path}) or return;
+       my $lockfh = delete $self->{lockfh} or croak "not locked: $lock_path";
 
        syswrite($lockfh, '.') if $wake;
 
-       flock($lockfh, LOCK_UN) or die "unlock failed: $!\n";
-       close $lockfh or die "close failed: $!\n";
+       flock($lockfh, LOCK_UN) or croak "unlock $lock_path failed: $!\n";
+       close $lockfh or croak "close $lock_path failed: $!\n";
 }
 
 1;
index 50f3185156a3d9160edc002508ee0e874997a3d0..508d43fd762b2d7c59c2480097465efca1424fef 100644 (file)
@@ -275,7 +275,7 @@ sub spawn ($;$$) {
        }
        my $cd = $opts->{'-C'} // ''; # undef => NULL mapping doesn't work?
        my $pid = pi_fork_exec($redir, $f, $cmd, \@env, $rlim, $cd);
-       die "fork_exec failed: $!\n" unless $pid > 0;
+       die "fork_exec @$cmd failed: $!\n" unless $pid > 0;
        $pid;
 }
 
index e071bc1e02109d6181c44598747a192c9fea7f3f..e1c9a393a36f0fa12b3a04e804984f6981c03223 100644 (file)
@@ -660,21 +660,35 @@ sub barrier { checkpoint($_[0], 1) };
 # public
 sub done {
        my ($self) = @_;
-       my $im = delete $self->{im};
-       $im->done if $im; # PublicInbox::Import::done
-       checkpoint($self);
-       my $mm = delete $self->{mm};
-       $mm->{dbh}->commit if $mm;
+       my $err = '';
+       if (my $im = delete $self->{im}) {
+               eval { $im->done }; # PublicInbox::Import::done
+               $err .= "import done: $@\n" if $@;
+       }
+       if (!$err) {
+               eval { checkpoint($self) };
+               $err .= "checkpoint: $@\n" if $@;
+       }
+       if (my $mm = delete $self->{mm}) {
+               my $m = $err ? 'rollback' : 'commit';
+               eval { $mm->{dbh}->$m };
+               $err .= "msgmap $m: $@\n" if $@;
+       }
        my $shards = delete $self->{idx_shards};
        if ($shards) {
-               $_->remote_close for @$shards;
+               for (@$shards) {
+                       eval { $_->remote_close };
+                       $err .= "shard close: $@\n" if $@;
+               }
        }
-       $self->{over}->disconnect;
+       eval { $self->{over}->disconnect };
+       $err .= "over disconnect: $@\n" if $@;
        delete $self->{bnote};
        my $nbytes = $self->{total_bytes};
        $self->{total_bytes} = 0;
        $self->lock_release(!!$nbytes) if $shards;
        $self->{ibx}->git->cleanup;
+       die $err if $err;
 }
 
 sub fill_alternates ($$) {
index 7547f6e4761f20008486d2642fd458a186917885..fad708d8f9fb6be344b3d1001b38aeb328752396 100644 (file)
@@ -124,8 +124,10 @@ sub new {
 sub _done_for_now {
        my ($self) = @_;
        local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously
-       for (values %{$self->{importers}}) {
-               $_->done if $_; # $_ may be undef during cleanup
+       for my $im (values %{$self->{importers}}) {
+               next if !$im; # $im may be undef during cleanup
+               eval { $im->done };
+               warn "$im->{ibx}->{name} ->done: $@\n" if $@;
        }
 }
 
@@ -137,12 +139,15 @@ sub remove_eml_i { # each_inbox callback
                $im->remove($eml, 'spam');
                if (my $scrub = $ibx->filter($im)) {
                        my $scrubbed = $scrub->scrub($eml, 1);
-                       $scrubbed or return;
-                       $scrubbed == REJECT() and return;
-                       $im->remove($scrubbed, 'spam');
+                       if ($scrubbed && $scrubbed != REJECT) {
+                               $im->remove($scrubbed, 'spam');
+                       }
                }
        };
-       warn "error removing spam at: $loc from $ibx->{name}: $@\n" if $@;
+       if ($@) {
+               warn "error removing spam at: $loc from $ibx->{name}: $@\n";
+               _done_for_now($self);
+       }
 }
 
 sub _remove_spam {
@@ -155,7 +160,6 @@ sub _remove_spam {
 
 sub import_eml ($$$) {
        my ($self, $ibx, $eml) = @_;
-       my $im = _importer_for($self, $ibx);
 
        # any header match means it's eligible for the inbox:
        if (my $watch_hdrs = $ibx->{-watchheaders}) {
@@ -167,13 +171,19 @@ sub import_eml ($$$) {
                }
                return unless $ok;
        }
-
-       if (my $scrub = $ibx->filter($im)) {
-               my $ret = $scrub->scrub($eml) or return;
-               $ret == REJECT() and return;
-               $eml = $ret;
+       eval {
+               my $im = _importer_for($self, $ibx);
+               if (my $scrub = $ibx->filter($im)) {
+                       my $scrubbed = $scrub->scrub($eml) or return;
+                       $scrubbed == REJECT and return;
+                       $eml = $scrubbed;
+               }
+               $im->add($eml, $self->{spamcheck});
+       };
+       if ($@) {
+               warn "$ibx->{name} add failed: $@\n";
+               _done_for_now($self);
        }
-       $im->add($eml, $self->{spamcheck});
 }
 
 sub _try_path {
index 64f8b1ac5c8378f6ce88a783f8bdcc86369efbd6..2d12ba6a2c6a886e7db9019024a2dd13a47c26d5 100644 (file)
@@ -14,6 +14,7 @@ my @mods = qw(DBD::SQLite Search::Xapian HTTP::Request::Common Plack::Test
 require_mods(@mods);
 use_ok($_) for (qw(HTTP::Request::Common Plack::Test));
 use_ok 'PublicInbox::WWW';
+use_ok 'PublicInbox::SearchIdx';
 my ($tmpdir, $for_destroy) = tmpdir();
 
 my $ibx = PublicInbox::Inbox->new({