]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/ExtSearchIdx.pm
ds: improve add_timer usability
[public-inbox.git] / lib / PublicInbox / ExtSearchIdx.pm
index 07e646985a4715d273b2f7b7f80f2c0374179d7b..a4b3bbd5e57c3255920e89e21b5e95e5a3d6116d 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # Detached/external index cross inbox search indexing support
@@ -21,8 +21,7 @@ use Carp qw(croak carp);
 use Sys::Hostname qw(hostname);
 use POSIX qw(strftime);
 use PublicInbox::Search;
-use PublicInbox::SearchIdx qw(crlf_adjust prepare_stack is_ancestor
-       is_bad_blob);
+use PublicInbox::SearchIdx qw(prepare_stack is_ancestor is_bad_blob);
 use PublicInbox::OverIdx;
 use PublicInbox::MiscIdx;
 use PublicInbox::MID qw(mids);
@@ -30,7 +29,7 @@ use PublicInbox::V2Writable;
 use PublicInbox::InboxWritable;
 use PublicInbox::ContentHash qw(content_hash);
 use PublicInbox::Eml;
-use PublicInbox::DS qw(now);
+use PublicInbox::DS qw(now add_timer);
 use DBI qw(:sql_types); # SQL_BLOB
 
 sub new {
@@ -82,8 +81,6 @@ sub check_batch_limit ($) {
        my ($req) = @_;
        my $self = $req->{self};
        my $new_smsg = $req->{new_smsg};
-
-       # {raw_bytes} may be unset, so just use {bytes}
        my $n = $self->{transact_bytes} += $new_smsg->{bytes};
 
        # set flag for PublicInbox::V2Writable::index_todo:
@@ -102,7 +99,7 @@ sub do_xpost ($$) {
        if (my $new_smsg = $req->{new_smsg}) { # 'm' on cross-posted message
                my $xnum = $req->{xnum};
                $self->{oidx}->add_xref3($docid, $xnum, $oid, $eidx_key);
-               $idx->shard_add_eidx_info($docid, $eidx_key, $eml);
+               $idx->ipc_do('add_eidx_info', $docid, $eidx_key, $eml);
                check_batch_limit($req);
        } else { # 'd'
                my $rm_eidx_info;
@@ -110,9 +107,10 @@ sub do_xpost ($$) {
                                                        \$rm_eidx_info);
                if ($nr == 0) {
                        $self->{oidx}->eidxq_del($docid);
-                       $idx->shard_remove($docid);
+                       $idx->ipc_do('xdb_remove', $docid);
                } elsif ($rm_eidx_info) {
-                       $idx->shard_remove_eidx_info($docid, $eidx_key, $eml);
+                       $idx->ipc_do('remove_eidx_info',
+                                       $docid, $eidx_key, $eml);
                        $self->{oidx}->eidxq_add($docid); # yes, add
                }
        }
@@ -134,7 +132,7 @@ sub index_unseen ($) {
        my $oid = $new_smsg->{blob};
        my $ibx = delete $req->{ibx} or die 'BUG: {ibx} unset';
        $self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key);
-       $idx->index_raw(undef, $eml, $new_smsg, $ibx->eidx_key);
+       $idx->index_eml($eml, $new_smsg, $ibx->eidx_key);
        check_batch_limit($req);
 }
 
@@ -238,7 +236,7 @@ sub index_oid { # git->cat_async callback for 'm'
        my $new_smsg = $req->{new_smsg} = bless {
                blob => $oid,
        }, 'PublicInbox::Smsg';
-       $new_smsg->{bytes} = $size + crlf_adjust($$bref);
+       $new_smsg->set_bytes($$bref, $size);
        defined($req->{xnum} = cur_ibx_xnum($req, $bref)) or return;
        ++${$req->{nr}};
        do_step($req);
@@ -327,7 +325,7 @@ DELETE FROM xref3 WHERE docid = ? AND ibx_id = ?
                }
        } else {
                warn "I: remove #$docid $eidx_key @oid\n";
-               $self->idx_shard($docid)->shard_remove($docid);
+               $self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
        }
 }
 
