v?fork failures seems to be the cause of locks not getting
released in -watch. Ensure lock release doesn't get skipped
in ->done for both v1 and v2 inboxes. We also need to do
everything we can to ensure DB handles, pipes and processes
get released even in the face of failure.
While we're at it, make failures around `git update-server-info'
non-fatal, since smart HTTP seems more popular anyways.
v2 changes:
- spawn: show failing command
- ensure waitpid is synchronous for inotify events
- teardown all fast-import processes on exception,
not just the failing one
- beef up lock_release error handling
- release lock on fast-import spawn failure
sub event_step {
my ($self) = @_;
my $cb = $self->{cb};
sub event_step {
my ($self) = @_;
my $cb = $self->{cb};
+ local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously
eval {
my @events = $self->{inot}->read; # Linux::Inotify2->read
$cb->($_) for @events;
eval {
my @events = $self->{inot}->read; # Linux::Inotify2->read
$cb->($_) for @events;
return ($self->{in}, $self->{out}) if $self->{pid};
return ($self->{in}, $self->{out}) if $self->{pid};
+ my (@ret, $out_r, $out_w);
pipe($out_r, $out_w) or die "pipe failed: $!";
pipe($out_r, $out_w) or die "pipe failed: $!";
- my $git = $self->{git};
-
- local $/ = "\n";
- my $ref = $self->{ref};
- chomp($self->{tip} = $git->qx(qw(rev-parse --revs-only), $ref));
- if ($self->{path_type} ne '2/38' && $self->{tip}) {
- local $/ = "\0";
- my @tree = $git->qx(qw(ls-tree -r -z --name-only), $ref);
- chomp @tree;
- $self->{-tree} = { map { $_ => 1 } @tree };
+ eval {
+ my ($git, $ref) = @$self{qw(git ref)};
+ local $/ = "\n";
+ chomp($self->{tip} = $git->qx(qw(rev-parse --revs-only), $ref));
+ if ($self->{path_type} ne '2/38' && $self->{tip}) {
+ local $/ = "\0";
+ my @t = $git->qx(qw(ls-tree -r -z --name-only), $ref);
+ chomp @t;
+ $self->{-tree} = { map { $_ => 1 } @t };
+ }
+ my @cmd = ('git', "--git-dir=$git->{git_dir}",
+ qw(fast-import --quiet --done --date-format=raw));
+ my ($in_r, $pid) = popen_rd(\@cmd, undef, { 0 => $out_r });
+ $out_w->autoflush(1);
+ $self->{in} = $in_r;
+ $self->{out} = $out_w;
+ $self->{pid} = $pid;
+ $self->{nchg} = 0;
+ @ret = ($in_r, $out_w);
+ };
+ if ($@) {
+ $self->lock_release;
+ die $@;
-
- my $git_dir = $git->{git_dir};
- my @cmd = ('git', "--git-dir=$git_dir", qw(fast-import
- --quiet --done --date-format=raw));
- my ($in_r, $pid) = popen_rd(\@cmd, undef, { 0 => $out_r });
- $out_w->autoflush(1);
- $self->{in} = $in_r;
- $self->{out} = $out_w;
- $self->{pid} = $pid;
- $self->{nchg} = 0;
- ($in_r, $out_w);
}
sub wfail () { die "write to fast-import failed: $!" }
}
sub wfail () { die "write to fast-import failed: $!" }
my $env = { GIT_INDEX_FILE => $index };
run_die([@cmd, qw(read-tree -m -v -i), $self->{ref}], $env);
}
my $env = { GIT_INDEX_FILE => $index };
run_die([@cmd, qw(read-tree -m -v -i), $self->{ref}], $env);
}
- run_die([@cmd, 'update-server-info']);
+ eval { run_die([@cmd, 'update-server-info']) };
- ($ibx && $self->{path_type} eq '2/38') and eval {
- require PublicInbox::SearchIdx;
- my $s = PublicInbox::SearchIdx->new($ibx);
- $s->index_sync({ ref => $self->{ref} });
- };
+ if ($ibx && $ibx->version == 1 && -d "$ibx->{inboxdir}/public-inbox" &&
+ eval { require PublicInbox::SearchIdx }) {
+ eval {
+ my $s = PublicInbox::SearchIdx->new($ibx);
+ $s->index_sync({ ref => $self->{ref} });
+ };
+ warn "$ibx->{inboxdir} index failed: $@\n" if $@;
+ }
eval { run_die([@cmd, qw(gc --auto)]) } if $do_gc;
}
eval { run_die([@cmd, qw(gc --auto)]) } if $do_gc;
}
sub done {
my ($self) = @_;
my $w = delete $self->{out} or return;
sub done {
my ($self) = @_;
my $w = delete $self->{out} or return;
- my $r = delete $self->{in} or die 'BUG: missing {in} when done';
- print $w "done\n" or wfail;
- my $pid = delete $self->{pid} or die 'BUG: missing {pid} when done';
- waitpid($pid, 0) == $pid or die 'fast-import did not finish';
- $? == 0 or die "fast-import failed: $?";
-
+ eval {
+ my $r = delete $self->{in} or die 'BUG: missing {in} when done';
+ print $w "done\n" or wfail;
+ my $pid = delete $self->{pid} or
+ die 'BUG: missing {pid} when done';
+ waitpid($pid, 0) == $pid or die 'fast-import did not finish';
+ $? == 0 or die "fast-import failed: $?";
+ };
+ my $wait_err = $@;
my $nchg = delete $self->{nchg};
my $nchg = delete $self->{nchg};
- _update_git_info($self, 1) if $nchg;
+ if ($nchg && !$wait_err) {
+ eval { _update_git_info($self, 1) };
+ warn "E: $self->{git}->{git_dir} update info: $@\n" if $@;
+ }
$self->lock_release(!!$nchg);
$self->lock_release(!!$nchg);
+ die $wait_err if $wait_err;
croak 'already locked '.($lock_path // '(undef)') if $self->{lockfh};
return unless defined($lock_path);
sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or
croak 'already locked '.($lock_path // '(undef)') if $self->{lockfh};
return unless defined($lock_path);
sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or
- die "failed to open lock $lock_path: $!\n";
- flock($lockfh, LOCK_EX) or die "lock failed: $!\n";
+ croak "failed to open $lock_path: $!\n";
+ flock($lockfh, LOCK_EX) or croak "lock $lock_path failed: $!\n";
$self->{lockfh} = $lockfh;
}
sub lock_release {
my ($self, $wake) = @_;
$self->{lockfh} = $lockfh;
}
sub lock_release {
my ($self, $wake) = @_;
- return unless $self->{lock_path};
- my $lockfh = delete $self->{lockfh} or croak 'not locked';
+ defined(my $lock_path = $self->{lock_path}) or return;
+ my $lockfh = delete $self->{lockfh} or croak "not locked: $lock_path";
syswrite($lockfh, '.') if $wake;
syswrite($lockfh, '.') if $wake;
- flock($lockfh, LOCK_UN) or die "unlock failed: $!\n";
- close $lockfh or die "close failed: $!\n";
+ flock($lockfh, LOCK_UN) or croak "unlock $lock_path failed: $!\n";
+ close $lockfh or croak "close $lock_path failed: $!\n";
}
my $cd = $opts->{'-C'} // ''; # undef => NULL mapping doesn't work?
my $pid = pi_fork_exec($redir, $f, $cmd, \@env, $rlim, $cd);
}
my $cd = $opts->{'-C'} // ''; # undef => NULL mapping doesn't work?
my $pid = pi_fork_exec($redir, $f, $cmd, \@env, $rlim, $cd);
- die "fork_exec failed: $!\n" unless $pid > 0;
+ die "fork_exec @$cmd failed: $!\n" unless $pid > 0;
# public
sub done {
my ($self) = @_;
# public
sub done {
my ($self) = @_;
- my $im = delete $self->{im};
- $im->done if $im; # PublicInbox::Import::done
- checkpoint($self);
- my $mm = delete $self->{mm};
- $mm->{dbh}->commit if $mm;
+ my $err = '';
+ if (my $im = delete $self->{im}) {
+ eval { $im->done }; # PublicInbox::Import::done
+ $err .= "import done: $@\n" if $@;
+ }
+ if (!$err) {
+ eval { checkpoint($self) };
+ $err .= "checkpoint: $@\n" if $@;
+ }
+ if (my $mm = delete $self->{mm}) {
+ my $m = $err ? 'rollback' : 'commit';
+ eval { $mm->{dbh}->$m };
+ $err .= "msgmap $m: $@\n" if $@;
+ }
my $shards = delete $self->{idx_shards};
if ($shards) {
my $shards = delete $self->{idx_shards};
if ($shards) {
- $_->remote_close for @$shards;
+ for (@$shards) {
+ eval { $_->remote_close };
+ $err .= "shard close: $@\n" if $@;
+ }
- $self->{over}->disconnect;
+ eval { $self->{over}->disconnect };
+ $err .= "over disconnect: $@\n" if $@;
delete $self->{bnote};
my $nbytes = $self->{total_bytes};
$self->{total_bytes} = 0;
$self->lock_release(!!$nbytes) if $shards;
$self->{ibx}->git->cleanup;
delete $self->{bnote};
my $nbytes = $self->{total_bytes};
$self->{total_bytes} = 0;
$self->lock_release(!!$nbytes) if $shards;
$self->{ibx}->git->cleanup;
}
sub fill_alternates ($$) {
}
sub fill_alternates ($$) {
sub _done_for_now {
my ($self) = @_;
local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously
sub _done_for_now {
my ($self) = @_;
local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously
- for (values %{$self->{importers}}) {
- $_->done if $_; # $_ may be undef during cleanup
+ for my $im (values %{$self->{importers}}) {
+ next if !$im; # $im may be undef during cleanup
+ eval { $im->done };
+ warn "$im->{ibx}->{name} ->done: $@\n" if $@;
$im->remove($eml, 'spam');
if (my $scrub = $ibx->filter($im)) {
my $scrubbed = $scrub->scrub($eml, 1);
$im->remove($eml, 'spam');
if (my $scrub = $ibx->filter($im)) {
my $scrubbed = $scrub->scrub($eml, 1);
- $scrubbed or return;
- $scrubbed == REJECT() and return;
- $im->remove($scrubbed, 'spam');
+ if ($scrubbed && $scrubbed != REJECT) {
+ $im->remove($scrubbed, 'spam');
+ }
- warn "error removing spam at: $loc from $ibx->{name}: $@\n" if $@;
+ if ($@) {
+ warn "error removing spam at: $loc from $ibx->{name}: $@\n";
+ _done_for_now($self);
+ }
sub import_eml ($$$) {
my ($self, $ibx, $eml) = @_;
sub import_eml ($$$) {
my ($self, $ibx, $eml) = @_;
- my $im = _importer_for($self, $ibx);
# any header match means it's eligible for the inbox:
if (my $watch_hdrs = $ibx->{-watchheaders}) {
# any header match means it's eligible for the inbox:
if (my $watch_hdrs = $ibx->{-watchheaders}) {
-
- if (my $scrub = $ibx->filter($im)) {
- my $ret = $scrub->scrub($eml) or return;
- $ret == REJECT() and return;
- $eml = $ret;
+ eval {
+ my $im = _importer_for($self, $ibx);
+ if (my $scrub = $ibx->filter($im)) {
+ my $scrubbed = $scrub->scrub($eml) or return;
+ $scrubbed == REJECT and return;
+ $eml = $scrubbed;
+ }
+ $im->add($eml, $self->{spamcheck});
+ };
+ if ($@) {
+ warn "$ibx->{name} add failed: $@\n";
+ _done_for_now($self);
- $im->add($eml, $self->{spamcheck});
require_mods(@mods);
use_ok($_) for (qw(HTTP::Request::Common Plack::Test));
use_ok 'PublicInbox::WWW';
require_mods(@mods);
use_ok($_) for (qw(HTTP::Request::Common Plack::Test));
use_ok 'PublicInbox::WWW';
+use_ok 'PublicInbox::SearchIdx';
my ($tmpdir, $for_destroy) = tmpdir();
my $ibx = PublicInbox::Inbox->new({
my ($tmpdir, $for_destroy) = tmpdir();
my $ibx = PublicInbox::Inbox->new({