X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FSearchIdxShard.pm;h=ee176e504298dd5e1b1685c94dd5bcfc53d38f14;hb=95bdac7f09c69036efed537a4d03d5bdd2ae4eb6;hp=15ec6578e67ee9190a246d27e40677ca233f8b2e;hpb=0ffd4a9833da64006d558ef241badfef3c096d1b;p=public-inbox.git diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm index 15ec6578..ee176e50 100644 --- a/lib/PublicInbox/SearchIdxShard.pm +++ b/lib/PublicInbox/SearchIdxShard.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2018 all contributors +# Copyright (C) 2018-2020 all contributors # License: AGPL-3.0+ # used to interface with a single Xapian shard in V2 repos. @@ -7,6 +7,7 @@ package PublicInbox::SearchIdxShard; use strict; use warnings; use base qw(PublicInbox::SearchIdx); +use IO::Handle (); # autoflush sub new { my ($class, $v2writable, $shard) = @_; @@ -19,23 +20,23 @@ sub new { } sub spawn_worker { - my ($self, $v2writable, $shard) = @_; + my ($self, $v2w, $shard) = @_; my ($r, $w); pipe($r, $w) or die "pipe failed: $!\n"; binmode $r, ':raw'; binmode $w, ':raw'; + $w->autoflush(1); my $pid = fork; defined $pid or die "fork failed: $!\n"; if ($pid == 0) { - my $bnote = $v2writable->atfork_child; - $v2writable = undef; + my $bnote = $v2w->atfork_child; close $w or die "failed to close: $!"; # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here # speeds V2Writable batch imports across 8 cores by nearly 20% fcntl($r, 1031, 1048576) if $^O eq 'linux'; - eval { shard_worker_loop($self, $r, $shard, $bnote) }; + eval { shard_worker_loop($self, $v2w, $r, $shard, $bnote) }; die "worker $shard died: $@\n" if $@; die "unexpected MM $self->{mm}" if $self->{mm}; exit; @@ -45,18 +46,12 @@ sub spawn_worker { close $r or die "failed to close: $!"; } -sub shard_worker_loop ($$$$) { - my ($self, $r, $shard, $bnote) = @_; +sub shard_worker_loop ($$$$$) { + my ($self, $v2w, $r, $shard, $bnote) = @_; $0 = "pi-v2-shard[$shard]"; - my $current_info = ''; - my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ }; - local $SIG{__WARN__} = sub { - chomp $current_info; - $warn_cb->("[$shard] $current_info: ", @_); - }; $self->begin_txn_lazy; while (my $line = $r->getline) { - $current_info = $line; + $v2w->{current_info} = "[$shard] $line"; if ($line eq "commit\n") { $self->commit_txn_lazy; } elsif ($line eq "close\n") { @@ -90,7 +85,6 @@ sub index_raw { if (my $w = $self->{w}) { print $w "$bytes $artnum $oid $mid0\n", $$msgref or die "failed to write shard $!\n"; - $w->flush or die "failed to flush: $!\n"; } else { $$msgref = undef; $self->begin_txn_lazy; @@ -107,7 +101,6 @@ sub remote_barrier { my ($self) = @_; if (my $w = $self->{w}) { print $w "barrier\n" or die "failed to print: $!"; - $w->flush or die "failed to flush: $!"; } else { $self->commit_txn_lazy; }