@@ -436,11 +434,11 @@ sub _reindex_finalize ($$$) {
        my $top_smsg = pop @$stable;
        $top_smsg == $smsg or die 'BUG: top_smsg != smsg';
        my $ibx = _ibx_for($self, $sync, $smsg);
-       $idx->index_raw(undef, $eml, $smsg, $ibx->eidx_key);
+       $idx->index_eml($eml, $smsg, $ibx->eidx_key);
        for my $x (reverse @$stable) {
                $ibx = _ibx_for($self, $sync, $x);
                my $hdr = delete $x->{hdr} // die 'BUG: no {hdr}';
-               $idx->shard_add_eidx_info($docid, $ibx->eidx_key, $hdr);
+               $idx->ipc_do('add_eidx_info', $docid, $ibx->eidx_key, $hdr);
        }
        return if $nr == 1; # likely, all good
 
@@ -483,19 +481,19 @@ sub _reindex_oid { # git->cat_async callback
                my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid);
                if ($remain == 0) {
                        warn "W: #$docid gone or corrupted\n";
-                       $self->idx_shard($docid)->shard_remove($docid);
+                       $self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
                } elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
                        $self->git->cat_async($next_oid, \&_reindex_oid, $req);
                } else {
                        warn "BUG: #$docid gone (UNEXPECTED)\n";
-                       $self->idx_shard($docid)->shard_remove($docid);
+                       $self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
                }
                return;
        }
        my $ci = $self->{current_info};
        local $self->{current_info} = "$ci #$docid $oid";
        my $re_smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
-       $re_smsg->{bytes} = $size + crlf_adjust($$bref);
+       $re_smsg->set_bytes($$bref, $size);
        my $eml = PublicInbox::Eml->new($bref);
        $re_smsg->populate($eml, { autime => $orig_smsg->{ds},
                                cotime => $orig_smsg->{ts} });
@@ -522,7 +520,7 @@ sub _reindex_smsg ($$$) {
 BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
 
                $self->{oidx}->delete_by_num($docid);
-               $self->idx_shard($docid)->shard_remove($docid);
+               $self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
                return;
        }
 
@@ -675,7 +673,7 @@ sub _reindex_unseen { # git->cat_async callback
        my $self = $req->{self} // die 'BUG: {self} unset';
        local $self->{current_info} = "$self->{current_info} $oid";
        my $new_smsg = bless { blob => $oid, }, 'PublicInbox::Smsg';
-       $new_smsg->{bytes} = $size + crlf_adjust($$bref);
+       $new_smsg->set_bytes($$bref, $size);
        my $eml = $req->{eml} = PublicInbox::Eml->new($bref);
        $req->{new_smsg} = $new_smsg;
        $req->{chash} = content_hash($eml);
@@ -799,10 +797,10 @@ DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
                        if (scalar(@$xr3) == 0) { # all gone
                                $self->{oidx}->delete_by_num($docid);
                                $self->{oidx}->eidxq_del($docid);
-                               $idx->shard_remove($docid);
+                               $idx->ipc_do('xdb_remove', $docid);
                        } else { # enqueue for reindex of remaining messages
-                               $idx->shard_remove_eidx_info($docid,
-                                                       $ibx->eidx_key);
+                               $idx->ipc_do('remove_eidx_info',
+                                               $docid, $ibx->eidx_key);
                                $self->{oidx}->eidxq_add($docid); # yes, add
                        }
                }
