use v5.10.1;
use parent qw(PublicInbox::ExtSearch PublicInbox::Lock);
use Carp qw(croak carp);
+use Sys::Hostname qw(hostname);
+use POSIX qw(strftime);
use PublicInbox::Search;
use PublicInbox::SearchIdx qw(crlf_adjust prepare_stack is_ancestor
is_bad_blob);
use PublicInbox::InboxWritable;
use PublicInbox::ContentHash qw(content_hash);
use PublicInbox::Eml;
-use File::Spec;
+use PublicInbox::DS qw(now);
use DBI qw(:sql_types); # SQL_BLOB
sub new {
my (undef, $dir, $opt) = @_;
- $dir = File::Spec->canonpath($dir);
my $l = $opt->{indexlevel} // 'full';
$l !~ $PublicInbox::SearchIdx::INDEXLEVELS and
die "invalid indexlevel=$l\n";
sub attach_inbox {
my ($self, $ibx) = @_;
- $ibx = PublicInbox::InboxWritable->new($ibx);
my $key = $ibx->eidx_key;
if (!$ibx->over || !$ibx->mm) {
warn "W: skipping $key (unindexed)\n";
warn "W: skipping $key (no UIDVALIDITY)\n";
return;
}
- my $ibxdir = File::Spec->canonpath($ibx->{inboxdir});
- if ($ibxdir ne $ibx->{inboxdir}) {
- warn "W: `$ibx->{inboxdir}' canonicalized to `$ibxdir'\n";
- $ibx->{inboxdir} = $ibxdir;
- }
- $ibx = PublicInbox::InboxWritable->new($ibx);
$self->{ibx_map}->{$key} //= do {
push @{$self->{ibx_list}}, $ibx;
$ibx;
my $v = $ibx->version;
my $ekey = $ibx->eidx_key;
if ($v == 2) {
- my $epoch_max;
- defined($ibx->git_dir_latest(\$epoch_max)) or return;
- $sync->{epoch_max} = $epoch_max;
+ $sync->{epoch_max} = $ibx->max_git_epoch // return;
sync_prepare($self, $sync); # or return # TODO: once MiscIdx is stable
} elsif ($v == 1) {
my $uv = $ibx->uidvalidity;
$self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
}
+sub checkpoint_due ($) {
+ my ($sync) = @_;
+ ${$sync->{need_checkpoint}} || (now() > $sync->{next_check});
+}
+
+sub host_ident () {
+ # I've copied FS images and only changed the hostname before,
+ # so prepend hostname. Use `state' since these a BOFH can change
+ # these while this process is running and we always want to be
+ # able to release locks taken by this process.
+ state $retval = hostname . '-' . do {
+ my $m; # machine-id(5) is systemd
+ if (open(my $fh, '<', '/etc/machine-id')) { $m = <$fh> }
+ # (g)hostid(1) is in GNU coreutils, kern.hostid is most BSDs
+ chomp($m ||= `{ sysctl -n kern.hostid ||
+ hostid || ghostid; } 2>/dev/null`
+ || "no-machine-id-or-hostid-on-$^O");
+ $m;
+ };
+}
+
+sub eidxq_release {
+ my ($self) = @_;
+ my $expect = delete($self->{-eidxq_locked}) or return;
+ my ($owner_pid, undef) = split(/-/, $expect);
+ return if $owner_pid != $$; # shards may fork
+ my $oidx = $self->{oidx};
+ $oidx->begin_lazy;
+ my $cur = $oidx->eidx_meta('eidxq_lock') // '';
+ if ($cur eq $expect) {
+ $oidx->eidx_meta('eidxq_lock', '');
+ return 1;
+ } elsif ($cur ne '') {
+ warn "E: eidxq_lock($expect) stolen by $cur\n";
+ } else {
+ warn "E: eidxq_lock($expect) released by another process\n";
+ }
+ undef;
+}
+
+sub DESTROY {
+ my ($self) = @_;
+ eidxq_release($self) and $self->{oidx}->commit_lazy;
+}
+
+sub _eidxq_take ($) {
+ my ($self) = @_;
+ my $val = "$$-${\time}-$>-".host_ident;
+ $self->{oidx}->eidx_meta('eidxq_lock', $val);
+ $self->{-eidxq_locked} = $val;
+}
+
+sub eidxq_lock_acquire ($) {
+ my ($self) = @_;
+ my $oidx = $self->{oidx};
+ $oidx->begin_lazy;
+ my $cur = $oidx->eidx_meta('eidxq_lock') || return _eidxq_take($self);
+ if (my $locked = $self->{-eidxq_locked}) { # be lazy
+ return $locked if $locked eq $cur;
+ }
+ my ($pid, $time, $euid, $ident) = split(/-/, $cur, 4);
+ my $t = strftime('%Y-%m-%d %k:%M:%S', gmtime($time));
+ if ($euid == $> && $ident eq host_ident) {
+ if (kill(0, $pid)) {
+ warn <<EOM; return;
+I: PID:$pid (re)indexing Xapian since $t, it will continue our work
+EOM
+ }
+ if ($!{ESRCH}) {
+ warn "I: eidxq_lock is stale ($cur), clobbering\n";
+ return _eidxq_take($self);
+ }
+ warn "E: kill(0, $pid) failed: $!\n"; # fall-through:
+ }
+ my $fn = $oidx->dbh->sqlite_db_filename;
+ warn <<EOF;
+W: PID:$pid, UID:$euid on $ident is indexing Xapian since $t
+W: If this is unexpected, delete `eidxq_lock' from the `eidx_meta' table:
+W: sqlite3 $fn 'DELETE FROM eidx_meta WHERE key = "eidxq_lock"'
+EOF
+ undef;
+}
+
sub eidxq_process ($$) { # for reindexing
my ($self, $sync) = @_;
+ return unless eidxq_lock_acquire($self);
my $dbh = $self->{oidx}->dbh;
my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
${$sync->{nr}} = 0;
my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
$pr->("Xapian indexing $min..$max (total=$tot)\n");
}
- my %id2pos;
- my $pos = 0;
- $id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
- $sync->{id2pos} = \%id2pos;
-
- my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
- my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
+ $sync->{id2pos} //= do {
+ my %id2pos;
+ my $pos = 0;
+ $id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
+ \%id2pos;
+ };
+ my ($del, $iter);
+restart:
+ $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
+ $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
$iter->execute;
while (defined(my $docid = $iter->fetchrow_array)) {
last if $sync->{quit};
$del->execute($docid);
++${$sync->{nr}};
- # this is only for SIGUSR1, shards do their own accounting:
- reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
+ if (checkpoint_due($sync)) {
+ $dbh = $del = $iter = undef;
+ reindex_checkpoint($self, $sync); # release lock
+ $dbh = $self->{oidx}->dbh;
+ goto restart;
+ }
}
$self->git->async_wait_all;
$pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr;
sub _reindex_check_unseen ($$$) {
my ($self, $sync, $ibx) = @_;
my $ibx_id = $ibx->{-ibx_id};
- my ($beg, $end) = (1, 1000);
+ my $slice = 1000;
+ my ($beg, $end) = (1, $slice);
# first, check if we missed any messages in target $ibx
- my $inx3 = $self->{oidx}->dbh->prepare(<<'');
-SELECT DISTINCT(docid) FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
-
my $msgs;
+ my $pr = $sync->{-opt}->{-progress};
+ my $ekey = $ibx->eidx_key;
+ $sync->{-regen_fmt} = "$ekey checking unseen %u/".$ibx->over->max."\n";
+ ${$sync->{nr}} = 0;
+
while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) {
+ ${$sync->{nr}} = $beg;
$beg = $msgs->[-1]->{num} + 1;
- $end = $beg + 1000;
+ $end = $beg + $slice;
+ if (checkpoint_due($sync)) {
+ reindex_checkpoint($self, $sync); # release lock
+ }
+
+ my $inx3 = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1);
+SELECT DISTINCT(docid) FROM xref3 WHERE
+ibx_id = ? AND xnum = ? AND oidbin = ?
+
for my $xsmsg (@$msgs) {
my $oidbin = pack('H*', $xsmsg->{blob});
$inx3->bind_param(1, $ibx_id);
sub _reindex_check_stale ($$$) {
my ($self, $sync, $ibx) = @_;
-
- # now, check if there's stale xrefs
- my $get_xnum = $self->{oidx}->dbh->prepare(<<'');
-SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? ORDER BY docid ASC
-
- $get_xnum->execute($ibx->{-ibx_id});
- my $del_xref3 = $self->{oidx}->dbh->prepare(<<'');
+ my $min = 0;
+ my $pr = $sync->{-opt}->{-progress};
+ my $fetching;
+ my $ekey = $ibx->eidx_key;
+ $sync->{-regen_fmt} =
+ "$ekey check stale/missing %u/".$ibx->over->max."\n";
+ ${$sync->{nr}} = 0;
+ do {
+ if (checkpoint_due($sync)) {
+ reindex_checkpoint($self, $sync); # release lock
+ }
+ # now, check if there's stale xrefs
+ my $iter = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1);
+SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? AND docid > ?
+ORDER BY docid,xnum ASC LIMIT 10000
+
+ $iter->execute($ibx->{-ibx_id}, $min);
+ $fetching = undef;
+
+ while (my ($docid, $xnum, $oidbin) = $iter->fetchrow_array) {
+ return if $sync->{quit};
+ ${$sync->{nr}} = $xnum;
+
+ $fetching = $min = $docid;
+ my $smsg = $ibx->over->get_art($xnum);
+ my $oidhex = unpack('H*', $oidbin);
+ my $err;
+ if (!$smsg) {
+ $err = 'stale';
+ } elsif ($smsg->{blob} ne $oidhex) {
+ $err = "mismatch (!= $smsg->{blob})";
+ } else {
+ next; # likely, all good
+ }
+ # current_info already has eidx_key
+ warn "$xnum:$oidhex (#$docid): $err\n";
+ my $del = $self->{oidx}->dbh->prepare_cached(<<'');
DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
- while (my ($docid, $xnum, $oidbin) = $get_xnum->fetchrow_array) {
- last if $sync->{quit};
- my $smsg = $ibx->over->get_art($xnum);
- my $oidhex = unpack('H*', $oidbin);
- my $err;
- if (!$smsg) {
- $err = 'stale';
- } elsif ($smsg->{blob} ne $oidhex) {
- $err = "mismatch (!= $smsg->{blob})";
- } else {
- next; # likely, all good
- }
- warn $ibx->eidx_key . ":$xnum:$oidhex (#$docid): $err\n";
- $del_xref3->bind_param(1, $ibx->{-ibx_id});
- $del_xref3->bind_param(2, $xnum);
- $del_xref3->bind_param(3, $oidbin, SQL_BLOB);
- $del_xref3->execute;
-
- # get_xref3 over-fetches, but this is a rare path:
- my $xr3 = $self->{oidx}->get_xref3($docid);
- my $idx = $self->idx_shard($docid);
- if (scalar(@$xr3) == 0) { # all gone
- $self->{oidx}->delete_by_num($docid);
- $self->{oidx}->eidxq_del($docid);
- $idx->shard_remove($docid);
- } else { # enqueue for reindex of remaining messages
- $idx->shard_remove_eidx_info($docid, $ibx->eidx_key);
- $self->{oidx}->eidxq_add($docid); # yes, add
+ $del->bind_param(1, $ibx->{-ibx_id});
+ $del->bind_param(2, $xnum);
+ $del->bind_param(3, $oidbin, SQL_BLOB);
+ $del->execute;
+
+ # get_xref3 over-fetches, but this is a rare path:
+ my $xr3 = $self->{oidx}->get_xref3($docid);
+ my $idx = $self->idx_shard($docid);
+ if (scalar(@$xr3) == 0) { # all gone
+ $self->{oidx}->delete_by_num($docid);
+ $self->{oidx}->eidxq_del($docid);
+ $idx->shard_remove($docid);
+ } else { # enqueue for reindex of remaining messages
+ $idx->shard_remove_eidx_info($docid,
+ $ibx->eidx_key);
+ $self->{oidx}->eidxq_add($docid); # yes, add
+ }
}
- }
+ } while (defined $fetching);
}
sub _reindex_inbox ($$$) {
my ($self, $sync, $ibx) = @_;
+ local $self->{current_info} = $ibx->eidx_key;
_reindex_check_unseen($self, $sync, $ibx);
_reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
delete @$ibx{qw(over mm search git)}; # won't need these for a bit
sub eidx_reindex {
my ($self, $sync) = @_;
+ # acquire eidxq_lock early because full reindex takes forever
+ # and incremental -extindex processes can run during our checkpoints
+ if (!eidxq_lock_acquire($self)) {
+ warn "E: aborting --reindex\n";
+ return;
+ }
for my $ibx (@{$self->{ibx_list}}) {
_reindex_inbox($self, $sync, $ibx);
last if $sync->{quit};
$self->{oidx}->rethread_prepare($opt);
my $sync = {
need_checkpoint => \(my $need_checkpoint = 0),
+ check_intvl => 10,
+ next_check => now() + 10,
-opt => $opt,
# DO NOT SET {reindex} here, it's incompatible with reused
# V2Writable code, reindex is totally different here
$self->{oidx}->rethread_done($opt) unless $sync->{quit};
eidxq_process($self, $sync) unless $sync->{quit};
+ eidxq_release($self);
PublicInbox::V2Writable::done($self);
}