use PublicInbox::ContentHash qw(content_hash);
use PublicInbox::Eml;
use File::Spec;
+use DBI qw(:sql_types); # SQL_BLOB
sub new {
my (undef, $dir, $opt) = @_;
my $nr = $self->{oidx}->remove_xref3($docid, $oid, $eidx_key,
\$rm_eidx_info);
if ($nr == 0) {
+ $self->{oidx}->eidxq_del($docid);
$idx->shard_remove($docid);
} elsif ($rm_eidx_info) {
$idx->shard_remove_eidx_info($docid, $eidx_key, $eml);
+ $self->{oidx}->eidxq_add($docid); # yes, add
}
}
}
sub do_step ($) { # main iterator for adding messages to the index
my ($req) = @_;
- my $self = $req->{self};
+ my $self = $req->{self} // die 'BUG: {self} missing';
while (1) {
if (my $next_arg = $req->{next_arg}) {
if (my $smsg = $self->{oidx}->next_by_mid(@$next_arg)) {
$ibx->git->cleanup; # done with this inbox, now
}
-sub unref_doc ($$$$) {
+sub gc_unref_doc ($$$$) {
my ($self, $ibx_id, $eidx_key, $docid) = @_;
my $dbh = $self->{oidx}->dbh;
DELETE FROM xref3 WHERE docid = ? AND ibx_id = ?
my $remain = $self->{oidx}->get_xref3($docid);
- my $idx = $self->idx_shard($docid);
- if (@$remain) {
+ if (scalar(@$remain)) {
+ $self->{oidx}->eidxq_add($docid); # enqueue for reindex
for my $oid (@oid) {
warn "I: unref #$docid $eidx_key $oid\n";
- $idx->shard_remove_eidx_info($docid, $eidx_key);
}
} else {
warn "I: remove #$docid $eidx_key @oid\n";
- $idx->shard_remove($docid);
+ $self->idx_shard($docid)->shard_remove($docid);
}
}
warn "I: deleting messages for $eidx_key...\n";
$x3_doc->execute($ibx_id);
while (defined(my $docid = $x3_doc->fetchrow_array)) {
- unref_doc($self, $ibx_id, $eidx_key, $docid);
+ gc_unref_doc($self, $ibx_id, $eidx_key, $docid);
}
$dbh->prepare_cached(<<'')->execute($ibx_id);
DELETE FROM inboxes WHERE ibx_id = ?
done($self);
}
+sub eidxq_process ($$) { # for reindexing
+ my ($self, $sync) = @_;
+ # reindexes every docid queued in the eidxq table, dequeueing each one
+ $self->{oidx}->commit_lazy; # ensure shard workers can see it
+ $self->{oidx}->begin_lazy;
+ my $dbh = $self->{oidx}->dbh;
+ my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return; # empty queue: nothing to do
+ ${$sync->{nr}} = 0; # {nr} is used as a scalar ref counting processed docids
+ $sync->{-regen_fmt} = "%u/$tot\n"; # the total is known here
+ my $pr = $sync->{-opt}->{-progress}; # optional code ref, called with a message string
+ if ($pr) {
+ my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq');
+ my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
+ $pr->("Xapian indexing $min..$max (total=$tot)\n");
+ }
+
+ my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
+ my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
+ $iter->execute;
+ while (defined(my $docid = $iter->fetchrow_array)) {
+ $self->idx_shard($docid)->shard_reindex_docid($docid); # hand the docid to its shard
+ $del->execute($docid); # dequeue
+ last if $sync->{quit}; # checked after the dequeue, before {nr} is bumped
+ my $cur = ++${$sync->{nr}};
+
+ # shards flush on their own, just don't queue up too many
+ # deletes
+ if (($cur % 1000) == 0) {
+ $self->{oidx}->commit_lazy;
+ $self->{oidx}->begin_lazy;
+ $pr->("reindexed $cur/$tot\n") if $pr;
+ }
+ # this is only for SIGUSR1, shards do their own accounting:
+ reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
+ }
+ $pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr; # final count, also printed when quitting early
+ $self->{oidx}->commit_lazy; # commit the remaining dequeues
+ $self->{oidx}->begin_lazy;
+}
+
+# Callback for the cat_async request queued by reindex_unseen.
+# Fills in the remaining fields of $req ({eml}, {new_smsg}, {chash},
+# {mids}) from the blob contents and passes $req on to do_step.
+# Bad blobs (per is_bad_blob) are ignored.
+sub _reindex_unseen { # git->cat_async callback
+	my ($bref, $oid, $type, $size, $req) = @_;
+	is_bad_blob($oid, $type, $size, $req->{oid}) and return;
+	my $self = $req->{self} // die 'BUG: {self} unset';
+
+	# append the blob OID to the warning prefix for this call only
+	local $self->{current_info} = "$self->{current_info} $oid";
+
+	# compute the byte count from the raw buffer before it is
+	# handed to Eml->new
+	my $bytes = $size + crlf_adjust($$bref);
+	my $new_smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
+	$new_smsg->{bytes} = $bytes;
+
+	my $eml = PublicInbox::Eml->new($bref);
+	$req->{eml} = $eml;
+	$req->{new_smsg} = $new_smsg;
+	$req->{chash} = content_hash($eml);
+	$req->{mids} = mids($eml); # do_step iterates through this
+
+	do_step($req); # enter the normal indexing flow
+}
+
+# --reindex may find messages which were never indexed at all.
+# This queues an async blob read for such a message ($xsmsg, taken
+# from $ibx); _reindex_unseen finishes the job once the blob arrives.
+sub reindex_unseen ($$$$) {
+	my ($self, $sync, $ibx, $xsmsg) = @_;
+	my ($oid, $xnum) = @$xsmsg{qw(blob num)};
+	my $req = {
+		%$sync, # has {self}
+		autime => $xsmsg->{ds},
+		cotime => $xsmsg->{ts},
+		oid => $oid,
+		ibx => $ibx,
+		xnum => $xnum,
+		# _reindex_unseen fills in {mids} and {chash} later
+	};
+	my $eidx_key = $ibx->eidx_key;
+	warn "I: reindex_unseen $eidx_key:$xnum:$oid\n";
+	$self->git->cat_async($oid, \&_reindex_unseen, $req);
+}
+
+sub _reindex_check_unseen ($$$) { # queue every message of $ibx: unseen ones are indexed now, known ones go to eidxq
+ my ($self, $sync, $ibx) = @_;
+ my $ibx_id = $ibx->{-ibx_id}; # eidx_sync sets {-ibx_id} before reindexing
+ my ($beg, $end) = (1, 1000); # initial range passed to query_xover
+
+ # first, check if we missed any messages in target $ibx
+ my $inx3 = $self->{oidx}->dbh->prepare(<<'');
+SELECT DISTINCT(docid) FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
+
+ my $msgs;
+ while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) { # stop on the first empty batch
+ $beg = $msgs->[-1]->{num} + 1; # continue after the last {num} returned
+ $end = $beg + 1000;
+ for my $xsmsg (@$msgs) {
+ my $oidbin = pack('H*', $xsmsg->{blob}); # hex OID -> binary for the oidbin column
+ $inx3->bind_param(1, $ibx_id);
+ $inx3->bind_param(2, $xsmsg->{num});
+ $inx3->bind_param(3, $oidbin, SQL_BLOB); # SQL_BLOB comes from DBI qw(:sql_types)
+ $inx3->execute;
+ my $docids = $inx3->fetchall_arrayref; # array ref of [ docid ] rows
+ # index messages which were totally missed
+ # the first time around ASAP:
+ if (scalar(@$docids) == 0) {
+ reindex_unseen($self, $sync, $ibx, $xsmsg);
+ } else { # already seen, reindex later
+ for my $r (@$docids) {
+ $self->{oidx}->eidxq_add($r->[0]);
+ }
+ }
+ last if $sync->{quit};
+ }
+ last if $sync->{quit}; # leave the outer paging loop as well
+ }
+}
+
+sub _reindex_check_stale ($$$) { # drop xref3 rows of $ibx which no longer match $ibx->over
+ my ($self, $sync, $ibx) = @_;
+
+ # now, check if there's stale xrefs
+ my $get_xnum = $self->{oidx}->dbh->prepare(<<'');
+SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? ORDER BY docid ASC
+
+ $get_xnum->execute($ibx->{-ibx_id});
+ my $del_xref3 = $self->{oidx}->dbh->prepare(<<'');
+DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
+
+ while (my ($docid, $xnum, $oidbin) = $get_xnum->fetchrow_array) {
+ last if $sync->{quit};
+ my $smsg = $ibx->over->get_art($xnum); # false when $ibx has no such article
+ my $oidhex = unpack('H*', $oidbin); # binary OID -> hex, to compare with {blob}
+ my $err;
+ if (!$smsg) {
+ $err = 'stale';
+ } elsif ($smsg->{blob} ne $oidhex) {
+ $err = "mismatch (!= $smsg->{blob})";
+ } else {
+ next; # likely, all good
+ }
+ warn $ibx->eidx_key . ":$xnum:$oidhex (#$docid): $err\n";
+ $del_xref3->bind_param(1, $ibx->{-ibx_id});
+ $del_xref3->bind_param(2, $xnum);
+ $del_xref3->bind_param(3, $oidbin, SQL_BLOB); # SQL_BLOB comes from DBI qw(:sql_types)
+ $del_xref3->execute;
+
+ # get_xref3 over-fetches, but this is a rare path:
+ my $xr3 = $self->{oidx}->get_xref3($docid); # xref3 entries still left for $docid
+ my $idx = $self->idx_shard($docid);
+ if (scalar(@$xr3) == 0) { # all gone
+ $self->{oidx}->eidxq_del($docid); # nothing left to reindex
+ $idx->shard_remove($docid);
+ } else { # enqueue for reindex of remaining messages
+ $idx->shard_remove_eidx_info($docid, $ibx->eidx_key);
+ $self->{oidx}->eidxq_add($docid); # yes, add
+ }
+ }
+}
+
+# Run both reindex checks against one inbox: first look for messages
+# which were never indexed, then (unless asked to quit) for stale
+# xref3 entries.  Cached handles on $ibx are dropped afterwards.
+sub _reindex_inbox ($$$) {
+	my ($self, $sync, $ibx) = @_;
+	_reindex_check_unseen($self, $sync, $ibx);
+	if (!$sync->{quit}) {
+		_reindex_check_stale($self, $sync, $ibx);
+	}
+	# won't need these for a bit
+	delete @{$ibx}{qw(over mm search git)};
+}
+
+# --reindex driver: check every inbox in {ibx_list}, wait for all
+# outstanding async git reads, then process the eidxq table.
+# Stops early once $sync->{quit} is set.
+sub eidx_reindex {
+	my ($self, $sync) = @_;
+
+	foreach my $ibx (@{$self->{ibx_list}}) {
+		_reindex_inbox($self, $sync, $ibx);
+		$sync->{quit} and last;
+	}
+	# ensure eidxq gets filled completely
+	$self->git->async_wait_all;
+	$sync->{quit} or eidxq_process($self, $sync);
+}
+
sub eidx_sync { # main entry point
my ($self, $opt) = @_;
- $self->idx_init($opt); # acquire lock via V2Writable::_idx_init
- $self->{oidx}->rethread_prepare($opt);
my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ };
local $self->{current_info} = '';
local $SIG{__WARN__} = sub {
$warn_cb->($self->{current_info}, ': ', @_);
};
+ $self->idx_init($opt); # acquire lock via V2Writable::_idx_init
+ $self->{oidx}->rethread_prepare($opt); # runs with the {current_info}-prefixing __WARN__ handler in place
my $sync = {
need_checkpoint => \(my $need_checkpoint = 0),
- reindex => $opt->{reindex},
-opt => $opt,
+ # DO NOT SET {reindex} here, it's incompatible with reused
+ # V2Writable code, reindex is totally different here
+ # compared to v1/v2 inboxes because we have multiple histories
self => $self,
-regen_fmt => "%u/?\n",
};
local $SIG{QUIT} = $quit;
local $SIG{INT} = $quit;
local $SIG{TERM} = $quit;
+ for my $ibx (@{$self->{ibx_list}}) { # the _reindex_check_* subs read {-ibx_id}
+ $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
+ }
+ eidx_reindex($self, $sync) if delete($opt->{reindex}); # {reindex} is removed from $opt here
# don't use $_ here, it'll get clobbered by reindex_checkpoint
for my $ibx (@{$self->{ibx_list}}) {
_sync_inbox($self, $sync, $ibx);
}
$self->{oidx}->rethread_done($opt) unless $sync->{quit};
+ eidxq_process($self, $sync) unless $sync->{quit}; # reindex docids queued in eidxq
PublicInbox::V2Writable::done($self);
}
*count_shards = \&PublicInbox::V2Writable::count_shards;
*atfork_child = \&PublicInbox::V2Writable::atfork_child;
*idx_shard = \&PublicInbox::V2Writable::idx_shard;
+*reindex_checkpoint = \&PublicInbox::V2Writable::reindex_checkpoint;
1;
index_list_id($self, $doc, $hdr);
}
-sub add_xapian ($$$$) {
+sub eml2doc ($$$;$) {
my ($self, $eml, $smsg, $mids) = @_;
+ $mids //= mids_for_index($eml);
my $doc = $X->{Document}->new;
add_val($doc, PublicInbox::Search::TS(), $smsg->{ts});
my @ds = gmtime($smsg->{ds});
}
}
}
+ $doc;
+}
+
+sub add_xapian ($$$$) { # build a document with eml2doc and store it as $smsg->{num}
+ my ($self, $eml, $smsg, $mids) = @_;
+ my $doc = eml2doc($self, $eml, $smsg, $mids); # document creation lives in eml2doc
$self->{xdb}->replace_document($smsg->{num}, $doc);
}
sub _commit_txn {
my ($self) = @_;
+ if (my $eidx = $self->{eidx}) { # reindex_docid and reindex_xap require {eidx}
+ $eidx->git->async_wait_all; # presumably drains pending cat_async callbacks before the commit -- confirm
+ $eidx->{transact_bytes} = 0; # restart the count reindex_docid compares to {batch_bytes}
+ }
if (my $xdb = $self->{xdb}) {
set_metadata_once($self);
$xdb->commit_transaction;
}
}
+# git->cat_async callback queued by reindex_docid, once per xref3 row
+# of a docid.  $ary is [ $ibx_id, $oidhex, $req, $more ]:
+#   $ibx_id - inbox the row belongs to, resolved via {-eidx_key_for}
+#   $oidhex - hex OID requested, checked against $oid by is_bad_blob
+#   $req    - shared across all callbacks for the docid: {self},
+#             {docid}, and {doc} (the document being built or updated)
+#   $more   - number of rows still to come; the document is written
+#             to {xdb} only from the last callback ($more == 0)
+#
+# A bad blob contributes nothing to the document, but it must not
+# prevent the last callback from storing what the good blobs produced:
+# returning early on a bad blob in the final row would silently drop
+# the reindex of the whole docid.  Nothing is written when every blob
+# was bad.
+sub reindex_xap { # git->cat_async callback
+	my ($bref, $oid, $type, $size, $ary) = @_;
+	my ($ibx_id, $oidhex, $req, $more) = @$ary;
+	my $self = $req->{self} // die 'BUG: {self} missing';
+	my $eidx = $self->{eidx} // die 'BUG: {eidx} missing';
+	my $eidx_key = $self->{-eidx_key_for}->{$ibx_id} //
+			die "BUG: bad ibx_id=$ibx_id ($oid)";
+
+	my $docid = $req->{docid};
+	local $eidx->{current_info} = "#$docid $oid";
+	my $ok = !is_bad_blob($oid, $type, $size, $oidhex);
+	if (!$ok) {
+		# nothing to index from this blob; still fall through so
+		# the final callback can store the document
+	} elsif (my $doc = $req->{doc}) { # modify existing doc
+		$req->{tg_isset} //= do { # for existing documents in {xdb}
+			term_generator($self)->set_document($doc);
+			1;
+		};
+		$doc->add_boolean_term('O'.$eidx_key);
+		index_list_id($self, $doc, PublicInbox::Eml->new($bref));
+		$req->{nr_ok}++;
+	} else { # first time seeing this doc
+		my $smsg = $self->{eidx}->over->get_art($docid) //
+			die "BUG: #$docid ($oid) not in over";
+		$smsg->{bytes} = $size + crlf_adjust($$bref);
+		$smsg->{eidx_key} = $eidx_key;
+		my $eml = PublicInbox::Eml->new($bref);
+		$req->{doc} = eml2doc($self, $eml, $smsg);
+		$req->{tg_isset} = 1; # eml2doc calls $tg->set_document
+		$req->{nr_ok}++;
+	}
+	return if $more;
+	my $doc = delete($req->{doc});
+	return unless $doc && $req->{nr_ok}; # all bad blobs!
+	$eidx->{transact_bytes} += $size if $ok; # $size may be undef for bad blobs
+	$self->{xdb}->replace_document($req->{docid}, $doc);
+}
+
+sub reindex_docid { # queue async blob reads (reindex_xap) for every xref3 row of $docid
+ my ($self, $docid) = @_;
+ my $eidx = $self->{eidx} // die 'BUG: {eidx} missing';
+ my $eidx_key_for = $self->{-eidx_key_for} //= do { # ibx_id => eidx_key map, built once and cached on $self
+ my %eidx_key_for = map {
+ $_->[0] => $_->[1];
+ } @{$eidx->over->dbh->selectall_arrayref(<<'')};
+SELECT ibx_id,eidx_key FROM inboxes
+
+ \%eidx_key_for;
+ };
+ # $eidx_key_for is not used below; reindex_xap reads $self->{-eidx_key_for}
+ begin_txn_lazy($self);
+ my $doc = eval { $self->{xdb}->get_document($docid) }; # undef if get_document dies
+ my $req = { doc => $doc, self => $self, docid => $docid }; # shared by all callbacks for $docid
+ my $sth = $eidx->over->dbh->prepare_cached(<<'', undef, 1);
+SELECT ibx_id,oidbin FROM xref3 WHERE docid = ? ORDER BY ibx_id ASC
+
+ $sth->execute($docid);
+ my $rows = $sth->fetchall_arrayref;
+ while (my $row = shift(@$rows)) {
+ my ($ibx_id, $oidbin) = @$row;
+ my $oidhex = unpack('H*', $oidbin); # binary OID -> hex
+ $eidx->git->cat_async($oidhex, \&reindex_xap,
+ [ $ibx_id, $oidhex, $req, scalar(@$rows) ]); # last element: rows still left, 0 for the final one
+ }
+ if ($eidx->{transact_bytes} >= $eidx->{batch_bytes}) { # {transact_bytes} is bumped in reindex_xap
+ commit_txn_lazy($self);
+ }
+}
+
1;
use Test::More;
use PublicInbox::TestCommon;
use PublicInbox::Config;
+use PublicInbox::Search;
use Fcntl qw(:seek);
my $json = PublicInbox::Config::json() or plan skip_all => 'JSON missing';
require_git(2.6);
is(scalar(@$xref3), 2, 'only to entries');
}
-{
+if ('inbox edited') {
my ($in, $out, $err);
$in = $out = $err = '';
my $opt = { 0 => \$in, 1 => \$out, 2 => \$err };
like($it[0]->get_document->get_data, qr/v2test/, 'docdata matched v2');
like($it[1]->get_document->get_data, qr/v1test/, 'docdata matched v1');
+my $cfg = PublicInbox::Config->new;
+my $schema_version = PublicInbox::Search::SCHEMA_VERSION();
if ('inject w/o indexing') {
use PublicInbox::Import;
- use PublicInbox::Search;
- my $schema_version = PublicInbox::Search::SCHEMA_VERSION();
- my $v1ibx = PublicInbox::Config->new->lookup_name('v1test');
+ my $v1ibx = $cfg->lookup_name('v1test');
my $last_v1_commit = $v1ibx->mm->last_commit;
- my $v2ibx = PublicInbox::Config->new->lookup_name('v2test');
+ my $v2ibx = $cfg->lookup_name('v2test');
my $last_v2_commit = $v2ibx->mm->last_commit_xap($schema_version, 0);
my $git0 = PublicInbox::Git->new("$v2ibx->{inboxdir}/git/0.git");
chomp(my $cmt = $git0->qx(qw(rev-parse HEAD^0)));
is($mset->size, 1, 'got v2 message');
}
+if ('reindex catches missed messages') { # the string is always true; it only labels the block
+ use PublicInbox::InboxWritable;
+ use PublicInbox::OverIdx;
+ my $v2ibx = $cfg->lookup_name('v2test');
+ my $im = PublicInbox::InboxWritable->new($v2ibx)->importer(0);
+ my $cmt_a = $v2ibx->mm->last_commit_xap($schema_version, 0); # before the import
+ my $eml = eml_load('t/data/0001.patch');
+ $im->add($eml);
+ $im->done;
+ my $cmt_b = $v2ibx->mm->last_commit_xap($schema_version, 0); # after the import
+ isnt($cmt_a, $cmt_b, 'v2 0.git HEAD updated');
+ my $f = "$home/extindex/ei$schema_version/over.sqlite3";
+ my $oidx = PublicInbox::OverIdx->new($f);
+ $oidx->dbh;
+ my $uv = $v2ibx->uidvalidity;
+ my $lc_key = "lc-v2:v2.example//$uv;0";
+ is($oidx->eidx_meta($lc_key, $cmt_b), $cmt_a, # set the key to $cmt_b; the old value is checked
+ 'update lc-v2 meta, old is as expected');
+ my $max = $oidx->max;
+ $oidx->dbh_close;
+ ok(run_script([qw(-extindex), "$home/extindex", $v2ibx->{inboxdir}]), # plain run, no --reindex
+ '-extindex noop');
+ is($oidx->max, $max, '->max unchanged');
+ is($oidx->eidx_meta($lc_key), $cmt_b, 'lc-v2 unchanged');
+ $oidx->dbh_close;
+ my $opt = { 2 => \(my $err = '') }; # key 2: capture the script's stderr in $err
+ ok(run_script([qw(-extindex --reindex), "$home/extindex",
+ $v2ibx->{inboxdir}], undef, $opt),
+ '--reindex for unseen');
+ is($oidx->max, $max + 1, '->max bumped');
+ is($oidx->eidx_meta($lc_key), $cmt_b, 'lc-v2 stays unchanged');
+ my @err = split(/^/, $err); # one element per line of stderr
+ is(scalar(@err), 1, 'only one warning');
+ like($err[0], qr/I: reindex_unseen/, 'got reindex_unseen message');
+ my $new = $oidx->get_art($max + 1);
+ is($new->{subject}, $eml->header('Subject'), 'new message added');
+ # second half: remove the message from v2 and expect --reindex to report it as stale
+ ok($im->remove($eml), 'remove new message from v2 inbox');
+ $im->done;
+ my $cmt_c = $v2ibx->mm->last_commit_xap($schema_version, 0); # after the removal
+ is($oidx->eidx_meta($lc_key, $cmt_c), $cmt_b,
+ 'bump lc-v2 meta again to skip v2 remove');
+ $err = '';
+ ok(run_script([qw(-extindex --reindex), "$home/extindex",
+ $v2ibx->{inboxdir}], undef, $opt),
+ '--reindex for stale');
+ @err = split(/^/, $err);
+ is(scalar(@err), 1, 'only one warning');
+ like($err[0], qr/\(#$new->{num}\): stale/, 'got stale message warning');
+}
+
if ('remove v1test and test gc') {
xsys([qw(git config --unset publicinbox.v1test.inboxdir)],
{ GIT_CONFIG => $cfg_path });