done($self);
}
+sub _reindex_finalize ($$$) { # called from _reindex_oid once the last xref3 blob of a docid was read
+ my ($req, $smsg, $eml) = @_;
+ my $sync = $req->{sync};
+ my $self = $sync->{self};
+ my $by_chash = $req->{by_chash}; # content hash => [ smsg, ... ], filled by _reindex_oid
+ my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes';
+ my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
+ my $docid = $smsg->{num} = $orig_smsg->{num}; # $smsg takes over the original docid
+ $self->{oidx}->add_overview($eml, $smsg); # may rethread
+ return if $nr == 1; # likely, all good
+
+ warn "W: #$docid split into $nr due to deduplication change\n";
+ my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
+ delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing"; # only the other content hashes remain
+ for my $ary (values %$by_chash) {
+ for my $x (reverse @$ary) {
+ my $n = $self->{oidx}->remove_xref3($docid, $x->{blob}); # detach $x->{blob} from $docid
+ die "BUG: $x->{blob} invalidated #$docid" if $n == 0; # NOTE(review): $n looks like the count of xref3 rows left for $docid -- confirm
+ }
+ my $x = $ary->[-1] // die "BUG: #$docid {by_chash} empty"; # last smsg pushed for this content hash
+ $x->{num} = delete($x->{xnum}) // die '{xnum} unset'; # {xnum} and {ibx_id} were stashed by _reindex_oid
+ my $ibx_id = delete($x->{ibx_id}) // die '{ibx_id} unset';
+ my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos"; # index into {ibx_list}
+ my $ibx = $self->{ibx_list}->[$pos] //
+ die "BUG: ibx for $x->{blob} not mapped";
+ my $e = $ibx->over->get_art($x->{num}); # article $x->{num} as seen by that inbox
+ $e->{blob} eq $x->{blob} or die <<EOF;
+$x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
+EOF
+ reindex_unseen($self, $sync, $ibx, $e); # NOTE(review): defined outside this view; presumably indexes $e under a new docid -- confirm
+ }
+}
+
+sub _reindex_oid { # git->cat_async callback
+ my ($bref, $oid, $type, $size, $req) = @_;
+ my $sync = $req->{sync};
+ my $self = $sync->{self};
+ my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
+ my $expect_oid = $req->{xr3}->[$req->{ix}]->[2]; # hex OID of the xref3 entry [ibx_id, xnum, oid] being processed
+ my $docid = $orig_smsg->{num};
+ if (is_bad_blob($oid, $type, $size, $expect_oid)) {
+ my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid); # NOTE(review): presumably the number of xref3 rows left for $docid -- confirm
+ if ($remain == 0) {
+ warn "W: #$docid gone or corrupted\n";
+ $self->idx_shard($docid)->shard_remove($docid);
+ } elsif (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) { # advance {ix} and retry with the next xref3 entry
+ $self->git->cat_async($next_oid, \&_reindex_oid, $req);
+ } else {
+ warn "BUG: #$docid gone (UNEXPECTED)\n";
+ $self->idx_shard($docid)->shard_remove($docid);
+ }
+ return;
+ }
+ my $ci = $self->{current_info};
+ local $self->{current_info} = "$ci #$docid $oid"; # restored automatically when this sub returns
+ my $re_smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
+ $re_smsg->{bytes} = $size + crlf_adjust($$bref);
+ my $eml = PublicInbox::Eml->new($bref);
+ $re_smsg->populate($eml, { autime => $orig_smsg->{ds}, # reuse {ds} and {ts} of the original smsg
+ cotime => $orig_smsg->{ts} });
+ my $chash = content_hash($eml);
+ $re_smsg->{chash} = $chash;
+ $re_smsg->{xnum} = $req->{xr3}->[$req->{ix}]->[1]; # stashed for _reindex_finalize
+ $re_smsg->{ibx_id} = $req->{xr3}->[$req->{ix}]->[0]; # stashed for _reindex_finalize
+ push @{$req->{by_chash}->{$chash}}, $re_smsg; # group re-read messages by content hash
+ if (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) { # more xref3 entries remain, read the next blob
+ $self->git->cat_async($next_oid, \&_reindex_oid, $req);
+ } else { # last $re_smsg is the highest priority xref3
+ local $self->{current_info} = "$ci #$docid";
+ _reindex_finalize($req, $re_smsg, $eml);
+ }
+}
+
+sub _reindex_smsg ($$$) { # starts the cat_async chain which re-reads every xref3 blob of $smsg
+ my ($self, $sync, $smsg) = @_;
+ my $docid = $smsg->{num};
+ my $xr3 = $self->{oidx}->get_xref3($docid, 1); # true 2nd arg: raw [ibx_id, xnum, oidbin] rows
+ if (scalar(@$xr3) == 0) { # _reindex_check_stale should've covered this
+ warn <<"";
+BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
+
+ $self->{oidx}->delete_by_num($docid);
+ $self->idx_shard($docid)->shard_remove($docid);
+ return;
+ }
+
+ # we sort {xr3} in the reverse order of {ibx_list} so we can
+ # hit the common case in _reindex_finalize without rereading
+ # from git (or holding multiple messages in memory).
+ my $id2pos = $sync->{id2pos}; # index in {ibx_list}
+ @$xr3 = sort {
+ $id2pos->{$b->[0]} <=> $id2pos->{$a->[0]}
+ ||
+ $b->[1] <=> $a->[1] # break ties with {xnum}
+ } @$xr3;
+ @$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3; # binary OID => hex string
+ my $req = { orig_smsg => $smsg, sync => $sync, xr3 => $xr3, ix => 0 }; # {ix} is the current position in {xr3}
+ $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req); # _reindex_oid requests the following entries itself
+}
+
sub eidxq_process ($$) { # for reindexing
my ($self, $sync) = @_;
my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
$pr->("Xapian indexing $min..$max (total=$tot)\n");
}
+ my %id2pos; # ibx_id => position in {ibx_list}, read by _reindex_smsg and _reindex_finalize
+ my $pos = 0;
+ $id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
+ $sync->{id2pos} = \%id2pos;
my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
$iter->execute;
while (defined(my $docid = $iter->fetchrow_array)) {
- $self->idx_shard($docid)->shard_reindex_docid($docid);
- $del->execute($docid);
last if $sync->{quit};
+ if (my $smsg = $self->{oidx}->get_art($docid)) { # only docids still present in over get re-read
+ _reindex_smsg($self, $sync, $smsg);
+ } else {
+ warn "E: #$docid does not exist in over\n";
+ }
+ $del->execute($docid); # dequeue the docid in either case
+
my $cur = ++${$sync->{nr}};
# shards flush on their own, just don't queue up too many
# deletes
if (($cur % 1000) == 0) {
+ $self->git->async_wait_all; # NOTE(review): presumably drains pending cat_async callbacks before the commit -- confirm
$self->{oidx}->commit_lazy;
$self->{oidx}->begin_lazy;
$pr->("reindexed $cur/$tot\n") if $pr;
# this is only for SIGUSR1, shards do their own accounting:
reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
}
+ $self->git->async_wait_all; # NOTE(review): presumably waits for the remaining cat_async callbacks -- confirm
$pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr;
- $self->{oidx}->commit_lazy;
- $self->{oidx}->begin_lazy;
}
sub _reindex_unseen { # git->cat_async callback
}
sub get_xref3 {
- my ($self, $num) = @_;
+ my ($self, $num, $raw) = @_;
my $dbh = dbh($self);
my $sth = $dbh->prepare_cached(<<'', undef, 1);
SELECT ibx_id,xnum,oidbin FROM xref3 WHERE docid = ? ORDER BY ibx_id,xnum ASC
$sth->execute($num);
my $rows = $sth->fetchall_arrayref;
+ return $rows if $raw;
my $eidx_key_sth = $dbh->prepare_cached(<<'', undef, 1);
SELECT eidx_key FROM inboxes WHERE ibx_id = ?
use PublicInbox::TestCommon;
use PublicInbox::Config;
use PublicInbox::Search;
+use PublicInbox::InboxWritable;
use Fcntl qw(:seek);
my $json = PublicInbox::Config::json() or plan skip_all => 'JSON missing';
require_git(2.6);
require_mods(qw(DBD::SQLite Search::Xapian));
use_ok 'PublicInbox::ExtSearch';
use_ok 'PublicInbox::ExtSearchIdx';
+use_ok 'PublicInbox::OverIdx';
my $sock = tcp_server();
my $host_port = $sock->sockhost . ':' . $sock->sockport;
my ($home, $for_destroy) = tmpdir();
my $cfg = PublicInbox::Config->new;
my $schema_version = PublicInbox::Search::SCHEMA_VERSION();
+my $f = "$home/extindex/ei$schema_version/over.sqlite3"; # path of the extindex over DB under the temporary $home
+my $oidx = PublicInbox::OverIdx->new($f); # file-scoped so the test groups below share one handle
if ('inject w/o indexing') {
use PublicInbox::Import;
my $v1ibx = $cfg->lookup_name('v1test');
}
if ('reindex catches missed messages') {
- use PublicInbox::InboxWritable;
- use PublicInbox::OverIdx;
my $v2ibx = $cfg->lookup_name('v2test');
my $im = PublicInbox::InboxWritable->new($v2ibx)->importer(0);
my $cmt_a = $v2ibx->mm->last_commit_xap($schema_version, 0);
$im->done;
my $cmt_b = $v2ibx->mm->last_commit_xap($schema_version, 0);
isnt($cmt_a, $cmt_b, 'v2 0.git HEAD updated');
- my $f = "$home/extindex/ei$schema_version/over.sqlite3";
- my $oidx = PublicInbox::OverIdx->new($f);
$oidx->dbh;
my $uv = $v2ibx->uidvalidity;
my $lc_key = "lc-v2:v2.example//$uv;0";
is($oidx->max, $max + 1, '->max bumped');
is($oidx->eidx_meta($lc_key), $cmt_b, 'lc-v2 stays unchanged');
my @err = split(/^/, $err);
- is(scalar(@err), 1, 'only one warning');
+ is(scalar(@err), 1, 'only one warning') or diag "err=$err"; # show the captured stderr when the count is off
like($err[0], qr/I: reindex_unseen/, 'got reindex_unseen message');
my $new = $oidx->get_art($max + 1);
is($new->{subject}, $eml->header('Subject'), 'new message added');
$v2ibx->{inboxdir}], undef, $opt),
'--reindex for stale');
@err = split(/^/, $err);
- is(scalar(@err), 1, 'only one warning');
+ is(scalar(@err), 1, 'only one warning') or diag "err=$err"; # show the captured stderr when the count is off
like($err[0], qr/\(#$new->{num}\): stale/, 'got stale message warning');
is($oidx->get_art($new->{num}), undef,
'stale message gone from over');
is($mset->size, 0, 'stale mid gone Xapian');
}
+if ('reindex catches content bifurcation') {
+ # Attach a second xref3 row to docid #3 which points at a newly
+ # imported v2 message, then check that a plain run ignores it
+ # while --reindex splits it off into a docid of its own.
+ use PublicInbox::MID qw(mids);
+ my $v2ibx = $cfg->lookup_name('v2test');
+ my $im = PublicInbox::InboxWritable->new($v2ibx)->importer(0);
+ my $eml = eml_load('t/data/message_embed.eml');
+ my $cmt_a = $v2ibx->mm->last_commit_xap($schema_version, 0);
+ $im->add($eml);
+ $im->done;
+ my $cmt_b = $v2ibx->mm->last_commit_xap($schema_version, 0);
+ my $uv = $v2ibx->uidvalidity;
+ my $lc_key = "lc-v2:v2.example//$uv;0";
+ $oidx->dbh;
+ # the two-argument form stores $cmt_b; the test expects it to
+ # hand back the previous value
+ is($oidx->eidx_meta($lc_key, $cmt_b), $cmt_a,
+ 'update lc-v2 meta, old is as expected');
+ my $mid = mids($eml)->[0];
+ my $smsg = $v2ibx->over->next_by_mid($mid, \(my $id), \(my $prev));
+ my $oldmax = $oidx->max;
+ my $x3_orig = $oidx->get_xref3(3);
+ is(scalar(@$x3_orig), 1, '#3 has one xref');
+ $oidx->add_xref3(3, $smsg->{num}, $smsg->{blob}, 'v2.example');
+ my $x3 = $oidx->get_xref3(3);
+ is(scalar(@$x3), 2, 'injected xref3');
+ $oidx->commit_lazy;
+ my $opt = { 2 => \(my $err = '') }; # capture stderr of the scripts run below
+ ok(run_script([qw(-extindex --all), "$home/extindex"], undef, $opt),
+ 'extindex --all is noop');
+ is($err, '', 'no warnings in index');
+ $oidx->dbh;
+ is($oidx->max, $oldmax, 'oidx->max unchanged');
+ $oidx->dbh_close;
+ ok(run_script([qw(-extindex --reindex --all), "$home/extindex"],
+ undef, $opt), 'extindex --reindex');
+ $oidx->dbh;
+ ok($oidx->max > $oldmax, 'oidx->max bumped');
+ like($err, qr/split into 2 due to deduplication change/,
+ 'bifurcation noted');
+ my $added = $oidx->get_art($oidx->max);
+ is($added->{blob}, $smsg->{blob}, 'new blob indexed');
+ # is_deeply takes ($got, $expected, $name); keeping that order
+ # makes the failure diagnostics label both structures correctly
+ is_deeply($oidx->get_xref3($added->{num}),
+ ["v2.example:$smsg->{num}:$smsg->{blob}"],
+ 'xref3 corrected for bifurcated message');
+ is_deeply($oidx->get_xref3(3), $x3_orig, 'xref3 restored for #3');
+}
+
+if ('--reindex --rethread') { # a non-empty string is always true; it only labels this test group
+ my $before = $oidx->dbh->selectrow_array(<<'');
+SELECT MAX(tid) FROM over WHERE num > 0
+
+ my $opt = {};
+ ok(run_script([qw(-extindex --reindex --rethread --all),
+ "$home/extindex"], undef, $opt),
+ '--rethread');
+ my $after = $oidx->dbh->selectrow_array(<<'');
+SELECT MIN(tid) FROM over WHERE num > 0
+
+ # actual rethread logic is identical to v1/v2 and tested elsewhere
+ ok($after > $before, '--rethread updates MIN(tid)'); # smallest tid now exceeds the old largest one, so every row got a new tid (assumes tids only grow -- TODO confirm)
+}
+
if ('remove v1test and test gc') {
xsys([qw(git config --unset publicinbox.v1test.inboxdir)],
{ GIT_CONFIG => $cfg_path });