# Copyright (C) 2015-2020 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-# based on notmuch, but with no concept of folders, files or flags
+# based on notmuch, but with no concept of folders, files
#
# Indexes mail with Xapian and our (SQLite-based) ::Msgmap for use
# with the web and NNTP interfaces. This index maintains thread
use PublicInbox::MID qw(mids_for_index mids);
use PublicInbox::MsgIter;
use PublicInbox::IdxStack;
-use Carp qw(croak);
+use Carp qw(croak carp);
use POSIX qw(strftime);
use Time::Local qw(timegm);
use PublicInbox::OverIdx;
use PublicInbox::Spawn qw(spawn nodatacow_dir);
use PublicInbox::Git qw(git_unquote);
use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp);
-our @EXPORT_OK = qw(crlf_adjust log2stack is_ancestor check_size);
+our @EXPORT_OK = qw(crlf_adjust log2stack is_ancestor check_size prepare_stack
+ index_text term_generator add_val is_bad_blob);
my $X = \%PublicInbox::Search::X;
-my ($DB_CREATE_OR_OPEN, $DB_OPEN);
+our ($DB_CREATE_OR_OPEN, $DB_OPEN);
our $DB_NO_SYNC = 0;
our $BATCH_BYTES = $ENV{XAPIAN_FLUSH_THRESHOLD} ? 0x7fffffff : 1_000_000;
use constant DEBUG => !!$ENV{DEBUG};
}
}
$ibx = PublicInbox::InboxWritable->new($ibx);
- my $self = bless {
- ibx => $ibx,
- xpfx => $inboxdir, # for xpfx_init
- -altid => $altid,
- ibx_ver => $version,
- indexlevel => $indexlevel,
- }, $class;
- $self->xpfx_init;
+ my $self = PublicInbox::Search->new($ibx);
+ bless $self, $class;
+ $self->{ibx} = $ibx;
+ $self->{-altid} = $altid;
+ $self->{indexlevel} = $indexlevel;
$self->{-set_indexlevel_once} = 1 if $indexlevel eq 'medium';
if ($ibx->{-skip_docdata}) {
$self->{-set_skip_docdata_once} = 1;
}
}
return unless defined $flag;
- $flag |= $DB_NO_SYNC if $self->{ibx}->{-no_fsync};
+ $flag |= $DB_NO_SYNC if ($self->{ibx} // $self->{eidx})->{-no_fsync};
my $xdb = eval { ($X->{WritableDatabase})->new($dir, $flag) };
croak "Failed opening $dir: $@" if $@;
$self->{xdb} = $xdb;
$self->{term_generator} //= do {
my $tg = $X->{TermGenerator}->new;
- $tg->set_stemmer($self->stemmer);
+ $tg->set_stemmer(PublicInbox::Search::stemmer($self));
$tg;
}
}
index_list_id($self, $doc, $hdr);
}
-sub add_xapian ($$$$) {
+sub eml2doc ($$$;$) {
my ($self, $eml, $smsg, $mids) = @_;
+ $mids //= mids_for_index($eml);
my $doc = $X->{Document}->new;
add_val($doc, PublicInbox::Search::TS(), $smsg->{ts});
my @ds = gmtime($smsg->{ds});
index_headers($self, $smsg);
if (defined(my $eidx_key = $smsg->{eidx_key})) {
- $doc->add_boolean_term('O'.$eidx_key);
- $doc->add_boolean_term('P'.
- "$eidx_key:$smsg->{num}:$smsg->{blob}");
+ $doc->add_boolean_term('O'.$eidx_key) if $eidx_key ne '.';
}
msg_iter($eml, \&index_xapian, [ $self, $doc ]);
index_ids($self, $doc, $eml, $mids);
}
}
}
+ $doc;
+}
+
+sub add_xapian ($$$$) {
+ my ($self, $eml, $smsg, $mids) = @_;
+ my $doc = eml2doc($self, $eml, $smsg, $mids);
$self->{xdb}->replace_document($smsg->{num}, $doc);
}
sub _msgmap_init ($) {
my ($self) = @_;
- die "BUG: _msgmap_init is only for v1\n" if $self->{ibx_ver} != 1;
+ die "BUG: _msgmap_init is only for v1\n" if $self->{ibx}->version != 1;
$self->{mm} //= eval {
require PublicInbox::Msgmap;
my $rw = $self->{ibx}->{-no_fsync} ? 2 : 1;
$smsg->{num};
}
-sub _get_doc ($$$) {
- my ($self, $docid, $oid) = @_;
+sub _get_doc ($$) {
+ my ($self, $docid) = @_;
my $doc = eval { $self->{xdb}->get_document($docid) };
$doc // do {
warn "E: $@\n" if $@;
- warn "E: #$docid $oid missing in Xapian\n";
+ warn "E: #$docid missing in Xapian\n";
undef;
}
}
-sub add_xref3 {
- my ($self, $docid, $xnum, $oid, $eidx_key, $eml) = @_;
+sub add_eidx_info {
+ my ($self, $docid, $eidx_key, $eml) = @_;
begin_txn_lazy($self);
- my $doc = _get_doc($self, $docid, $oid) or return;
+ my $doc = _get_doc($self, $docid) or return;
term_generator($self)->set_document($doc);
- $doc->add_boolean_term('O'.$eidx_key);
- $doc->add_boolean_term('P'."$eidx_key:$xnum:$oid");
+ $doc->add_boolean_term('O'.$eidx_key) if $eidx_key ne '.';
index_list_id($self, $doc, $eml);
$self->{xdb}->replace_document($docid, $doc);
}
-sub remove_xref3 {
- my ($self, $docid, $oid, $eidx_key, $eml) = @_;
+sub remove_eidx_info {
+ my ($self, $docid, $eidx_key, $eml) = @_;
begin_txn_lazy($self);
- my $doc = _get_doc($self, $docid, $oid) or return;
- my $xref3 = PublicInbox::Smsg::xref3(undef, $doc);
- my %x3 = map { $_ => undef } @$xref3;
- for (grep(/\A\Q$eidx_key\E:[0-9]+:\Q$oid\E\z/, @$xref3)) {
- delete $x3{$_};
- $doc->remove_term('P' . $_);
- }
- if (scalar(keys(%x3)) == 0) {
- $self->{xdb}->delete_document($docid);
- if (my $del_fh = $self->{del_fh}) { # TODO
- print $del_fh $docid, "\n" or die "E: print $!";
- }
- } else {
- if (!grep(/\A\Q$eidx_key\E:/, keys %x3)) {
- $doc->remove_term('O'.$eidx_key);
- }
- for my $l ($eml->header_raw('List-Id')) {
- $l =~ /<([^>]+)>/ or next;
- my $lid = lc $1;
- $doc->remove_term('G' . $lid);
-
- # nb: we don't remove the XL probabilistic terms
- # since terms may overlap if cross-posted.
- #
- # IOW, a message which has both <foo.example.com>
- # and <bar.example.com> would have overlapping
- # "XLexample" and "XLcom" as terms and which we
- # wouldn't know if they're safe to remove if we just
- # unindex <foo.example.com> while preserving
- # <bar.example.com>.
- #
- # In any case, this entire sub is will likely never
- # be needed and users using the "l:" prefix are probably
- # rarer.
- }
- $self->{xdb}->replace_document($docid, $doc);
+ my $doc = _get_doc($self, $docid) or return;
+ eval { $doc->remove_term('O'.$eidx_key) };
+ warn "W: ->remove_term O$eidx_key: $@\n" if $@;
+ for my $l ($eml ? $eml->header_raw('List-Id') : ()) {
+ $l =~ /<([^>]+)>/ or next;
+ my $lid = lc $1;
+ eval { $doc->remove_term('G' . $lid) };
+ warn "W: ->remove_term G$lid: $@\n" if $@;
+
+ # nb: we don't remove the XL probabilistic terms
+ # since terms may overlap if cross-posted.
+ #
+ # IOW, a message which has both <foo.example.com>
+ # and <bar.example.com> would have overlapping
+ # "XLexample" and "XLcom" as terms and which we
+ # wouldn't know if they're safe to remove if we just
+ # unindex <foo.example.com> while preserving
+ # <bar.example.com>.
+ #
+ # In any case, this entire sub is will likely never
+ # be needed and users using the "l:" prefix are probably
+ # rarer.
+ }
+ $self->{xdb}->replace_document($docid, $doc);
+}
+
+sub set_keywords {
+ my ($self, $docid, @kw) = @_;
+ begin_txn_lazy($self);
+ my $doc = _get_doc($self, $docid) or return;
+ my %keep = map { $_ => 1 } @kw;
+ my %add = %keep;
+ my @rm;
+ my $end = $doc->termlist_end;
+ for (my $cur = $doc->termlist_begin; $cur != $end; $cur++) {
+ $cur->skip_to('K');
+ last if $cur == $end;
+ my $kw = $cur->get_termname;
+ $kw =~ s/\AK//s or next;
+ $keep{$kw} ? delete($add{$kw}) : push(@rm, $kw);
}
+ return unless (scalar(@rm) + scalar(keys %add));
+ $doc->remove_term('K'.$_) for @rm;
+ $doc->add_boolean_term('K'.$_) for (keys %add);
+ $self->{xdb}->replace_document($docid, $doc);
+}
+
+sub add_keywords {
+ my ($self, $docid, @kw) = @_;
+ begin_txn_lazy($self);
+ my $doc = _get_doc($self, $docid) or return;
+ $doc->add_boolean_term('K'.$_) for @kw;
+ $self->{xdb}->replace_document($docid, $doc);
+}
+
+sub remove_keywords {
+ my ($self, $docid, @kw) = @_;
+ begin_txn_lazy($self);
+ my $doc = _get_doc($self, $docid) or return;
+ my $replace;
+ eval {
+ $doc->remove_term('K'.$_);
+ $replace = 1
+ } for @kw;
+ $self->{xdb}->replace_document($docid, $doc) if $replace;
}
sub get_val ($$) {
}
sub xdb_remove {
- my ($self, $oid, @removed) = @_;
+ my ($self, @docids) = @_;
my $xdb = $self->{xdb} or return;
- for my $num (@removed) {
- my $doc = _get_doc($self, $num, $oid) or next;
- my $smsg = smsg_from_doc($doc);
- my $blob = $smsg->{blob}; # may be undef if --skip-docdata
- if (!defined($blob) || $blob eq $oid) {
- $xdb->delete_document($num);
- } else {
- warn "E: #$num $oid != $blob in Xapian\n";
- }
+ for my $docid (@docids) {
+ eval { $xdb->delete_document($docid) };
+ warn "E: #$docid not in in Xapian? $@\n" if $@;
}
}
-sub remove_by_oid {
- my ($self, $oid, $num) = @_;
- die "BUG: remove_by_oid is v2-only\n" if $self->{oidx};
+sub remove_by_docid {
+ my ($self, $num) = @_;
+ die "BUG: remove_by_docid is v2-only\n" if $self->{oidx};
$self->begin_txn_lazy;
- xdb_remove($self, $oid, $num) if need_xapian($self);
+ xdb_remove($self, $num) if need_xapian($self);
}
sub index_git_blob_id {
$tmp{$_}++ for @removed;
}
if (!$nr) {
- $mids = join('> <', @$mids);
- warn "W: <$mids> missing for removal from overview\n";
+ my $m = join('> <', @$mids);
+ warn "W: <$m> missing for removal from overview\n";
}
while (my ($num, $nr) = each %tmp) {
warn "BUG: $num appears >1 times ($nr) for $oid\n" if $nr != 1;
} else { # just in case msgmap and over.sqlite3 become desynched:
$self->{mm}->mid_delete($mids->[0]);
}
- xdb_remove($self, $oid, keys %tmp) if need_xapian($self);
+ xdb_remove($self, keys %tmp) if need_xapian($self);
}
sub index_mm {
}
}
+sub is_bad_blob ($$$$) {
+ my ($oid, $type, $size, $expect_oid) = @_;
+ if ($type ne 'blob') {
+ carp "W: $expect_oid is not a blob (type=$type)";
+ return 1;
+ }
+ croak "BUG: $oid != $expect_oid" if $oid ne $expect_oid;
+ $size == 0 ? 1 : 0; # size == 0 means purged
+}
+
sub index_both { # git->cat_async callback
my ($bref, $oid, $type, $size, $sync) = @_;
+ return if is_bad_blob($oid, $type, $size, $sync->{oid});
my ($nr, $max) = @$sync{qw(nr max)};
++$$nr;
$$max -= $size;
$size += crlf_adjust($$bref);
my $smsg = bless { bytes => $size, blob => $oid }, 'PublicInbox::Smsg';
my $self = $sync->{sidx};
+ local $self->{current_info} = "$self->{current_info}: $oid";
my $eml = PublicInbox::Eml->new($bref);
$smsg->{num} = index_mm($self, $eml, $oid, $sync) or
die "E: could not generate NNTP article number for $oid";
add_message($self, $eml, $smsg, $sync);
+ my $cur_cmt = $sync->{cur_cmt} // die 'BUG: {cur_cmt} missing';
+ ${$sync->{latest_cmt}} = $cur_cmt;
}
sub unindex_both { # git->cat_async callback
- my ($bref, $oid, $type, $size, $self) = @_;
+ my ($bref, $oid, $type, $size, $sync) = @_;
+ return if is_bad_blob($oid, $type, $size, $sync->{oid});
+ my $self = $sync->{sidx};
+ local $self->{current_info} = "$self->{current_info}: $oid";
unindex_eml($self, $oid, PublicInbox::Eml->new($bref));
+ # may be undef if leftover
+ if (defined(my $cur_cmt = $sync->{cur_cmt})) {
+ ${$sync->{latest_cmt}} = $cur_cmt;
+ }
+}
+
+sub with_umask {
+ my $self = shift;
+ ($self->{ibx} // $self->{eidx})->with_umask(@_);
}
# called by public-inbox-index
sub index_sync {
my ($self, $opt) = @_;
delete $self->{lock_path} if $opt->{-skip_lock};
- $self->{ibx}->with_umask(\&_index_sync, $self, $opt);
- if ($opt->{reindex}) {
+ $self->with_umask(\&_index_sync, $self, $opt);
+ if ($opt->{reindex} && !$opt->{quit}) {
my %again = %$opt;
delete @again{qw(rethread reindex)};
index_sync($self, \%again);
+ $opt->{quit} = $again{quit}; # propagate to caller
}
}
my ($self, $sync, $stk) = @_;
$self->{ibx}->git->async_wait_all;
- # latest_cmt may be undef
- my $newest = $stk ? $stk->{latest_cmt} : undef;
- if ($newest) {
+ # $newest may be undef
+ my $newest = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+ if (defined($newest)) {
my $cur = $self->{mm}->last_commit || '';
if (need_update($self, $cur, $newest)) {
$self->{mm}->last_commit($newest);
}
- } else {
- ${$sync->{max}} = $self->{batch_bytes};
}
+ ${$sync->{max}} = $self->{batch_bytes};
$self->{mm}->{dbh}->commit;
- if ($newest && need_xapian($self)) {
- my $xdb = $self->{xdb};
+ my $xdb = need_xapian($self) ? $self->{xdb} : undef;
+ if ($newest && $xdb) {
my $cur = $xdb->get_metadata('last_commit');
if (need_update($self, $cur, $newest)) {
$xdb->set_metadata('last_commit', $newest);
}
-
+ }
+ if ($stk) { # all done if $stk is passed
# let SearchView know a full --reindex was done so it can
# generate ->has_threadid-dependent links
- if ($sync->{reindex} && !ref($sync->{reindex})) {
+ if ($xdb && $sync->{reindex} && !ref($sync->{reindex})) {
my $n = $xdb->get_metadata('has_threadid');
$xdb->set_metadata('has_threadid', '1') if $n ne '1';
}
+ $self->{oidx}->rethread_done($sync->{-opt}); # all done
}
-
- $self->{oidx}->rethread_done($sync->{-opt}) if $newest; # all done
commit_txn_lazy($self);
- $self->{ibx}->git->cleanup;
+ $sync->{ibx}->git->cleanup;
my $nr = ${$sync->{nr}};
idx_release($self, $nr);
# let another process do some work...
if (my $pr = $sync->{-opt}->{-progress}) {
$pr->("indexed $nr/$sync->{ntodo}\n") if $nr;
}
- if (!$stk) { # more to come
+ if (!$stk && !$sync->{quit}) { # more to come
begin_txn_lazy($self);
$self->{mm}->{dbh}->begin_work;
}
# only for v1
sub process_stack {
my ($self, $sync, $stk) = @_;
- my $git = $self->{ibx}->git;
+ my $git = $sync->{ibx}->git;
my $max = $self->{batch_bytes};
my $nr = 0;
$sync->{nr} = \$nr;
$sync->{max} = \$max;
$sync->{sidx} = $self;
+ $sync->{latest_cmt} = \(my $latest_cmt);
$self->{mm}->{dbh}->begin_work;
if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
for my $oid (@leftovers) {
+ last if $sync->{quit};
$oid = unpack('H*', $oid);
- $git->cat_async($oid, \&unindex_both, $self);
+ $git->cat_async($oid, \&unindex_both, $sync);
}
}
if ($sync->{max_size} = $sync->{-opt}->{max_size}) {
$sync->{index_oid} = \&index_both;
}
- while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
+ while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) {
+ my $arg = { %$sync, cur_cmt => $cur_cmt, oid => $oid };
+ last if $sync->{quit};
if ($f eq 'm') {
- my $arg = { %$sync, autime => $at, cotime => $ct };
+ $arg->{autime} = $at;
+ $arg->{cotime} = $ct;
if ($sync->{max_size}) {
$git->check_async($oid, \&check_size, $arg);
} else {
}
v1_checkpoint($self, $sync) if $max <= 0;
} elsif ($f eq 'd') {
- $git->cat_async($oid, \&unindex_both, $self);
+ $git->cat_async($oid, \&unindex_both, $arg);
}
}
- v1_checkpoint($self, $sync, $stk);
+ v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk);
}
sub log2stack ($$$) {
my $fh = $git->popen(qw(log --raw -r --pretty=tformat:%at-%ct-%H
--no-notes --no-color --no-renames --no-abbrev),
$range);
- my ($at, $ct, $stk);
+ my ($at, $ct, $stk, $cmt);
while (<$fh>) {
+ return if $sync->{quit};
if (/\A([0-9]+)-([0-9]+)-($OID)$/o) {
- ($at, $ct) = ($1 + 0, $2 + 0);
- $stk //= PublicInbox::IdxStack->new($3);
+ ($at, $ct, $cmt) = ($1 + 0, $2 + 0, $3);
+ $stk //= PublicInbox::IdxStack->new($cmt);
} elsif (/$del/) {
my $oid = $1;
if ($D) { # reindex case
$D->{pack('H*', $oid)}++;
} else { # non-reindex case:
- $stk->push_rec('d', $at, $ct, $oid);
+ $stk->push_rec('d', $at, $ct, $oid, $cmt);
}
} elsif (/$add/) {
my $oid = $1;
my $oid_bin = pack('H*', $oid);
my $nr = --$D->{$oid_bin};
delete($D->{$oid_bin}) if $nr <= 0;
-
# nr < 0 (-1) means it never existed
- $stk->push_rec('m', $at, $ct, $oid) if $nr < 0;
- } else {
- $stk->push_rec('m', $at, $ct, $oid);
+ next if $nr >= 0;
}
+ $stk->push_rec('m', $at, $ct, $oid, $cmt);
}
}
close $fh or die "git log failed: \$?=$?";
ref($reindex) eq 'HASH' ? $reindex->{from} : '';
}
+sub quit_cb ($) {
+ my ($sync) = @_;
+ sub {
+ # we set {-opt}->{quit} too, so ->index_sync callers
+ # can abort multi-inbox loops this way
+ $sync->{quit} = $sync->{-opt}->{quit} = 1;
+ warn "gracefully quitting\n";
+ }
+}
+
# indexes all unindexed messages (v1 only)
sub _index_sync {
my ($self, $opt) = @_;
my $tip = $opt->{ref} || 'HEAD';
my $ibx = $self->{ibx};
+ local $self->{current_info} = "$ibx->{inboxdir}";
$self->{batch_bytes} = $opt->{batch_size} // $BATCH_BYTES;
$ibx->git->batch_prepare;
my $pr = $opt->{-progress};
my $sync = { reindex => $opt->{reindex}, -opt => $opt, ibx => $ibx };
+ my $quit = quit_cb($sync);
+ local $SIG{QUIT} = $quit;
+ local $SIG{INT} = $quit;
+ local $SIG{TERM} = $quit;
my $xdb = $self->begin_txn_lazy;
$self->{oidx}->rethread_prepare($opt);
my $mm = _msgmap_init($self);
my $stk = prepare_stack($sync, $range);
$sync->{ntodo} = $stk ? $stk->num_records : 0;
$pr->("$sync->{ntodo}\n") if $pr; # continue previous line
- process_stack($self, $sync, $stk);
+ process_stack($self, $sync, $stk) if !$sync->{quit};
}
sub DESTROY {
sub begin_txn_lazy {
my ($self) = @_;
- $self->{ibx}->with_umask(\&_begin_txn, $self) if !$self->{txn};
+ $self->with_umask(\&_begin_txn, $self) if !$self->{txn};
}
# store 'indexlevel=medium' in v2 shard=0 and v1 (only one shard)
sub _commit_txn {
my ($self) = @_;
+ if (my $eidx = $self->{eidx}) {
+ $eidx->git->async_wait_all;
+ $eidx->{transact_bytes} = 0;
+ }
if (my $xdb = $self->{xdb}) {
set_metadata_once($self);
$xdb->commit_transaction;
sub commit_txn_lazy {
my ($self) = @_;
delete($self->{txn}) and
- $self->{ibx}->with_umask(\&_commit_txn, $self);
+ $self->with_umask(\&_commit_txn, $self);
}
sub worker_done {
sub eidx_shard_new {
my ($class, $eidx, $shard) = @_;
my $self = bless {
+ eidx => $eidx,
xpfx => $eidx->{xpfx},
indexlevel => $eidx->{indexlevel},
-skip_docdata => 1,
$self;
}
+# ensure there's no stale Xapian docs by treating $over as canonical
+sub over_check {
+ my ($self, $over) = @_;
+ begin_txn_lazy($self);
+ my $sth = $over->dbh->prepare(<<'');
+SELECT COUNT(*) FROM over WHERE num = ?
+
+ my $xdb = $self->{xdb};
+ my $cur = $xdb->postlist_begin('');
+ my $end = $xdb->postlist_end('');
+ my $xdir = $self->xdir;
+ for (; $cur != $end; $cur++) {
+ my $docid = $cur->get_docid;
+ $sth->execute($docid);
+ my $x = $sth->fetchrow_array;
+ next if $x > 0;
+ warn "I: removing $xdir #$docid, not in `over'\n";
+ $xdb->delete_document($docid);
+ }
+}
+
1;