]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/V2Writable.pm
searchidxshard: use PublicInbox::IPC to kill lots of code
[public-inbox.git] / lib / PublicInbox / V2Writable.pm
index 724fa804356265c4449898f47737351cf624d088..cad559c5bddb0a0c4e71c9c307e2135bea7ad2ec 100644 (file)
@@ -287,16 +287,7 @@ sub _idx_init { # with_umask callback
 
 sub parallel_init ($$) {
        my ($self, $indexlevel) = @_;
-       if (($indexlevel // 'full') eq 'basic') {
-               $self->{parallel} = 0;
-       } else {
-               pipe(my ($r, $w)) or die "pipe failed: $!";
-               # pipe for barrier notifications doesn't need to be big,
-               # 1031: F_SETPIPE_SZ
-               fcntl($w, 1031, 4096) if $^O eq 'linux';
-               $self->{bnote} = [ $r, $w ];
-               $w->autoflush(1);
-       }
+       $self->{parallel} = 0 if ($indexlevel // 'full') eq 'basic';
 }
 
 # idempotent
@@ -574,24 +565,6 @@ sub set_last_commits ($) { # this is NOT for ExtSearchIdx
        }
 }
 
-sub barrier_init {
-       my ($self, $n) = @_;
-       $self->{bnote} or return;
-       --$n;
-       my $barrier = { map { $_ => 1 } (0..$n) };
-}
-
-sub barrier_wait {
-       my ($self, $barrier) = @_;
-       my $bnote = $self->{bnote} or return;
-       my $r = $bnote->[0];
-       while (scalar keys %$barrier) {
-               defined(my $l = readline($r)) or die "EOF on barrier_wait: $!";
-               $l =~ /\Abarrier (\d+)/ or die "bad line on barrier_wait: $l";
-               delete $barrier->{$1} or die "bad shard[$1] on barrier wait";
-       }
-}
-
 # public
 sub checkpoint ($;$) {
        my ($self, $wait) = @_;
@@ -615,16 +588,23 @@ sub checkpoint ($;$) {
                $self->{oidx}->commit_lazy;
 
                # Now deal with Xapian
-               if ($wait) {
-                       my $barrier = barrier_init($self, scalar @$shards);
-
-                       # each shard needs to issue a barrier command
-                       $_->shard_barrier for @$shards;
 
-                       # wait for each Xapian shard
-                       barrier_wait($self, $barrier);
-               } else {
-                       $_->shard_commit for @$shards;
+               # start commit_txn_lazy asynchronously on all parallel shards
+               # (non-parallel waits here)
+               $_->ipc_do('commit_txn_lazy') for @$shards;
+
+               # transactions started on parallel shards,
+               # wait for them by issuing an echo command (echo can only
+               # run after commit_txn_lazy is done)
+               if ($wait && $self->{parallel}) {
+                       my $i = 0;
+                       for my $shard (@$shards) {
+                               my $echo = $shard->ipc_do('echo', $i);
+                               $echo == $i or die <<"";
+shard[$i] bad echo:$echo != $i waiting for txn commit
+
+                               ++$i;
+                       }
                }
 
                my $midx = $self->{midx}; # misc index
@@ -679,7 +659,6 @@ sub done {
        eval { $self->{oidx}->dbh_close };
        $err .= "over close: $@\n" if $@;
        delete $self->{midx};
-       delete $self->{bnote};
        my $nbytes = $self->{total_bytes};
        $self->{total_bytes} = 0;
        $self->lock_release(!!$nbytes) if $shards;
@@ -846,15 +825,13 @@ sub content_exists ($$$) {
 
 sub atfork_child {
        my ($self) = @_;
-       if (my $shards = $self->{idx_shards}) {
-               $_->atfork_child foreach @$shards;
+       if (my $older_siblings = $self->{idx_shards}) {
+               $_->shard_atfork_child for @$older_siblings;
        }
        if (my $im = $self->{im}) {
                $im->atfork_child;
        }
-       die "unexpected mm" if $self->{mm};
-       close $self->{bnote}->[0] or die "close bnote[0]: $!\n";
-       $self->{bnote}->[1];
+       die "BUG: unexpected mm" if $self->{mm};
 }
 
 sub reindex_checkpoint ($$) {