@@ -935,18 +933,31 @@ sub idx_init { # similar to V2Writable
        return if $self->{idx_shards};
 
        $self->git->cleanup;
-
+       my $mode = 0644;
        my $ALL = $self->git->{git_dir}; # ALL.git
-       PublicInbox::Import::init_bare($ALL) unless -d $ALL;
+       my $old = -d $ALL;
+       if ($opt->{-private}) { # LeiStore
+               $mode = 0600;
+               if (!$old) {
+                       umask 077; # don't bother restoring
+                       PublicInbox::Import::init_bare($ALL);
+                       $self->git->qx(qw(config core.sharedRepository 0600));
+               }
+       } else {
+               PublicInbox::Import::init_bare($ALL) unless $old;
+       }
        my $info_dir = "$ALL/objects/info";
        my $alt = "$info_dir/alternates";
-       my $mode = 0644;
        my (@old, @new, %seen); # seen: st_dev + st_ino
        if (-e $alt) {
                open(my $fh, '<', $alt) or die "open $alt: $!";
                $mode = (stat($fh))[2] & 07777;
                while (my $line = <$fh>) {
                        chomp(my $d = $line);
+
+                       # expand relative path (/local/ stuff)
+                       substr($d, 0, 3) eq '../' and
+                               $d = "$ALL/objects/$d";
                        if (my @st = stat($d)) {
                                next if $seen{"$st[0]\0$st[1]"}++;
                        } else {
@@ -956,6 +967,22 @@ sub idx_init { # similar to V2Writable
                        push @old, $line;
                }
        }
+
+       # for LeiStore, and possibly some mirror-only state
+       if (opendir(my $dh, my $local = "$self->{topdir}/local")) {
+               # highest numbered epoch first
+               for my $n (sort { $b <=> $a } map { substr($_, 0, -4) + 0 }
+                               grep(/\A[0-9]+\.git\z/, readdir($dh))) {
+                       my $d = "$local/$n.git/objects"; # absolute path
+                       if (my @st = stat($d)) {
+                               next if $seen{"$st[0]\0$st[1]"}++;
+                               # favor relative paths for rename-friendliness
+                               push @new, "../../local/$n.git/objects\n";
+                       } else {
+                               warn "W: stat($d) failed: $!\n";
+                       }
+               }
+       }
        for my $ibx (@{$self->{ibx_list}}) {
                my $line = $ibx->git->{git_dir} . "/objects\n";
                chomp(my $d = $line);
@@ -976,7 +1003,7 @@ sub idx_init { # similar to V2Writable
        $self->with_umask(\&_idx_init, $self, $opt);
        $self->{oidx}->begin_lazy;
        $self->{oidx}->eidx_prep;
-       $self->{midx}->begin_txn;
+       $self->{midx}->create_xdb if @new;
 }
 
 sub _watch_commit { # PublicInbox::DS::add_timer callback
@@ -1000,8 +1027,7 @@ sub on_inbox_unlock { # called by PublicInbox::InboxIdle
        $pr->("indexing $ekey\n") if $pr;
        $self->idx_init($opt);
        sync_inbox($self, $self->{-watch_sync}, $ibx);
-       $self->{-commit_timer} //= PublicInbox::DS::add_timer(
-                                       $opt->{'commit-interval'} // 10,
+       $self->{-commit_timer} //= add_timer($opt->{'commit-interval'} // 10,
                                        \&_watch_commit, $self);
 }
 
@@ -1063,7 +1089,7 @@ sub eidx_watch { # public-inbox-extindex --watch main loop
        $pr->("performing initial scan ...\n") if $pr;
        my $sync = eidx_sync($self, $opt); # initial sync
        return if $sync->{quit};
-       my $oldset = PublicInbox::Sigfd::block_signals();
+       my $oldset = PublicInbox::DS::block_signals();
        local $self->{current_info} = '';
        my $cb = $SIG{__WARN__} || \&CORE::warn;
        local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) };
@@ -1081,7 +1107,7 @@ sub eidx_watch { # public-inbox-extindex --watch main loop
        if (!$sigfd) {
                # wake up every second to accept signals if we don't
                # have signalfd or IO::KQueue:
-               PublicInbox::Sigfd::sig_setmask($oldset);
+               PublicInbox::DS::sig_setmask($oldset);
                PublicInbox::DS->SetLoopTimeout(1000);
        }
        PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} });