]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/V2Writable.pm
searchidxshard: use PublicInbox::IPC to kill lots of code
[public-inbox.git] / lib / PublicInbox / V2Writable.pm
index edb8ba570f0ae2e9a6a127d2e778160a8b68a255..cad559c5bddb0a0c4e71c9c307e2135bea7ad2ec 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2018-2020 all contributors <meta@public-inbox.org>
+# Copyright (C) 2018-2021 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # This interface wraps and mimics PublicInbox::Import
@@ -16,7 +16,7 @@ use PublicInbox::ContentHash qw(content_hash content_digest);
 use PublicInbox::InboxWritable;
 use PublicInbox::OverIdx;
 use PublicInbox::Msgmap;
-use PublicInbox::Spawn qw(spawn popen_rd);
+use PublicInbox::Spawn qw(spawn popen_rd run_die);
 use PublicInbox::SearchIdx qw(log2stack crlf_adjust is_ancestor check_size
        is_bad_blob);
 use IO::Handle; # ->autoflush
@@ -287,16 +287,7 @@ sub _idx_init { # with_umask callback
 
 sub parallel_init ($$) {
        my ($self, $indexlevel) = @_;
-       if (($indexlevel // 'full') eq 'basic') {
-               $self->{parallel} = 0;
-       } else {
-               pipe(my ($r, $w)) or die "pipe failed: $!";
-               # pipe for barrier notifications doesn't need to be big,
-               # 1031: F_SETPIPE_SZ
-               fcntl($w, 1031, 4096) if $^O eq 'linux';
-               $self->{bnote} = [ $r, $w ];
-               $w->autoflush(1);
-       }
+       $self->{parallel} = 0 if ($indexlevel // 'full') eq 'basic';
 }
 
 # idempotent
@@ -574,24 +565,6 @@ sub set_last_commits ($) { # this is NOT for ExtSearchIdx
        }
 }
 
-sub barrier_init {
-       my ($self, $n) = @_;
-       $self->{bnote} or return;
-       --$n;
-       my $barrier = { map { $_ => 1 } (0..$n) };
-}
-
-sub barrier_wait {
-       my ($self, $barrier) = @_;
-       my $bnote = $self->{bnote} or return;
-       my $r = $bnote->[0];
-       while (scalar keys %$barrier) {
-               defined(my $l = readline($r)) or die "EOF on barrier_wait: $!";
-               $l =~ /\Abarrier (\d+)/ or die "bad line on barrier_wait: $l";
-               delete $barrier->{$1} or die "bad shard[$1] on barrier wait";
-       }
-}
-
 # public
 sub checkpoint ($;$) {
        my ($self, $wait) = @_;
@@ -615,16 +588,23 @@ sub checkpoint ($;$) {
                $self->{oidx}->commit_lazy;
 
                # Now deal with Xapian
-               if ($wait) {
-                       my $barrier = barrier_init($self, scalar @$shards);
-
-                       # each shard needs to issue a barrier command
-                       $_->shard_barrier for @$shards;
 
-                       # wait for each Xapian shard
-                       barrier_wait($self, $barrier);
-               } else {
-                       $_->shard_commit for @$shards;
+               # start commit_txn_lazy asynchronously on all parallel shards
+               # (non-parallel waits here)
+               $_->ipc_do('commit_txn_lazy') for @$shards;
+
+               # transactions started on parallel shards,
+               # wait for them by issuing an echo command (echo can only
+               # run after commit_txn_lazy is done)
+               if ($wait && $self->{parallel}) {
+                       my $i = 0;
+                       for my $shard (@$shards) {
+                               my $echo = $shard->ipc_do('echo', $i);
+                               $echo == $i or die <<"";
+shard[$i] bad echo:$echo != $i waiting for txn commit
+
+                               ++$i;
+                       }
                }
 
                my $midx = $self->{midx}; # misc index
@@ -679,7 +659,6 @@ sub done {
        eval { $self->{oidx}->dbh_close };
        $err .= "over close: $@\n" if $@;
        delete $self->{midx};
-       delete $self->{bnote};
        my $nbytes = $self->{total_bytes};
        $self->{total_bytes} = 0;
        $self->lock_release(!!$nbytes) if $shards;
@@ -745,9 +724,8 @@ sub git_init {
        my ($self, $epoch) = @_;
        my $git_dir = "$self->{ibx}->{inboxdir}/git/$epoch.git";
        PublicInbox::Import::init_bare($git_dir);
-       my @cmd = (qw/git config/, "--file=$git_dir/config",
-                       'include.path', '../../all.git/config');
-       PublicInbox::Import::run_die(\@cmd);
+       run_die([qw(git config), "--file=$git_dir/config",
+               qw(include.path ../../all.git/config)]);
        fill_alternates($self, $epoch);
        $git_dir
 }
@@ -847,15 +825,13 @@ sub content_exists ($$$) {
 
 sub atfork_child {
        my ($self) = @_;
-       if (my $shards = $self->{idx_shards}) {
-               $_->atfork_child foreach @$shards;
+       if (my $older_siblings = $self->{idx_shards}) {
+               $_->shard_atfork_child for @$older_siblings;
        }
        if (my $im = $self->{im}) {
                $im->atfork_child;
        }
-       die "unexpected mm" if $self->{mm};
-       close $self->{bnote}->[0] or die "close bnote[0]: $!\n";
-       $self->{bnote}->[1];
+       die "BUG: unexpected mm" if $self->{mm};
 }
 
 sub reindex_checkpoint ($$) {
@@ -1222,8 +1198,7 @@ sub unindex_todo ($$$) {
        return if $before == $after;
 
        # ensure any blob can not longer be accessed via dumb HTTP
-       PublicInbox::Import::run_die(['git',
-               "--git-dir=$unit->{git}->{git_dir}",
+       run_die(['git', "--git-dir=$unit->{git}->{git_dir}",
                qw(-c gc.reflogExpire=now gc --prune=all --quiet)]);
 }