use PublicInbox::Search;
use PublicInbox::SearchIdx qw(crlf_adjust prepare_stack is_ancestor);
use PublicInbox::OverIdx;
+use PublicInbox::MiscIdx;
use PublicInbox::MID qw(mids);
use PublicInbox::V2Writable;
use PublicInbox::InboxWritable;
# `d' message was already unindexed in the v1/v2 inboxes,
# so it's too noisy to warn, here.
}
+ # cur_cmt may be undef for unindex_oid, set by V2Writable::index_todo
+ if (defined(my $cur_cmt = $req->{cur_cmt})) {
+ ${$req->{latest_cmt}} = $cur_cmt;
+ }
}
sub do_step ($) { # main iterator for adding messages to the index
sub index_oid { # git->cat_async callback for 'm'
my ($bref, $oid, $type, $size, $req) = @_;
+ my $self = $req->{self};
+ local $self->{current_info} = "$self->{current_info} $oid";
return if is_bad_blob($oid, $type, $size, $req->{oid});
my $new_smsg = $req->{new_smsg} = bless {
blob => $oid,
}, 'PublicInbox::Smsg';
$new_smsg->{bytes} = $size + crlf_adjust($$bref);
defined($req->{xnum} = cur_ibx_xnum($req, $bref)) or return;
+ ++${$req->{nr}};
do_step($req);
}
sub unindex_oid { # git->cat_async callback for 'd'
my ($bref, $oid, $type, $size, $req) = @_;
+ my $self = $req->{self};
+ local $self->{current_info} = "$self->{current_info} $oid";
return if is_bad_blob($oid, $type, $size, $req->{oid});
return if defined(cur_ibx_xnum($req, $bref)); # was re-added
do_step($req);
}
sub _sync_inbox ($$$) {
- my ($self, $opt, $ibx) = @_;
- my $sync = {
- need_checkpoint => \(my $bool = 0),
- reindex => $opt->{reindex},
- -opt => $opt,
- self => $self,
- ibx => $ibx,
- };
+ my ($self, $sync, $ibx) = @_;
+ $sync->{ibx} = $ibx;
+ $sync->{nr} = \(my $nr = 0);
my $v = $ibx->version;
my $ekey = $ibx->eidx_key;
if ($v == 2) {
my $epoch_max;
defined($ibx->git_dir_latest(\$epoch_max)) or return;
$sync->{epoch_max} = $epoch_max;
- sync_prepare($self, $sync) or return; # fills $sync->{todo}
+ sync_prepare($self, $sync); # or return # TODO: once MiscIdx is stable
} elsif ($v == 1) {
my $uv = $ibx->uidvalidity;
my $lc = $self->{oidx}->eidx_meta("lc-v1:$ekey//$uv");
warn "E: $ekey unsupported inbox version (v$v)\n";
return;
}
- index_todo($self, $sync, $_) for @{$sync->{todo}};
+ unless ($sync->{quit}) {
+ index_todo($self, $sync, $_) for @{delete($sync->{todo}) // []};
+ $self->{midx}->index_ibx($ibx) unless $sync->{quit};
+ }
+ $ibx->git->cleanup; # done with this inbox, now
}
sub eidx_sync { # main entry point
local $SIG{__WARN__} = sub {
$warn_cb->($self->{current_info}, ': ', @_);
};
+ my $sync = {
+ need_checkpoint => \(my $need_checkpoint = 0),
+ reindex => $opt->{reindex},
+ -opt => $opt,
+ self => $self,
+ -regen_fmt => "%u/?\n",
+ };
+ local $SIG{USR1} = sub { $need_checkpoint = 1 };
+ my $quit = sub { $sync->{quit} = 1; warn "gracefully quitting\n"; };
+ local $SIG{QUIT} = $quit;
+ local $SIG{INT} = $quit;
+ local $SIG{TERM} = $quit;
# don't use $_ here, it'll get clobbered by reindex_checkpoint
for my $ibx (@{$self->{ibx_list}}) {
- _sync_inbox($self, $opt, $ibx);
+ _sync_inbox($self, $sync, $ibx);
+ last if $sync->{quit};
}
- $self->{oidx}->rethread_done($opt);
+ $self->{oidx}->rethread_done($opt) unless $sync->{quit};
PublicInbox::V2Writable::done($self);
}
sub update_last_commit { # overrides V2Writable
- my ($self, $sync, $unit, $latest_cmt) = @_;
- return unless defined $latest_cmt;
-
- $self->git->async_wait_all;
+ my ($self, $sync, $stk) = @_;
+ my $unit = $sync->{unit} // return;
+ my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+ defined($latest_cmt) or return;
my $ibx = $sync->{ibx} or die 'BUG: {ibx} missing';
my $ekey = $ibx->eidx_key;
my $uv = $ibx->uidvalidity;
die "Unsupported inbox version: $v";
}
my $last = $self->{oidx}->eidx_meta($meta_key);
- if (defined $last && is_ancestor($unit->{git}, $last, $latest_cmt)) {
+ if (defined $last && is_ancestor($self->git, $last, $latest_cmt)) {
my @cmd = (qw(rev-list --count), "$last..$latest_cmt");
chomp(my $n = $unit->{git}->qx(@cmd));
return if $n ne '' && $n == 0;
$self->{oidx}->eidx_meta($meta_key, $latest_cmt);
}
+sub _idx_init { # with_umask callback
+ my ($self, $opt) = @_;
+ PublicInbox::V2Writable::_idx_init($self, $opt);
+ $self->{midx} = PublicInbox::MiscIdx->new($self);
+}
+
sub idx_init { # similar to V2Writable
my ($self, $opt) = @_;
return if $self->{idx_shards};
my $info_dir = "$ALL/objects/info";
my $alt = "$info_dir/alternates";
my $mode = 0644;
- my (%old, @old, %new, @new);
+ my (@old, @new, %seen); # seen: st_dev + st_ino
if (-e $alt) {
open(my $fh, '<', $alt) or die "open $alt: $!";
$mode = (stat($fh))[2] & 07777;
- while (<$fh>) {
- push @old, $_ if !$old{$_}++;
+ while (my $line = <$fh>) {
+ chomp(my $d = $line);
+ if (my @st = stat($d)) {
+ next if $seen{"$st[0]\0$st[1]"}++;
+ } else {
+ warn "W: stat($d) failed (from $alt): $!\n";
+ }
+ push @old, $line;
}
}
for my $ibx (@{$self->{ibx_list}}) {
my $line = $ibx->git->{git_dir} . "/objects\n";
- next if $old{$line};
- $new{$line} = 1;
+ chomp(my $d = $line);
+ if (my @st = stat($d)) {
+ next if $seen{"$st[0]\0$st[1]"}++;
+ } else {
+ warn "W: stat($d) failed (from $ibx->{inboxdir}): $!\n";
+ }
push @new, $line;
}
if (scalar @new) {
}
$self->parallel_init($self->{indexlevel});
$self->umask_prepare;
- $self->with_umask(\&PublicInbox::V2Writable::_idx_init, $self, $opt);
+ $self->with_umask(\&_idx_init, $self, $opt);
$self->{oidx}->begin_lazy;
$self->{oidx}->eidx_prep;
+ $self->{midx}->begin_txn;
}
no warnings 'once';