X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FExtSearchIdx.pm;h=c82d163349a755094193fb5916c617aae76fa0e1;hb=0d38f65c490466837ae091afa7a7b6f59d04ce7c;hp=026e137730fd8610aed8043cc65b8dd75e319462;hpb=4c315ed49fe8a6224264d74c490e0ee552365b2f;p=public-inbox.git diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 026e1377..c82d1633 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -19,8 +19,9 @@ use v5.10.1; use parent qw(PublicInbox::ExtSearch PublicInbox::Lock); use Carp qw(croak carp); use PublicInbox::Search; -use PublicInbox::SearchIdx qw(crlf_adjust prepare_stack); +use PublicInbox::SearchIdx qw(crlf_adjust prepare_stack is_ancestor); use PublicInbox::OverIdx; +use PublicInbox::MiscIdx; use PublicInbox::MID qw(mids); use PublicInbox::V2Writable; use PublicInbox::InboxWritable; @@ -30,6 +31,7 @@ use File::Spec; sub new { my (undef, $dir, $opt) = @_; + $dir = File::Spec->canonpath($dir); my $l = $opt->{indexlevel} // 'full'; $l !~ $PublicInbox::SearchIdx::INDEXLEVELS and die "invalid indexlevel=$l\n"; @@ -85,17 +87,10 @@ sub _ibx_attach { # each_inbox callback sub attach_config { my ($self, $cfg) = @_; + $self->{cfg} = $cfg; $cfg->each_inbox(\&_ibx_attach, $self); } -sub git_blob_digest ($) { - my ($bref) = @_; - my $dig = Digest::SHA->new(1); # XXX SHA256 later - $dig->add('blob '.length($$bref)."\0"); - $dig->add($$bref); - $dig; -} - sub is_bad_blob ($$$$) { my ($oid, $type, $size, $expect_oid) = @_; if ($type ne 'blob') { @@ -106,6 +101,18 @@ sub is_bad_blob ($$$$) { $size == 0 ? 1 : 0; # size == 0 means purged } +sub check_batch_limit ($) { + my ($req) = @_; + my $self = $req->{self}; + my $new_smsg = $req->{new_smsg}; + + # {raw_bytes} may be unset, so just use {bytes} + my $n = $self->{transact_bytes} += $new_smsg->{bytes}; + + # set flag for PublicInbox::V2Writable::index_todo: + ${$req->{need_checkpoint}} = 1 if $n >= $self->{batch_bytes}; +} + sub do_xpost ($$) { my ($req, $smsg) = @_; my $self = $req->{self}; @@ -114,12 +121,21 @@ sub do_xpost ($$) { my $oid = $req->{oid}; my $xibx = $req->{ibx}; my $eml = $req->{eml}; + my $eidx_key = $xibx->eidx_key; if (my $new_smsg = $req->{new_smsg}) { # 'm' on cross-posted message my $xnum = $req->{xnum}; - $self->{oidx}->add_xref3($docid, $xnum, $oid, $xibx->eidx_key); - $idx->shard_add_eidx_info($docid, $oid, $xibx, $eml); + $self->{oidx}->add_xref3($docid, $xnum, $oid, $eidx_key); + $idx->shard_add_eidx_info($docid, $eidx_key, $eml); + check_batch_limit($req); } else { # 'd' - $idx->shard_remove_eidx_info($docid, $oid, $xibx, $eml); + my $rm_eidx_info; + my $nr = $self->{oidx}->remove_xref3($docid, $oid, $eidx_key, + \$rm_eidx_info); + if ($nr == 0) { + $idx->shard_remove($docid); + } elsif ($rm_eidx_info) { + $idx->shard_remove_eidx_info($docid, $eidx_key, $eml); + } } } @@ -140,6 +156,7 @@ sub index_unseen ($) { my $ibx = delete $req->{ibx} or die 'BUG: {ibx} unset'; $self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key); $idx->index_raw(undef, $eml, $new_smsg, $ibx); + check_batch_limit($req); } sub do_finalize ($) { @@ -149,7 +166,12 @@ sub do_finalize ($) { } elsif (exists $req->{new_smsg}) { # totally unseen messsage index_unseen($req); } else { - warn "W: ignoring delete $req->{oid} (not found)\n"; + # `d' message was already unindexed in the v1/v2 inboxes, + # so it's too noisy to warn, here. + } + # cur_cmt may be undef for unindex_oid, set by V2Writable::index_todo + if (defined(my $cur_cmt = $req->{cur_cmt})) { + ${$req->{latest_cmt}} = $cur_cmt; } } @@ -176,14 +198,35 @@ sub do_step ($) { # main iterator for adding messages to the index do_finalize($req); } +sub _blob_missing ($) { # called when req->{cur_smsg}->{blob} is bad + my ($req) = @_; + my $smsg = $req->{cur_smsg} or die 'BUG: {cur_smsg} missing'; + my $self = $req->{self}; + my $xref3 = $self->{oidx}->get_xref3($smsg->{num}); + my @keep = grep(!/:$smsg->{blob}\z/, @$xref3); + if (@keep) { + $keep[0] =~ /:([a-f0-9]{40,}+)\z/ or + die "BUG: xref $keep[0] has no OID"; + my $oidhex = $1; + $self->{oidx}->remove_xref3($smsg->{num}, $smsg->{blob}); + my $upd = $self->{oidx}->update_blob($smsg, $oidhex); + my $saved = $self->{oidx}->get_art($smsg->{num}); + } else { + $self->{oidx}->delete_by_num($smsg->{num}); + } +} + sub ck_existing { # git->cat_async callback my ($bref, $oid, $type, $size, $req) = @_; my $smsg = $req->{cur_smsg} or die 'BUG: {cur_smsg} missing'; - return if is_bad_blob($oid, $type, $size, $smsg->{blob}); - my $cur = PublicInbox::Eml->new($bref); - if (content_hash($cur) eq $req->{chash}) { - push @{$req->{indexed}}, $smsg; # for do_xpost - } # else { index_unseen later } + if ($type eq 'missing') { + _blob_missing($req); + } elsif (!is_bad_blob($oid, $type, $size, $smsg->{blob})) { + my $cur = PublicInbox::Eml->new($bref); + if (content_hash($cur) eq $req->{chash}) { + push @{$req->{indexed}}, $smsg; # for do_xpost + } # else { index_unseen later } + } do_step($req); } @@ -193,10 +236,6 @@ sub cur_ibx_xnum ($$) { my ($req, $bref) = @_; my $ibx = $req->{ibx} or die 'BUG: current {ibx} missing'; - # XXX overkill? - git_blob_digest($bref)->hexdigest eq $req->{oid} or die - "BUG: blob mismatch $req->{oid}"; - $req->{eml} = PublicInbox::Eml->new($bref); $req->{chash} = content_hash($req->{eml}); $req->{mids} = mids($req->{eml}); @@ -212,17 +251,22 @@ sub cur_ibx_xnum ($$) { sub index_oid { # git->cat_async callback for 'm' my ($bref, $oid, $type, $size, $req) = @_; + my $self = $req->{self}; + local $self->{current_info} = "$self->{current_info} $oid"; return if is_bad_blob($oid, $type, $size, $req->{oid}); my $new_smsg = $req->{new_smsg} = bless { blob => $oid, }, 'PublicInbox::Smsg'; $new_smsg->{bytes} = $size + crlf_adjust($$bref); defined($req->{xnum} = cur_ibx_xnum($req, $bref)) or return; + ++${$req->{nr}}; do_step($req); } sub unindex_oid { # git->cat_async callback for 'd' my ($bref, $oid, $type, $size, $req) = @_; + my $self = $req->{self}; + local $self->{current_info} = "$self->{current_info} $oid"; return if is_bad_blob($oid, $type, $size, $req->{oid}); return if defined(cur_ibx_xnum($req, $bref)); # was re-added do_step($req); @@ -241,21 +285,16 @@ sub last_commits { } sub _sync_inbox ($$$) { - my ($self, $opt, $ibx) = @_; - my $sync = { - need_checkpoint => \(my $bool = 0), - reindex => $opt->{reindex}, - -opt => $opt, - self => $self, - ibx => $ibx, - }; + my ($self, $sync, $ibx) = @_; + $sync->{ibx} = $ibx; + $sync->{nr} = \(my $nr = 0); my $v = $ibx->version; my $ekey = $ibx->eidx_key; if ($v == 2) { my $epoch_max; defined($ibx->git_dir_latest(\$epoch_max)) or return; $sync->{epoch_max} = $epoch_max; - sync_prepare($self, $sync) or return; # fills $sync->{todo} + sync_prepare($self, $sync); # or return # TODO: once MiscIdx is stable } elsif ($v == 1) { my $uv = $ibx->uidvalidity; my $lc = $self->{oidx}->eidx_meta("lc-v1:$ekey//$uv"); @@ -266,7 +305,94 @@ sub _sync_inbox ($$$) { warn "E: $ekey unsupported inbox version (v$v)\n"; return; } - index_todo($self, $sync, $_) for @{$sync->{todo}}; + for my $unit (@{delete($sync->{todo}) // []}) { + last if $sync->{quit}; + index_todo($self, $sync, $unit); + } + $self->{midx}->index_ibx($ibx) unless $sync->{quit}; + $ibx->git->cleanup; # done with this inbox, now +} + +sub unref_doc ($$$$) { + my ($self, $ibx_id, $eidx_key, $docid) = @_; + my $dbh = $self->{oidx}->dbh; + + # for debug/info purposes, oids may no longer be accessible + my $sth = $dbh->prepare_cached(<<'', undef, 1); +SELECT oidbin FROM xref3 WHERE docid = ? AND ibx_id = ? + + $sth->execute($docid, $ibx_id); + my @oid = map { unpack('H*', $_->[0]) } @{$sth->fetchall_arrayref}; + + $dbh->prepare_cached(<<'')->execute($docid, $ibx_id); +DELETE FROM xref3 WHERE docid = ? AND ibx_id = ? + + my $remain = $self->{oidx}->get_xref3($docid); + my $idx = $self->idx_shard($docid); + if (@$remain) { + for my $oid (@oid) { + warn "I: unref #$docid $eidx_key $oid\n"; + $idx->shard_remove_eidx_info($docid, $eidx_key); + } + } else { + warn "I: remove #$docid $eidx_key @oid\n"; + $idx->shard_remove($docid); + } +} + +sub eidx_gc { + my ($self, $opt) = @_; + $self->{cfg} or die "E: GC requires ->attach_config\n"; + $opt->{-idx_gc} = 1; + $self->idx_init($opt); # acquire lock via V2Writable::_idx_init + + my $dbh = $self->{oidx}->dbh; + my $x3_doc = $dbh->prepare('SELECT docid FROM xref3 WHERE ibx_id = ?'); + my $ibx_ck = $dbh->prepare('SELECT ibx_id,eidx_key FROM inboxes'); + my $lc_i = $dbh->prepare('SELECT key FROM eidx_meta WHERE key LIKE ?'); + + $ibx_ck->execute; + while (my ($ibx_id, $eidx_key) = $ibx_ck->fetchrow_array) { + next if $self->{ibx_map}->{$eidx_key}; + $self->{midx}->remove_eidx_key($eidx_key); + warn "I: deleting messages for $eidx_key...\n"; + $x3_doc->execute($ibx_id); + while (defined(my $docid = $x3_doc->fetchrow_array)) { + unref_doc($self, $ibx_id, $eidx_key, $docid); + } + $dbh->prepare_cached(<<'')->execute($ibx_id); +DELETE FROM inboxes WHERE ibx_id = ? + + # drop last_commit info + my $pat = $eidx_key; + $pat =~ s/([_%])/\\$1/g; + $lc_i->execute("lc-%:$pat//%"); + while (my ($key) = $lc_i->fetchrow_array) { + next if $key !~ m!\Alc-v[1-9]+:\Q$eidx_key\E//!; + warn "I: removing $key\n"; + $dbh->prepare_cached(<<'')->execute($key); +DELETE FROM eidx_meta WHERE key = ? + + } + + warn "I: $eidx_key removed\n"; + } + + # it's not real unless it's in `over', we use parallelism here, + # shards will be reading directly from over, so commit + $self->{oidx}->commit_lazy; + $self->{oidx}->begin_lazy; + + for my $idx (@{$self->{idx_shards}}) { + warn "I: cleaning up shard #$idx->{shard}\n"; + $idx->shard_over_check($self->{oidx}); + } + my $nr = $dbh->do(<<''); +DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over) + + warn "I: eliminated $nr stale xref3 entries\n" if $nr != 0; + + done($self); } sub eidx_sync { # main entry point @@ -274,22 +400,67 @@ sub eidx_sync { # main entry point $self->idx_init($opt); # acquire lock via V2Writable::_idx_init $self->{oidx}->rethread_prepare($opt); - _sync_inbox($self, $opt, $_) for (@{$self->{ibx_list}}); + my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ }; + local $self->{current_info} = ''; + local $SIG{__WARN__} = sub { + $warn_cb->($self->{current_info}, ': ', @_); + }; + my $sync = { + need_checkpoint => \(my $need_checkpoint = 0), + reindex => $opt->{reindex}, + -opt => $opt, + self => $self, + -regen_fmt => "%u/?\n", + }; + local $SIG{USR1} = sub { $need_checkpoint = 1 }; + my $quit = PublicInbox::SearchIdx::quit_cb($sync); + local $SIG{QUIT} = $quit; + local $SIG{INT} = $quit; + local $SIG{TERM} = $quit; - $self->{oidx}->rethread_done($opt); + # don't use $_ here, it'll get clobbered by reindex_checkpoint + for my $ibx (@{$self->{ibx_list}}) { + last if $sync->{quit}; + _sync_inbox($self, $sync, $ibx); + } + $self->{oidx}->rethread_done($opt) unless $sync->{quit}; PublicInbox::V2Writable::done($self); } -sub update_last_commit { - my ($self, $sync, $unit, $latest_cmt) = @_; +sub update_last_commit { # overrides V2Writable + my ($self, $sync, $stk) = @_; + my $unit = $sync->{unit} // return; + my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}}; + defined($latest_cmt) or return; + my $ibx = $sync->{ibx} or die 'BUG: {ibx} missing'; + my $ekey = $ibx->eidx_key; + my $uv = $ibx->uidvalidity; + my $epoch = $unit->{epoch}; + my $meta_key; + my $v = $ibx->version; + if ($v == 2) { + die 'No {epoch} for v2 unit' unless defined $epoch; + $meta_key = "lc-v2:$ekey//$uv;$epoch"; + } elsif ($v == 1) { + die 'Unexpected {epoch} for v1 unit' if defined $epoch; + $meta_key = "lc-v1:$ekey//$uv"; + } else { + die "Unsupported inbox version: $v"; + } + my $last = $self->{oidx}->eidx_meta($meta_key); + if (defined $last && is_ancestor($self->git, $last, $latest_cmt)) { + my @cmd = (qw(rev-list --count), "$last..$latest_cmt"); + chomp(my $n = $unit->{git}->qx(@cmd)); + return if $n ne '' && $n == 0; + } + $self->{oidx}->eidx_meta($meta_key, $latest_cmt); +} - my $ALL = $self->git; - # while (scalar(@{$ALL->{inflight_c}}) || scalar(@{$ALL->{inflight}})) { - # $ALL->check_async_wait; - # $ALL->cat_async_wait; - # } - # TODO +sub _idx_init { # with_umask callback + my ($self, $opt) = @_; + PublicInbox::V2Writable::_idx_init($self, $opt); + $self->{midx} = PublicInbox::MiscIdx->new($self); } sub idx_init { # similar to V2Writable @@ -303,27 +474,43 @@ sub idx_init { # similar to V2Writable my $info_dir = "$ALL/objects/info"; my $alt = "$info_dir/alternates"; my $mode = 0644; - my (%old, @old, %new, @new); + my (@old, @new, %seen); # seen: st_dev + st_ino if (-e $alt) { open(my $fh, '<', $alt) or die "open $alt: $!"; $mode = (stat($fh))[2] & 07777; - while (<$fh>) { - push @old, $_ if !$old{$_}++; + while (my $line = <$fh>) { + chomp(my $d = $line); + if (my @st = stat($d)) { + next if $seen{"$st[0]\0$st[1]"}++; + } else { + warn "W: stat($d) failed (from $alt): $!\n"; + next if $opt->{-idx_gc}; + } + push @old, $line; } } for my $ibx (@{$self->{ibx_list}}) { my $line = $ibx->git->{git_dir} . "/objects\n"; - next if $old{$line}; - $new{$line} = 1; + chomp(my $d = $line); + if (my @st = stat($d)) { + next if $seen{"$st[0]\0$st[1]"}++; + } else { + warn "W: stat($d) failed (from $ibx->{inboxdir}): $!\n"; + next if $opt->{-idx_gc}; + } push @new, $line; } - push @old, @new; - PublicInbox::V2Writable::write_alternates($info_dir, $mode, \@old); + if (scalar @new) { + push @old, @new; + my $o = \@old; + PublicInbox::V2Writable::write_alternates($info_dir, $mode, $o); + } $self->parallel_init($self->{indexlevel}); $self->umask_prepare; - $self->with_umask(\&PublicInbox::V2Writable::_idx_init, $self, $opt); + $self->with_umask(\&_idx_init, $self, $opt); $self->{oidx}->begin_lazy; $self->{oidx}->eidx_prep; + $self->{midx}->begin_txn; } no warnings 'once';