use PublicInbox::Search;
use PublicInbox::SearchIdx qw(crlf_adjust prepare_stack is_ancestor);
use PublicInbox::OverIdx;
+use PublicInbox::MiscIdx;
use PublicInbox::MID qw(mids);
use PublicInbox::V2Writable;
use PublicInbox::InboxWritable;
sub new {
my (undef, $dir, $opt) = @_;
+ $dir = File::Spec->canonpath($dir);
my $l = $opt->{indexlevel} // 'full';
$l !~ $PublicInbox::SearchIdx::INDEXLEVELS and
die "invalid indexlevel=$l\n";
$size == 0 ? 1 : 0; # size == 0 means purged
}
+sub check_batch_limit ($) {
+ my ($req) = @_;
+ my $self = $req->{self};
+ my $new_smsg = $req->{new_smsg};
+
+ # {raw_bytes} may be unset, so just use {bytes}
+ my $n = $self->{transact_bytes} += $new_smsg->{bytes};
+
+ # set flag for PublicInbox::V2Writable::index_todo:
+ ${$req->{need_checkpoint}} = 1 if $n >= $self->{batch_bytes};
+}
+
sub do_xpost ($$) {
my ($req, $smsg) = @_;
my $self = $req->{self};
my $xnum = $req->{xnum};
$self->{oidx}->add_xref3($docid, $xnum, $oid, $xibx->eidx_key);
$idx->shard_add_eidx_info($docid, $oid, $xibx, $eml);
+ check_batch_limit($req);
} else { # 'd'
$self->{oidx}->remove_xref3($docid, $oid, $xibx->eidx_key);
$idx->shard_remove_eidx_info($docid, $oid, $xibx, $eml);
my $ibx = delete $req->{ibx} or die 'BUG: {ibx} unset';
$self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key);
$idx->index_raw(undef, $eml, $new_smsg, $ibx);
+ check_batch_limit($req);
}
sub do_finalize ($) {
} elsif (exists $req->{new_smsg}) { # totally unseen messsage
index_unseen($req);
} else {
- warn "W: ignoring delete $req->{oid} (not found)\n";
+ # `d' message was already unindexed in the v1/v2 inboxes,
+ # so it's too noisy to warn, here.
+ }
+ # cur_cmt may be undef for unindex_oid, set by V2Writable::index_todo
+ if (defined(my $cur_cmt = $req->{cur_cmt})) {
+ ${$req->{latest_cmt}} = $cur_cmt;
}
}
sub index_oid { # git->cat_async callback for 'm'
my ($bref, $oid, $type, $size, $req) = @_;
+ my $self = $req->{self};
+ local $self->{current_info} = "$self->{current_info} $oid";
return if is_bad_blob($oid, $type, $size, $req->{oid});
my $new_smsg = $req->{new_smsg} = bless {
blob => $oid,
}, 'PublicInbox::Smsg';
$new_smsg->{bytes} = $size + crlf_adjust($$bref);
defined($req->{xnum} = cur_ibx_xnum($req, $bref)) or return;
+ ++${$req->{nr}};
do_step($req);
}
sub unindex_oid { # git->cat_async callback for 'd'
my ($bref, $oid, $type, $size, $req) = @_;
+ my $self = $req->{self};
+ local $self->{current_info} = "$self->{current_info} $oid";
return if is_bad_blob($oid, $type, $size, $req->{oid});
return if defined(cur_ibx_xnum($req, $bref)); # was re-added
do_step($req);
}
sub _sync_inbox ($$$) {
- my ($self, $opt, $ibx) = @_;
- my $sync = {
- need_checkpoint => \(my $bool = 0),
- reindex => $opt->{reindex},
- -opt => $opt,
- self => $self,
- ibx => $ibx,
- };
+ my ($self, $sync, $ibx) = @_;
+ $sync->{ibx} = $ibx;
+ $sync->{nr} = \(my $nr = 0);
my $v = $ibx->version;
my $ekey = $ibx->eidx_key;
if ($v == 2) {
my $epoch_max;
defined($ibx->git_dir_latest(\$epoch_max)) or return;
$sync->{epoch_max} = $epoch_max;
- sync_prepare($self, $sync) or return; # fills $sync->{todo}
+ sync_prepare($self, $sync); # or return # TODO: once MiscIdx is stable
} elsif ($v == 1) {
my $uv = $ibx->uidvalidity;
my $lc = $self->{oidx}->eidx_meta("lc-v1:$ekey//$uv");
warn "E: $ekey unsupported inbox version (v$v)\n";
return;
}
- index_todo($self, $sync, $_) for @{$sync->{todo}};
+ unless ($sync->{quit}) {
+ index_todo($self, $sync, $_) for @{delete($sync->{todo}) // []};
+ $self->{midx}->index_ibx($ibx) unless $sync->{quit};
+ }
+ $ibx->git->cleanup; # done with this inbox, now
}
sub eidx_sync { # main entry point
local $SIG{__WARN__} = sub {
$warn_cb->($self->{current_info}, ': ', @_);
};
- _sync_inbox($self, $opt, $_) for (@{$self->{ibx_list}});
+ my $sync = {
+ need_checkpoint => \(my $need_checkpoint = 0),
+ reindex => $opt->{reindex},
+ -opt => $opt,
+ self => $self,
+ -regen_fmt => "%u/?\n",
+ };
+ local $SIG{USR1} = sub { $need_checkpoint = 1 };
+ my $quit = sub { $sync->{quit} = 1; warn "gracefully quitting\n"; };
+ local $SIG{QUIT} = $quit;
+ local $SIG{INT} = $quit;
+ local $SIG{TERM} = $quit;
+
+ # don't use $_ here, it'll get clobbered by reindex_checkpoint
+ for my $ibx (@{$self->{ibx_list}}) {
+ _sync_inbox($self, $sync, $ibx);
+ last if $sync->{quit};
+ }
- $self->{oidx}->rethread_done($opt);
+ $self->{oidx}->rethread_done($opt) unless $sync->{quit};
PublicInbox::V2Writable::done($self);
}
sub update_last_commit { # overrides V2Writable
- my ($self, $sync, $unit, $latest_cmt) = @_;
- return unless defined $latest_cmt;
-
- $self->git->async_wait_all;
+ my ($self, $sync, $stk) = @_;
+ my $unit = $sync->{unit} // return;
+ my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+ defined($latest_cmt) or return;
my $ibx = $sync->{ibx} or die 'BUG: {ibx} missing';
my $ekey = $ibx->eidx_key;
my $uv = $ibx->uidvalidity;
die "Unsupported inbox version: $v";
}
my $last = $self->{oidx}->eidx_meta($meta_key);
- if (defined $last && is_ancestor($unit->{git}, $last, $latest_cmt)) {
+ if (defined $last && is_ancestor($self->git, $last, $latest_cmt)) {
my @cmd = (qw(rev-list --count), "$last..$latest_cmt");
chomp(my $n = $unit->{git}->qx(@cmd));
return if $n ne '' && $n == 0;
$self->{oidx}->eidx_meta($meta_key, $latest_cmt);
}
+sub _idx_init { # with_umask callback
+ my ($self, $opt) = @_;
+ PublicInbox::V2Writable::_idx_init($self, $opt);
+ $self->{midx} = PublicInbox::MiscIdx->new($self);
+}
+
sub idx_init { # similar to V2Writable
my ($self, $opt) = @_;
return if $self->{idx_shards};
my $info_dir = "$ALL/objects/info";
my $alt = "$info_dir/alternates";
my $mode = 0644;
- my (%old, @old, %new, @new);
+ my (@old, @new, %seen); # seen: st_dev + st_ino
if (-e $alt) {
open(my $fh, '<', $alt) or die "open $alt: $!";
$mode = (stat($fh))[2] & 07777;
- while (<$fh>) {
- push @old, $_ if !$old{$_}++;
+ while (my $line = <$fh>) {
+ chomp(my $d = $line);
+ if (my @st = stat($d)) {
+ next if $seen{"$st[0]\0$st[1]"}++;
+ } else {
+ warn "W: stat($d) failed (from $alt): $!\n";
+ }
+ push @old, $line;
}
}
for my $ibx (@{$self->{ibx_list}}) {
my $line = $ibx->git->{git_dir} . "/objects\n";
- next if $old{$line};
- $new{$line} = 1;
+ chomp(my $d = $line);
+ if (my @st = stat($d)) {
+ next if $seen{"$st[0]\0$st[1]"}++;
+ } else {
+ warn "W: stat($d) failed (from $ibx->{inboxdir}): $!\n";
+ }
push @new, $line;
}
- push @old, @new;
- PublicInbox::V2Writable::write_alternates($info_dir, $mode, \@old);
+ if (scalar @new) {
+ push @old, @new;
+ my $o = \@old;
+ PublicInbox::V2Writable::write_alternates($info_dir, $mode, $o);
+ }
$self->parallel_init($self->{indexlevel});
$self->umask_prepare;
- $self->with_umask(\&PublicInbox::V2Writable::_idx_init, $self, $opt);
+ $self->with_umask(\&_idx_init, $self, $opt);
$self->{oidx}->begin_lazy;
$self->{oidx}->eidx_prep;
+ $self->{midx}->begin_txn;
}
no warnings 'once';