sub event_step {
my ($self) = @_;
my $cb = $self->{cb};
+ local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously
eval {
my @events = $self->{inot}->read; # Linux::Inotify2->read
$cb->($_) for @events;
return ($self->{in}, $self->{out}) if $self->{pid};
- my ($out_r, $out_w);
+ my (@ret, $out_r, $out_w);
pipe($out_r, $out_w) or die "pipe failed: $!";
- my $git = $self->{git};
$self->lock_acquire;
-
- local $/ = "\n";
- my $ref = $self->{ref};
- chomp($self->{tip} = $git->qx(qw(rev-parse --revs-only), $ref));
- if ($self->{path_type} ne '2/38' && $self->{tip}) {
- local $/ = "\0";
- my @tree = $git->qx(qw(ls-tree -r -z --name-only), $ref);
- chomp @tree;
- $self->{-tree} = { map { $_ => 1 } @tree };
+ eval {
+ my ($git, $ref) = @$self{qw(git ref)};
+ local $/ = "\n";
+ chomp($self->{tip} = $git->qx(qw(rev-parse --revs-only), $ref));
+ if ($self->{path_type} ne '2/38' && $self->{tip}) {
+ local $/ = "\0";
+ my @t = $git->qx(qw(ls-tree -r -z --name-only), $ref);
+ chomp @t;
+ $self->{-tree} = { map { $_ => 1 } @t };
+ }
+ my @cmd = ('git', "--git-dir=$git->{git_dir}",
+ qw(fast-import --quiet --done --date-format=raw));
+ my ($in_r, $pid) = popen_rd(\@cmd, undef, { 0 => $out_r });
+ $out_w->autoflush(1);
+ $self->{in} = $in_r;
+ $self->{out} = $out_w;
+ $self->{pid} = $pid;
+ $self->{nchg} = 0;
+ @ret = ($in_r, $out_w);
+ };
+ if ($@) {
+ $self->lock_release;
+ die $@;
}
-
- my $git_dir = $git->{git_dir};
- my @cmd = ('git', "--git-dir=$git_dir", qw(fast-import
- --quiet --done --date-format=raw));
- my ($in_r, $pid) = popen_rd(\@cmd, undef, { 0 => $out_r });
- $out_w->autoflush(1);
- $self->{in} = $in_r;
- $self->{out} = $out_w;
- $self->{pid} = $pid;
- $self->{nchg} = 0;
- ($in_r, $out_w);
+ @ret;
}
sub wfail () { die "write to fast-import failed: $!" }
my $env = { GIT_INDEX_FILE => $index };
run_die([@cmd, qw(read-tree -m -v -i), $self->{ref}], $env);
}
- run_die([@cmd, 'update-server-info']);
+ eval { run_die([@cmd, 'update-server-info']) };
my $ibx = $self->{ibx};
- ($ibx && $self->{path_type} eq '2/38') and eval {
- require PublicInbox::SearchIdx;
- my $s = PublicInbox::SearchIdx->new($ibx);
- $s->index_sync({ ref => $self->{ref} });
- };
+ if ($ibx && $ibx->version == 1 && -d "$ibx->{inboxdir}/public-inbox" &&
+ eval { require PublicInbox::SearchIdx }) {
+ eval {
+ my $s = PublicInbox::SearchIdx->new($ibx);
+ $s->index_sync({ ref => $self->{ref} });
+ };
+ warn "$ibx->{inboxdir} index failed: $@\n" if $@;
+ }
eval { run_die([@cmd, qw(gc --auto)]) } if $do_gc;
}
sub done {
my ($self) = @_;
my $w = delete $self->{out} or return;
- my $r = delete $self->{in} or die 'BUG: missing {in} when done';
- print $w "done\n" or wfail;
- my $pid = delete $self->{pid} or die 'BUG: missing {pid} when done';
- waitpid($pid, 0) == $pid or die 'fast-import did not finish';
- $? == 0 or die "fast-import failed: $?";
-
+ eval {
+ my $r = delete $self->{in} or die 'BUG: missing {in} when done';
+ print $w "done\n" or wfail;
+ my $pid = delete $self->{pid} or
+ die 'BUG: missing {pid} when done';
+ waitpid($pid, 0) == $pid or die 'fast-import did not finish';
+ $? == 0 or die "fast-import failed: $?";
+ };
+ my $wait_err = $@;
my $nchg = delete $self->{nchg};
- _update_git_info($self, 1) if $nchg;
+ if ($nchg && !$wait_err) {
+ eval { _update_git_info($self, 1) };
+ warn "E: $self->{git}->{git_dir} update info: $@\n" if $@;
+ }
$self->lock_release(!!$nchg);
-
$self->{git}->cleanup;
+ die $wait_err if $wait_err;
}
sub atfork_child {
croak 'already locked '.($lock_path // '(undef)') if $self->{lockfh};
return unless defined($lock_path);
sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or
- die "failed to open lock $lock_path: $!\n";
- flock($lockfh, LOCK_EX) or die "lock failed: $!\n";
+ croak "failed to open $lock_path: $!\n";
+ flock($lockfh, LOCK_EX) or croak "lock $lock_path failed: $!\n";
$self->{lockfh} = $lockfh;
}
sub lock_release {
my ($self, $wake) = @_;
- return unless $self->{lock_path};
- my $lockfh = delete $self->{lockfh} or croak 'not locked';
+ defined(my $lock_path = $self->{lock_path}) or return;
+ my $lockfh = delete $self->{lockfh} or croak "not locked: $lock_path";
syswrite($lockfh, '.') if $wake;
- flock($lockfh, LOCK_UN) or die "unlock failed: $!\n";
- close $lockfh or die "close failed: $!\n";
+ flock($lockfh, LOCK_UN) or croak "unlock $lock_path failed: $!\n";
+ close $lockfh or croak "close $lock_path failed: $!\n";
}
1;
}
my $cd = $opts->{'-C'} // ''; # undef => NULL mapping doesn't work?
my $pid = pi_fork_exec($redir, $f, $cmd, \@env, $rlim, $cd);
- die "fork_exec failed: $!\n" unless $pid > 0;
+ die "fork_exec @$cmd failed: $!\n" unless $pid > 0;
$pid;
}
# public
sub done {
my ($self) = @_;
- my $im = delete $self->{im};
- $im->done if $im; # PublicInbox::Import::done
- checkpoint($self);
- my $mm = delete $self->{mm};
- $mm->{dbh}->commit if $mm;
+ my $err = '';
+ if (my $im = delete $self->{im}) {
+ eval { $im->done }; # PublicInbox::Import::done
+ $err .= "import done: $@\n" if $@;
+ }
+ if (!$err) {
+ eval { checkpoint($self) };
+ $err .= "checkpoint: $@\n" if $@;
+ }
+ if (my $mm = delete $self->{mm}) {
+ my $m = $err ? 'rollback' : 'commit';
+ eval { $mm->{dbh}->$m };
+ $err .= "msgmap $m: $@\n" if $@;
+ }
my $shards = delete $self->{idx_shards};
if ($shards) {
- $_->remote_close for @$shards;
+ for (@$shards) {
+ eval { $_->remote_close };
+ $err .= "shard close: $@\n" if $@;
+ }
}
- $self->{over}->disconnect;
+ eval { $self->{over}->disconnect };
+ $err .= "over disconnect: $@\n" if $@;
delete $self->{bnote};
my $nbytes = $self->{total_bytes};
$self->{total_bytes} = 0;
$self->lock_release(!!$nbytes) if $shards;
$self->{ibx}->git->cleanup;
+ die $err if $err;
}
sub fill_alternates ($$) {
sub _done_for_now {
my ($self) = @_;
local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously
- for (values %{$self->{importers}}) {
- $_->done if $_; # $_ may be undef during cleanup
+ for my $im (values %{$self->{importers}}) {
+ next if !$im; # $im may be undef during cleanup
+ eval { $im->done };
+ warn "$im->{ibx}->{name} ->done: $@\n" if $@;
}
}
$im->remove($eml, 'spam');
if (my $scrub = $ibx->filter($im)) {
my $scrubbed = $scrub->scrub($eml, 1);
- $scrubbed or return;
- $scrubbed == REJECT() and return;
- $im->remove($scrubbed, 'spam');
+ if ($scrubbed && $scrubbed != REJECT) {
+ $im->remove($scrubbed, 'spam');
+ }
}
};
- warn "error removing spam at: $loc from $ibx->{name}: $@\n" if $@;
+ if ($@) {
+ warn "error removing spam at: $loc from $ibx->{name}: $@\n";
+ _done_for_now($self);
+ }
}
sub _remove_spam {
sub import_eml ($$$) {
my ($self, $ibx, $eml) = @_;
- my $im = _importer_for($self, $ibx);
# any header match means it's eligible for the inbox:
if (my $watch_hdrs = $ibx->{-watchheaders}) {
}
return unless $ok;
}
-
- if (my $scrub = $ibx->filter($im)) {
- my $ret = $scrub->scrub($eml) or return;
- $ret == REJECT() and return;
- $eml = $ret;
+ eval {
+ my $im = _importer_for($self, $ibx);
+ if (my $scrub = $ibx->filter($im)) {
+ my $scrubbed = $scrub->scrub($eml) or return;
+ $scrubbed == REJECT and return;
+ $eml = $scrubbed;
+ }
+ $im->add($eml, $self->{spamcheck});
+ };
+ if ($@) {
+ warn "$ibx->{name} add failed: $@\n";
+ _done_for_now($self);
}
- $im->add($eml, $self->{spamcheck});
}
sub _try_path {
require_mods(@mods);
use_ok($_) for (qw(HTTP::Request::Common Plack::Test));
use_ok 'PublicInbox::WWW';
+use_ok 'PublicInbox::SearchIdx';
my ($tmpdir, $for_destroy) = tmpdir();
my $ibx = PublicInbox::Inbox->new({