X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FSearchIdx.pm;h=eb620f44fe990e4c38514d9375fa6328ae30e80d;hb=d73d783ab2cf14ba28ca63723223d8c85a68cdd5;hp=4d2e0da9267c9c7214c8d13f2da8ecaef622b675;hpb=95e35e1f546dfec0294380e958ae3b4f4598ce03;p=public-inbox.git diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 4d2e0da9..eb620f44 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -12,21 +12,20 @@ use v5.10.1; use parent qw(PublicInbox::Search PublicInbox::Lock Exporter); use PublicInbox::Eml; use PublicInbox::InboxWritable; -use PublicInbox::MID qw(mid_mime mids_for_index mids); +use PublicInbox::MID qw(mids_for_index mids); use PublicInbox::MsgIter; use PublicInbox::IdxStack; use Carp qw(croak); use POSIX qw(strftime); use PublicInbox::OverIdx; -use PublicInbox::Spawn qw(spawn); +use PublicInbox::Spawn qw(spawn nodatacow_dir); use PublicInbox::Git qw(git_unquote); use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp); -our @EXPORT_OK = qw(too_big crlf_adjust log2stack is_ancestor); +our @EXPORT_OK = qw(crlf_adjust log2stack is_ancestor check_size); my $X = \%PublicInbox::Search::X; my ($DB_CREATE_OR_OPEN, $DB_OPEN); our $DB_NO_SYNC = 0; -our $BATCH_BYTES = defined($ENV{XAPIAN_FLUSH_THRESHOLD}) ? - 0x7fffffff : 1_000_000; +our $BATCH_BYTES = $ENV{XAPIAN_FLUSH_THRESHOLD} ? 0x7fffffff : 1_000_000; use constant DEBUG => !!$ENV{DEBUG}; my $xapianlevels = qr/\A(?:full|medium)\z/; @@ -62,13 +61,16 @@ sub new { }, $class; $self->xpfx_init; $self->{-set_indexlevel_once} = 1 if $indexlevel eq 'medium'; + if ($ibx->{-skip_docdata}) { + $self->{-set_skip_docdata_once} = 1; + $self->{-skip_docdata} = 1; + } $ibx->umask_prepare; if ($version == 1) { $self->{lock_path} = "$inboxdir/ssoma.lock"; my $dir = $self->xdir; - $self->{over} = PublicInbox::OverIdx->new("$dir/over.sqlite3"); - $self->{over}->{-no_sync} = 1 if $ibx->{-no_sync}; - $self->{index_max_size} = $ibx->{index_max_size}; + $self->{oidx} = PublicInbox::OverIdx->new("$dir/over.sqlite3"); + $self->{oidx}->{-no_fsync} = 1 if $ibx->{-no_fsync}; } elsif ($version == 2) { defined $shard or die "shard is required for v2\n"; # shard is a number @@ -125,16 +127,17 @@ sub idx_acquire { # don't create empty Xapian directories if we don't need Xapian my $is_shard = defined($self->{shard}); - if (!$is_shard || ($is_shard && need_xapian($self))) { + if (!-d $dir && (!$is_shard || + ($is_shard && need_xapian($self)))) { File::Path::mkpath($dir); + nodatacow_dir($dir); + $self->{-set_has_threadid_once} = 1; } } return unless defined $flag; - $flag |= $DB_NO_SYNC if $self->{ibx}->{-no_sync}; + $flag |= $DB_NO_SYNC if $self->{ibx}->{-no_fsync}; my $xdb = eval { ($X->{WritableDatabase})->new($dir, $flag) }; - if ($@) { - die "Failed opening $dir: ", $@; - } + croak "Failed opening $dir: $@" if $@; $self->{xdb} = $xdb; } @@ -342,8 +345,7 @@ sub index_ids ($$$$) { } sub add_xapian ($$$$) { - my ($self, $mime, $smsg, $mids) = @_; - my $hdr = $mime->header_obj; + my ($self, $eml, $smsg, $mids) = @_; my $doc = $X->{Document}->new; add_val($doc, PublicInbox::Search::TS(), $smsg->{ts}); my @ds = gmtime($smsg->{ds}); @@ -353,17 +355,26 @@ sub add_xapian ($$$$) { add_val($doc, PublicInbox::Search::DT(), $dt); add_val($doc, PublicInbox::Search::BYTES(), $smsg->{bytes}); add_val($doc, PublicInbox::Search::UID(), $smsg->{num}); + add_val($doc, PublicInbox::Search::THREADID, $smsg->{tid}); my $tg = term_generator($self); $tg->set_document($doc); index_headers($self, $smsg); - msg_iter($mime, \&index_xapian, [ $self, $doc ]); - index_ids($self, $doc, $hdr, $mids); - $smsg->{to} = $smsg->{cc} = ''; # WWW doesn't need these, only NNTP - PublicInbox::OverIdx::parse_references($smsg, $hdr, $mids); - my $data = $smsg->to_doc_data; - $doc->set_data($data); + msg_iter($eml, \&index_xapian, [ $self, $doc ]); + index_ids($self, $doc, $eml, $mids); + + # by default, we maintain compatibility with v1.5.0 and earlier + # by writing to docdata.glass, users who never exect to downgrade can + # use --skip-docdata + if (!$self->{-skip_docdata}) { + # WWW doesn't need {to} or {cc}, only NNTP + $smsg->{to} = $smsg->{cc} = ''; + PublicInbox::OverIdx::parse_references($smsg, $eml, $mids); + my $data = $smsg->to_doc_data; + $doc->set_data($data); + } + if (my $altid = $self->{-altid}) { foreach my $alt (@$altid) { my $pfx = $alt->{xprefix}; @@ -382,7 +393,7 @@ sub _msgmap_init ($) { die "BUG: _msgmap_init is only for v1\n" if $self->{ibx_ver} != 1; $self->{mm} //= eval { require PublicInbox::Msgmap; - my $rw = $self->{ibx}->{-no_sync} ? 2 : 1; + my $rw = $self->{ibx}->{-no_fsync} ? 2 : 1; PublicInbox::Msgmap->new($self->{ibx}->{inboxdir}, $rw); }; } @@ -390,8 +401,7 @@ sub _msgmap_init ($) { sub add_message { # mime = PublicInbox::Eml or Email::MIME object my ($self, $mime, $smsg, $sync) = @_; - my $hdr = $mime->header_obj; - my $mids = mids_for_index($hdr); + my $mids = mids_for_index($mime); $smsg //= bless { blob => '' }, 'PublicInbox::Smsg'; # test-only compat $smsg->{mid} //= $mids->[0]; # v1 compatibility $smsg->{num} //= do { # v1 @@ -400,7 +410,7 @@ sub add_message { }; # v1 and tests only: - $smsg->populate($hdr, $sync); + $smsg->populate($mime, $sync); $smsg->{bytes} //= length($mime->as_string); eval { @@ -409,8 +419,8 @@ sub add_message { # of the fields which exist in over.sqlite3. We may stop # storing doc_data in Xapian sometime after we get multi-inbox # search working. - if (my $over = $self->{over}) { # v1 only - $over->add_overview($mime, $smsg); + if (my $oidx = $self->{oidx}) { # v1 only + $oidx->add_overview($mime, $smsg); } if (need_xapian($self)) { add_xapian($self, $mime, $smsg, $mids); @@ -447,7 +457,7 @@ sub xdb_remove { sub remove_by_oid { my ($self, $oid, $num) = @_; - die "BUG: remove_by_oid is v2-only\n" if $self->{over}; + die "BUG: remove_by_oid is v2-only\n" if $self->{oidx}; $self->begin_txn_lazy; xdb_remove($self, $oid, $num) if need_xapian($self); } @@ -469,13 +479,9 @@ sub unindex_eml { my $nr = 0; my %tmp; for my $mid (@$mids) { - my @removed = eval { $self->{over}->remove_oid($oid, $mid) }; - if ($@) { - warn "E: failed to remove <$mid> from overview: $@\n"; - } else { - $nr += scalar @removed; - $tmp{$_}++ for @removed; - } + my @removed = $self->{oidx}->remove_oid($oid, $mid); + $nr += scalar @removed; + $tmp{$_}++ for @removed; } if (!$nr) { $mids = join('> <', @$mids); @@ -484,6 +490,11 @@ sub unindex_eml { while (my ($num, $nr) = each %tmp) { warn "BUG: $num appears >1 times ($nr) for $oid\n" if $nr != 1; } + if ($nr) { + $self->{mm}->num_delete($_) for (keys %tmp); + } else { # just in case msgmap and over.sqlite3 become desynched: + $self->{mm}->mid_delete($mids->[0]); + } xdb_remove($self, $oid, keys %tmp) if need_xapian($self); } @@ -492,9 +503,9 @@ sub index_mm { my $mids = mids($mime); my $mm = $self->{mm}; if ($sync->{reindex}) { - my $over = $self->{over}; + my $oidx = $self->{oidx}; for my $mid (@$mids) { - my ($num, undef) = $over->num_mid0_for_oid($oid, $mid); + my ($num, undef) = $oidx->num_mid0_for_oid($oid, $mid); return $num if defined $num; } $mm->num_for($mids->[0]) // $mm->mid_insert($mids->[0]); @@ -504,11 +515,6 @@ sub index_mm { } } -sub unindex_mm { - my ($self, $mime) = @_; - $self->{mm}->mid_delete(mid_mime($mime)); -} - # returns the number of bytes to add if given a non-CRLF arg sub crlf_adjust ($) { if (index($_[0], "\r\n") < 0) { @@ -536,43 +542,89 @@ sub index_both { # git->cat_async callback sub unindex_both { # git->cat_async callback my ($bref, $oid, $type, $size, $self) = @_; - my $eml = PublicInbox::Eml->new($bref); - unindex_eml($self, $oid, $eml); - unindex_mm($self, $eml); + unindex_eml($self, $oid, PublicInbox::Eml->new($bref)); } # called by public-inbox-index sub index_sync { - my ($self, $opts) = @_; - delete $self->{lock_path} if $opts->{-skip_lock}; - $self->{ibx}->with_umask(\&_index_sync, $self, $opts); - if ($opts->{reindex}) { - my %again = %$opts; + my ($self, $opt) = @_; + delete $self->{lock_path} if $opt->{-skip_lock}; + $self->{ibx}->with_umask(\&_index_sync, $self, $opt); + if ($opt->{reindex}) { + my %again = %$opt; delete @again{qw(rethread reindex)}; index_sync($self, \%again); } } -sub too_big ($$) { - my ($self, $oid) = @_; - my $max_size = $self->{index_max_size} or return; - my (undef, undef, $size) = $self->{ibx}->git->check($oid); - die "E: bad $oid in $self->{ibx}->{inboxdir}\n" if !defined($size); - return if $size <= $max_size; - warn "W: skipping $oid ($size > $max_size)\n"; - 1; +sub check_size { # check_async cb for -index --max-size=... + my ($oid, $type, $size, $arg, $git) = @_; + (($type // '') eq 'blob') or die "E: bad $oid in $git->{git_dir}"; + if ($size <= $arg->{max_size}) { + $git->cat_async($oid, $arg->{index_oid}, $arg); + } else { + warn "W: skipping $oid ($size > $arg->{max_size})\n"; + } +} + +sub v1_checkpoint ($$;$) { + my ($self, $sync, $stk) = @_; + $self->{ibx}->git->check_async_wait; + $self->{ibx}->git->cat_async_wait; + + # latest_cmt may be undef + my $newest = $stk ? $stk->{latest_cmt} : undef; + if ($newest) { + my $cur = $self->{mm}->last_commit || ''; + if (need_update($self, $cur, $newest)) { + $self->{mm}->last_commit($newest); + } + } else { + ${$sync->{max}} = $self->{batch_bytes}; + } + + $self->{mm}->{dbh}->commit; + if ($newest && need_xapian($self)) { + my $xdb = $self->{xdb}; + my $cur = $xdb->get_metadata('last_commit'); + if (need_update($self, $cur, $newest)) { + $xdb->set_metadata('last_commit', $newest); + } + + # let SearchView know a full --reindex was done so it can + # generate ->has_threadid-dependent links + if ($sync->{reindex} && !ref($sync->{reindex})) { + my $n = $xdb->get_metadata('has_threadid'); + $xdb->set_metadata('has_threadid', '1') if $n ne '1'; + } + } + + $self->{oidx}->rethread_done($sync->{-opt}) if $newest; # all done + commit_txn_lazy($self); + $self->{ibx}->git->cleanup; + my $nr = ${$sync->{nr}}; + idx_release($self, $nr); + # let another process do some work... + if (my $pr = $sync->{-opt}->{-progress}) { + $pr->("indexed $nr/$sync->{ntodo}\n") if $nr; + } + if (!$stk) { # more to come + begin_txn_lazy($self); + $self->{mm}->{dbh}->begin_work; + } } # only for v1 sub process_stack { - my ($self, $stk, $sync, $batch_cb) = @_; + my ($self, $sync, $stk) = @_; my $git = $self->{ibx}->git; - my $max = $BATCH_BYTES; + my $max = $self->{batch_bytes}; my $nr = 0; $sync->{nr} = \$nr; $sync->{max} = \$max; $sync->{sidx} = $self; + $self->{mm}->{dbh}->begin_work; if (my @leftovers = keys %{delete($sync->{D}) // {}}) { warn('W: unindexing '.scalar(@leftovers)." leftovers\n"); for my $oid (@leftovers) { @@ -580,23 +632,23 @@ sub process_stack { $git->cat_async($oid, \&unindex_both, $self); } } + if ($sync->{max_size} = $sync->{-opt}->{max_size}) { + $sync->{index_oid} = \&index_both; + } while (my ($f, $at, $ct, $oid) = $stk->pop_rec) { if ($f eq 'm') { - $sync->{autime} = $at; - $sync->{cotime} = $ct; - next if too_big($self, $oid); - $git->cat_async($oid, \&index_both, { %$sync }); - if ($max <= 0) { - $git->cat_async_wait; - $max = $BATCH_BYTES; - $batch_cb->($nr); + my $arg = { %$sync, autime => $at, cotime => $ct }; + if ($sync->{max_size}) { + $git->check_async($oid, \&check_size, $arg); + } else { + $git->cat_async($oid, \&index_both, $arg); } + v1_checkpoint($self, $sync) if $max <= 0; } elsif ($f eq 'd') { $git->cat_async($oid, \&unindex_both, $self); } } - $git->cat_async_wait; - $batch_cb->($nr, $stk); + v1_checkpoint($self, $sync, $stk); } sub log2stack ($$$$) { @@ -709,14 +761,15 @@ sub reindex_from ($$) { # indexes all unindexed messages (v1 only) sub _index_sync { - my ($self, $opts) = @_; - my $tip = $opts->{ref} || 'HEAD'; + my ($self, $opt) = @_; + my $tip = $opt->{ref} || 'HEAD'; my $git = $self->{ibx}->git; + $self->{batch_bytes} = $opt->{batch_size} // $BATCH_BYTES; $git->batch_prepare; - my $pr = $opts->{-progress}; - my $sync = { reindex => $opts->{reindex} }; + my $pr = $opt->{-progress}; + my $sync = { reindex => $opt->{reindex}, -opt => $opt }; my $xdb = $self->begin_txn_lazy; - $self->{over}->rethread_prepare($opts); + $self->{oidx}->rethread_prepare($opt); my $mm = _msgmap_init($self); if ($sync->{reindex}) { my $last = $mm->last_commit; @@ -735,40 +788,7 @@ sub _index_sync { my $stk = prepare_stack($self, $sync, $range); $sync->{ntodo} = $stk ? $stk->num_records : 0; $pr->("$sync->{ntodo}\n") if $pr; # continue previous line - - my $dbh = $mm->{dbh}; - my $batch_cb = sub { - my ($nr, $stk) = @_; - # latest_cmt may be undef - my $newest = $stk ? $stk->{latest_cmt} : undef; - if ($newest) { - my $cur = $mm->last_commit || ''; - if (need_update($self, $cur, $newest)) { - $mm->last_commit($newest); - } - } - $dbh->commit; - if ($newest && need_xapian($self)) { - my $cur = $xdb->get_metadata('last_commit'); - if (need_update($self, $cur, $newest)) { - $xdb->set_metadata('last_commit', $newest); - } - } - - $self->{over}->rethread_done($opts) if $newest; # all done - $self->commit_txn_lazy; - $git->cleanup; - $xdb = idx_release($self, $nr); - # let another process do some work... - $pr->("indexed $nr/$sync->{ntodo}\n") if $pr && $nr; - if (!$stk) { # more to come - $xdb = $self->begin_txn_lazy; - $dbh->begin_work; - } - }; - - $dbh->begin_work; - process_stack($self, $stk, $sync, $batch_cb); + process_stack($self, $sync, $stk); } sub DESTROY { @@ -777,44 +797,10 @@ sub DESTROY { $_[0]->{lockfh} = undef; } -# remote_* subs are only used by SearchIdxPart -sub remote_commit { - my ($self) = @_; - if (my $w = $self->{w}) { - print $w "commit\n" or die "failed to write commit: $!"; - } else { - $self->commit_txn_lazy; - } -} - -sub remote_close { - my ($self) = @_; - if (my $w = delete $self->{w}) { - my $pid = delete $self->{pid} or die "no process to wait on\n"; - print $w "close\n" or die "failed to write to pid:$pid: $!\n"; - close $w or die "failed to close pipe for pid:$pid: $!\n"; - waitpid($pid, 0) == $pid or die "remote process did not finish"; - $? == 0 or die ref($self)." pid:$pid exited with: $?"; - } else { - die "transaction in progress $self\n" if $self->{txn}; - idx_release($self) if $self->{xdb}; - } -} - -sub remote_remove { - my ($self, $oid, $num) = @_; - if (my $w = $self->{w}) { - # triggers remove_by_oid in a shard - print $w "D $oid $num\n" or die "failed to write remove $!"; - } else { - $self->remove_by_oid($oid, $num); - } -} - sub _begin_txn { my ($self) = @_; my $xdb = $self->{xdb} || idx_acquire($self); - $self->{over}->begin_lazy if $self->{over}; + $self->{oidx}->begin_lazy if $self->{oidx}; $xdb->begin_transaction if $xdb; $self->{txn} = 1; $xdb; @@ -827,26 +813,34 @@ sub begin_txn_lazy { # store 'indexlevel=medium' in v2 shard=0 and v1 (only one shard) # This metadata is read by Admin::detect_indexlevel: -sub set_indexlevel { +sub set_metadata_once { my ($self) = @_; - if (!$self->{shard} && # undef or 0, not >0 - delete($self->{-set_indexlevel_once})) { - my $xdb = $self->{xdb}; + return if $self->{shard}; # only continue if undef or 0, not >0 + my $xdb = $self->{xdb}; + + if (delete($self->{-set_has_threadid_once})) { + $xdb->set_metadata('has_threadid', '1'); + } + if (delete($self->{-set_indexlevel_once})) { my $level = $xdb->get_metadata('indexlevel'); if (!$level || $level ne 'medium') { $xdb->set_metadata('indexlevel', 'medium'); } } + if (delete($self->{-set_skip_docdata_once})) { + $xdb->get_metadata('skip_docdata') or + $xdb->set_metadata('skip_docdata', '1'); + } } sub _commit_txn { my ($self) = @_; if (my $xdb = $self->{xdb}) { - set_indexlevel($self); + set_metadata_once($self); $xdb->commit_transaction; } - $self->{over}->commit_lazy if $self->{over}; + $self->{oidx}->commit_lazy if $self->{oidx}; } sub commit_txn_lazy {