X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FV2Writable.pm;h=cad559c5bddb0a0c4e71c9c307e2135bea7ad2ec;hb=bf8df8160076d7a1231e0858ca4c243df3b7c860;hp=724fa804356265c4449898f47737351cf624d088;hpb=5fd4a152ee0d43227582783a0641dda4e56efe91;p=public-inbox.git diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 724fa804..cad559c5 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -287,16 +287,7 @@ sub _idx_init { # with_umask callback sub parallel_init ($$) { my ($self, $indexlevel) = @_; - if (($indexlevel // 'full') eq 'basic') { - $self->{parallel} = 0; - } else { - pipe(my ($r, $w)) or die "pipe failed: $!"; - # pipe for barrier notifications doesn't need to be big, - # 1031: F_SETPIPE_SZ - fcntl($w, 1031, 4096) if $^O eq 'linux'; - $self->{bnote} = [ $r, $w ]; - $w->autoflush(1); - } + $self->{parallel} = 0 if ($indexlevel // 'full') eq 'basic'; } # idempotent @@ -574,24 +565,6 @@ sub set_last_commits ($) { # this is NOT for ExtSearchIdx } } -sub barrier_init { - my ($self, $n) = @_; - $self->{bnote} or return; - --$n; - my $barrier = { map { $_ => 1 } (0..$n) }; -} - -sub barrier_wait { - my ($self, $barrier) = @_; - my $bnote = $self->{bnote} or return; - my $r = $bnote->[0]; - while (scalar keys %$barrier) { - defined(my $l = readline($r)) or die "EOF on barrier_wait: $!"; - $l =~ /\Abarrier (\d+)/ or die "bad line on barrier_wait: $l"; - delete $barrier->{$1} or die "bad shard[$1] on barrier wait"; - } -} - # public sub checkpoint ($;$) { my ($self, $wait) = @_; @@ -615,16 +588,23 @@ sub checkpoint ($;$) { $self->{oidx}->commit_lazy; # Now deal with Xapian - if ($wait) { - my $barrier = barrier_init($self, scalar @$shards); - - # each shard needs to issue a barrier command - $_->shard_barrier for @$shards; - # wait for each Xapian shard - barrier_wait($self, $barrier); - } else { - $_->shard_commit for @$shards; + # start commit_txn_lazy asynchronously on all parallel shards + # (non-parallel waits here) + $_->ipc_do('commit_txn_lazy') for @$shards; + + # transactions started on parallel shards, + # wait for them by issuing an echo command (echo can only + # run after commit_txn_lazy is done) + if ($wait && $self->{parallel}) { + my $i = 0; + for my $shard (@$shards) { + my $echo = $shard->ipc_do('echo', $i); + $echo == $i or die <<""; +shard[$i] bad echo:$echo != $i waiting for txn commit + + ++$i; + } } my $midx = $self->{midx}; # misc index @@ -679,7 +659,6 @@ sub done { eval { $self->{oidx}->dbh_close }; $err .= "over close: $@\n" if $@; delete $self->{midx}; - delete $self->{bnote}; my $nbytes = $self->{total_bytes}; $self->{total_bytes} = 0; $self->lock_release(!!$nbytes) if $shards; @@ -846,15 +825,13 @@ sub content_exists ($$$) { sub atfork_child { my ($self) = @_; - if (my $shards = $self->{idx_shards}) { - $_->atfork_child foreach @$shards; + if (my $older_siblings = $self->{idx_shards}) { + $_->shard_atfork_child for @$older_siblings; } if (my $im = $self->{im}) { $im->atfork_child; } - die "unexpected mm" if $self->{mm}; - close $self->{bnote}->[0] or die "close bnote[0]: $!\n"; - $self->{bnote}->[1]; + die "BUG: unexpected mm" if $self->{mm}; } sub reindex_checkpoint ($$) {