X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FSearchIdx.pm;h=a1baa65bd2945dcc8885fc70735a1d60ecd5853b;hb=77eafbd653d2efac546f2c330d8cf5e84bef2712;hp=c57a7e1647c11e61d7f45aa23dbe2fd524b439f0;hpb=de8e1586d732ae6c09a92588a8e4d442aedbff37;p=public-inbox.git diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index c57a7e16..a1baa65b 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -9,10 +9,10 @@ package PublicInbox::SearchIdx; use strict; use v5.10.1; -use parent qw(PublicInbox::Search PublicInbox::Lock); +use parent qw(PublicInbox::Search PublicInbox::Lock Exporter); use PublicInbox::Eml; use PublicInbox::InboxWritable; -use PublicInbox::MID qw(mid_mime mids_for_index mids); +use PublicInbox::MID qw(mids_for_index mids); use PublicInbox::MsgIter; use PublicInbox::IdxStack; use Carp qw(croak); @@ -21,8 +21,10 @@ use PublicInbox::OverIdx; use PublicInbox::Spawn qw(spawn); use PublicInbox::Git qw(git_unquote); use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp); +our @EXPORT_OK = qw(crlf_adjust log2stack is_ancestor check_size nodatacow_dir); my $X = \%PublicInbox::Search::X; my ($DB_CREATE_OR_OPEN, $DB_OPEN); +our $DB_NO_SYNC = 0; our $BATCH_BYTES = defined($ENV{XAPIAN_FLUSH_THRESHOLD}) ? 0x7fffffff : 1_000_000; use constant DEBUG => !!$ENV{DEBUG}; @@ -30,8 +32,6 @@ use constant DEBUG => !!$ENV{DEBUG}; my $xapianlevels = qr/\A(?:full|medium)\z/; my $hex = '[a-f0-9]'; my $OID = $hex .'{40,}'; -my $addmsg = qr!^:000000 100644 \S+ ($OID) A\t${hex}{2}/${hex}{38}$!; -my $delmsg = qr!^:100644 000000 ($OID) \S+ D\t${hex}{2}/${hex}{38}$!; sub new { my ($class, $ibx, $creat, $shard) = @_; @@ -67,6 +67,7 @@ sub new { $self->{lock_path} = "$inboxdir/ssoma.lock"; my $dir = $self->xdir; $self->{over} = PublicInbox::OverIdx->new("$dir/over.sqlite3"); + $self->{over}->{-no_sync} = 1 if $ibx->{-no_sync}; $self->{index_max_size} = $ibx->{index_max_size}; } elsif ($version == 2) { defined $shard or die "shard is required for v2\n"; @@ -103,9 +104,18 @@ sub load_xapian_writable () { *sortable_serialise = $xap.'::sortable_serialise'; $DB_CREATE_OR_OPEN = eval($xap.'::DB_CREATE_OR_OPEN()'); $DB_OPEN = eval($xap.'::DB_OPEN()'); + my $ver = (eval($xap.'::major_version()') << 16) | + (eval($xap.'::minor_version()') << 8); + $DB_NO_SYNC = 0x4 if $ver >= 0x10400; 1; } +sub nodatacow_dir ($) { + my ($dir) = @_; + opendir my $dh, $dir or die "opendir($dir): $!\n"; + PublicInbox::Spawn::set_nodatacow(fileno($dh)); +} + sub idx_acquire { my ($self) = @_; my $flag; @@ -121,11 +131,14 @@ sub idx_acquire { # don't create empty Xapian directories if we don't need Xapian my $is_shard = defined($self->{shard}); - if (!$is_shard || ($is_shard && need_xapian($self))) { + if (!-d $dir && (!$is_shard || + ($is_shard && need_xapian($self)))) { File::Path::mkpath($dir); + nodatacow_dir($dir); } } return unless defined $flag; + $flag |= $DB_NO_SYNC if $self->{ibx}->{-no_sync}; my $xdb = eval { ($X->{WritableDatabase})->new($dir, $flag) }; if ($@) { die "Failed opening $dir: ", $@; @@ -337,8 +350,7 @@ sub index_ids ($$$$) { } sub add_xapian ($$$$) { - my ($self, $mime, $smsg, $mids) = @_; - my $hdr = $mime->header_obj; + my ($self, $eml, $smsg, $mids) = @_; my $doc = $X->{Document}->new; add_val($doc, PublicInbox::Search::TS(), $smsg->{ts}); my @ds = gmtime($smsg->{ds}); @@ -353,10 +365,10 @@ sub add_xapian ($$$$) { $tg->set_document($doc); index_headers($self, $smsg); - msg_iter($mime, \&index_xapian, [ $self, $doc ]); - index_ids($self, $doc, $hdr, $mids); + msg_iter($eml, \&index_xapian, [ $self, $doc ]); + index_ids($self, $doc, $eml, $mids); $smsg->{to} = $smsg->{cc} = ''; # WWW doesn't need these, only NNTP - PublicInbox::OverIdx::parse_references($smsg, $hdr, $mids); + PublicInbox::OverIdx::parse_references($smsg, $eml, $mids); my $data = $smsg->to_doc_data; $doc->set_data($data); if (my $altid = $self->{-altid}) { @@ -377,15 +389,15 @@ sub _msgmap_init ($) { die "BUG: _msgmap_init is only for v1\n" if $self->{ibx_ver} != 1; $self->{mm} //= eval { require PublicInbox::Msgmap; - PublicInbox::Msgmap->new($self->{ibx}->{inboxdir}, 1); + my $rw = $self->{ibx}->{-no_sync} ? 2 : 1; + PublicInbox::Msgmap->new($self->{ibx}->{inboxdir}, $rw); }; } sub add_message { # mime = PublicInbox::Eml or Email::MIME object my ($self, $mime, $smsg, $sync) = @_; - my $hdr = $mime->header_obj; - my $mids = mids_for_index($hdr); + my $mids = mids_for_index($mime); $smsg //= bless { blob => '' }, 'PublicInbox::Smsg'; # test-only compat $smsg->{mid} //= $mids->[0]; # v1 compatibility $smsg->{num} //= do { # v1 @@ -394,7 +406,7 @@ sub add_message { }; # v1 and tests only: - $smsg->populate($hdr, $sync); + $smsg->populate($mime, $sync); $smsg->{bytes} //= length($mime->as_string); eval { @@ -478,6 +490,11 @@ sub unindex_eml { while (my ($num, $nr) = each %tmp) { warn "BUG: $num appears >1 times ($nr) for $oid\n" if $nr != 1; } + if ($nr) { + $self->{mm}->num_delete($_) for (keys %tmp); + } else { # just in case msgmap and over.sqlite3 become desynched: + $self->{mm}->mid_delete($mids->[0]); + } xdb_remove($self, $oid, keys %tmp) if need_xapian($self); } @@ -498,11 +515,6 @@ sub index_mm { } } -sub unindex_mm { - my ($self, $mime) = @_; - $self->{mm}->mid_delete(mid_mime($mime)); -} - # returns the number of bytes to add if given a non-CRLF arg sub crlf_adjust ($) { if (index($_[0], "\r\n") < 0) { @@ -530,9 +542,7 @@ sub index_both { # git->cat_async callback sub unindex_both { # git->cat_async callback my ($bref, $oid, $type, $size, $self) = @_; - my $eml = PublicInbox::Eml->new($bref); - unindex_eml($self, $oid, $eml); - unindex_mm($self, $eml); + unindex_eml($self, $oid, PublicInbox::Eml->new($bref)); } # called by public-inbox-index @@ -547,19 +557,58 @@ sub index_sync { } } -sub too_big ($$) { - my ($self, $oid) = @_; - my $max_size = $self->{index_max_size} or return; - my (undef, undef, $size) = $self->{ibx}->git->check($oid); - die "E: bad $oid in $self->{ibx}->{inboxdir}\n" if !defined($size); - return if $size <= $max_size; - warn "W: skipping $oid ($size > $max_size)\n"; - 1; +sub check_size { # check_async cb for -index --max-size=... + my ($oid, $type, $size, $arg, $git) = @_; + (($type // '') eq 'blob') or die "E: bad $oid in $git->{git_dir}"; + if ($size <= $arg->{index_max_size}) { + $git->cat_async($oid, $arg->{index_oid}, $arg); + } else { + warn "W: skipping $oid ($size > $arg->{index_max_size})\n"; + } +} + +sub v1_checkpoint ($$;$) { + my ($self, $sync, $stk) = @_; + $self->{ibx}->git->check_async_wait; + $self->{ibx}->git->cat_async_wait; + + # latest_cmt may be undef + my $newest = $stk ? $stk->{latest_cmt} : undef; + if ($newest) { + my $cur = $self->{mm}->last_commit || ''; + if (need_update($self, $cur, $newest)) { + $self->{mm}->last_commit($newest); + } + } else { + ${$sync->{max}} = $BATCH_BYTES; + } + + $self->{mm}->{dbh}->commit; + if ($newest && need_xapian($self)) { + my $cur = $self->{xdb}->get_metadata('last_commit'); + if (need_update($self, $cur, $newest)) { + $self->{xdb}->set_metadata('last_commit', $newest); + } + } + + $self->{over}->rethread_done($sync->{-opt}) if $newest; # all done + commit_txn_lazy($self); + $self->{ibx}->git->cleanup; + my $nr = ${$sync->{nr}}; + idx_release($self, $nr); + # let another process do some work... + if (my $pr = $sync->{-opt}->{-progress}) { + $pr->("indexed $nr/$sync->{ntodo}\n") if $nr; + } + if (!$stk) { # more to come + begin_txn_lazy($self); + $self->{mm}->{dbh}->begin_work; + } } # only for v1 sub process_stack { - my ($self, $stk, $sync, $batch_cb) = @_; + my ($self, $sync, $stk) = @_; my $git = $self->{ibx}->git; my $max = $BATCH_BYTES; my $nr = 0; @@ -567,6 +616,7 @@ sub process_stack { $sync->{max} = \$max; $sync->{sidx} = $self; + $self->{mm}->{dbh}->begin_work; if (my @leftovers = keys %{delete($sync->{D}) // {}}) { warn('W: unindexing '.scalar(@leftovers)." leftovers\n"); for my $oid (@leftovers) { @@ -574,36 +624,37 @@ sub process_stack { $git->cat_async($oid, \&unindex_both, $self); } } + if ($sync->{index_max_size} = $self->{ibx}->{index_max_size}) { + $sync->{index_oid} = \&index_both; + } while (my ($f, $at, $ct, $oid) = $stk->pop_rec) { if ($f eq 'm') { - $sync->{autime} = $at; - $sync->{cotime} = $ct; - next if too_big($self, $oid); - $git->cat_async($oid, \&index_both, { %$sync }); - if ($max <= 0) { - $git->cat_async_wait; - $max = $BATCH_BYTES; - $batch_cb->($nr); + my $arg = { %$sync, autime => $at, cotime => $ct }; + if ($sync->{index_max_size}) { + $git->check_async($oid, \&check_size, $arg); + } else { + $git->cat_async($oid, \&index_both, $arg); } + v1_checkpoint($self, $sync) if $max <= 0; } elsif ($f eq 'd') { $git->cat_async($oid, \&unindex_both, $self); } } - $git->cat_async_wait; - $batch_cb->($nr, $stk); + v1_checkpoint($self, $sync, $stk); } -sub prepare_stack ($$$) { - my ($self, $sync, $range) = @_; - my $git = $self->{ibx}->git; - - if (index($range, '..') < 0) { - # don't show annoying git errors to users who run -index - # on empty inboxes - $git->qx(qw(rev-parse -q --verify), "$range^0"); - return PublicInbox::IdxStack->new->read_prepare if $?; +sub log2stack ($$$$) { + my ($sync, $git, $range, $ibx) = @_; + my $D = $sync->{D}; # OID_BIN => NR (if reindexing, undef otherwise) + my ($add, $del); + if ($ibx->version == 1) { + my $path = $hex.'{2}/'.$hex.'{38}'; + $add = qr!\A:000000 100644 \S+ ($OID) A\t$path$!; + $del = qr!\A:100644 000000 ($OID) \S+ D\t$path$!; + } else { + $del = qr!\A:\d{6} 100644 $OID ($OID) [AM]\td$!; + $add = qr!\A:\d{6} 100644 $OID ($OID) [AM]\tm$!; } - my $D = $sync->{D} = $sync->{reindex} ? {} : undef; # OID_BIN => NR # Count the new files so they can be added newest to oldest # and still have numbers increasing from oldest to newest @@ -615,14 +666,14 @@ sub prepare_stack ($$$) { if (/\A([0-9]+)-([0-9]+)-($OID)$/o) { ($at, $ct) = ($1 + 0, $2 + 0); $stk //= PublicInbox::IdxStack->new($3); - } elsif (/$delmsg/) { + } elsif (/$del/) { my $oid = $1; if ($D) { # reindex case $D->{pack('H*', $oid)}++; } else { # non-reindex case: $stk->push_rec('d', $at, $ct, $oid); } - } elsif (/$addmsg/) { + } elsif (/$add/) { my $oid = $1; if ($D) { my $oid_bin = pack('H*', $oid); @@ -641,6 +692,20 @@ sub prepare_stack ($$$) { $stk->read_prepare; } +sub prepare_stack ($$$) { + my ($self, $sync, $range) = @_; + my $git = $self->{ibx}->git; + + if (index($range, '..') < 0) { + # don't show annoying git errors to users who run -index + # on empty inboxes + $git->qx(qw(rev-parse -q --verify), "$range^0"); + return PublicInbox::IdxStack->new->read_prepare if $?; + } + $sync->{D} = $sync->{reindex} ? {} : undef; # OID_BIN => NR + log2stack($sync, $git, $range, $self->{ibx}); +} + # --is-ancestor requires git 1.8.0+ sub is_ancestor ($$$) { my ($git, $cur, $tip) = @_; @@ -693,7 +758,7 @@ sub _index_sync { my $git = $self->{ibx}->git; $git->batch_prepare; my $pr = $opts->{-progress}; - my $sync = { reindex => $opts->{reindex} }; + my $sync = { reindex => $opts->{reindex}, -opt => $opts }; my $xdb = $self->begin_txn_lazy; $self->{over}->rethread_prepare($opts); my $mm = _msgmap_init($self); @@ -714,40 +779,7 @@ sub _index_sync { my $stk = prepare_stack($self, $sync, $range); $sync->{ntodo} = $stk ? $stk->num_records : 0; $pr->("$sync->{ntodo}\n") if $pr; # continue previous line - - my $dbh = $mm->{dbh}; - my $batch_cb = sub { - my ($nr, $stk) = @_; - # latest_cmt may be undef - my $newest = $stk ? $stk->{latest_cmt} : undef; - if ($newest) { - my $cur = $mm->last_commit || ''; - if (need_update($self, $cur, $newest)) { - $mm->last_commit($newest); - } - } - $dbh->commit; - if ($newest && need_xapian($self)) { - my $cur = $xdb->get_metadata('last_commit'); - if (need_update($self, $cur, $newest)) { - $xdb->set_metadata('last_commit', $newest); - } - } - - $self->{over}->rethread_done($opts) if $newest; # all done - $self->commit_txn_lazy; - $git->cleanup; - $xdb = idx_release($self, $nr); - # let another process do some work... - $pr->("indexed $nr/$sync->{ntodo}\n") if $pr && $nr; - if (!$stk) { # more to come - $xdb = $self->begin_txn_lazy; - $dbh->begin_work; - } - }; - - $dbh->begin_work; - process_stack($self, $stk, $sync, $batch_cb); + process_stack($self, $sync, $stk); } sub DESTROY {