From: Eric Wong (Contractor, The Linux Foundation) Date: Mon, 19 Mar 2018 08:14:34 +0000 (+0000) Subject: v2writable: support "barrier" operation to avoid reforking X-Git-Tag: v1.1.0-pre1~168 X-Git-Url: http://www.git.stargrave.org/?p=public-inbox.git;a=commitdiff_plain;h=ad281240644ee370c75d95b35ad7833863257286 v2writable: support "barrier" operation to avoid reforking Stopping and starting a bunch of processes to look up duplicates or removals is inefficient. Take advantage of checkpointing in "git fast-import" and transactions in Xapian and SQLite. --- diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm index 664bec69..8406c9e2 100644 --- a/lib/PublicInbox/Import.pm +++ b/lib/PublicInbox/Import.pm @@ -133,7 +133,6 @@ sub check_remove_v1 { (undef, $cur); } -# used for v2 (maybe) sub checkpoint { my ($self) = @_; return unless $self->{pid}; @@ -141,6 +140,15 @@ sub checkpoint { undef; } +sub progress { + my ($self, $msg) = @_; + return unless $self->{pid}; + print { $self->{out} } "progress $msg\n" or wfail; + $self->{in}->getline eq "progress $msg\n" or die + "progress $msg not received\n"; + undef; +} + # used for v2 sub get_mark { my ($self, $mark) = @_; diff --git a/lib/PublicInbox/SearchIdxPart.pm b/lib/PublicInbox/SearchIdxPart.pm index 6d8cb2a7..dd7ace67 100644 --- a/lib/PublicInbox/SearchIdxPart.pm +++ b/lib/PublicInbox/SearchIdxPart.pm @@ -49,6 +49,11 @@ sub partition_worker_loop ($$$) { } elsif ($line eq "close\n") { $self->_xdb_release; $xdb = $txn = undef; + } elsif ($line eq "barrier\n") { + $xdb->commit_transaction if $txn; + $txn = undef; + print { $self->{skeleton}->{w} } "barrier $part\n" or + die "write failed to skeleton: $!\n"; } else { chomp $line; my ($len, $artnum, $oid, $mid0) = split(/ /, $line); @@ -81,4 +86,11 @@ sub atfork_child { close $_[0]->{w} or die "failed to close write pipe: $!\n"; } +# called by V2Writable: +sub barrier { + my $w = $_[0]->{w}; + print $w "barrier\n" or die "failed to print: $!"; + $w->flush or die "failed to flush: $!"; +} + 1; diff --git a/lib/PublicInbox/SearchIdxSkeleton.pm b/lib/PublicInbox/SearchIdxSkeleton.pm index 40b28c51..4cb10f59 100644 --- a/lib/PublicInbox/SearchIdxSkeleton.pm +++ b/lib/PublicInbox/SearchIdxSkeleton.pm @@ -15,21 +15,25 @@ sub new { my ($r, $w); pipe($r, $w) or die "pipe failed: $!\n"; - binmode $r, ':raw'; - binmode $w, ':raw'; + my ($barrier_wait, $barrier_note); + pipe($barrier_wait, $barrier_note) or die "pipe failed: $!\n"; + binmode $_, ':raw' foreach ($r, $w, $barrier_wait, $barrier_note); my $pid = fork; defined $pid or die "fork failed: $!\n"; if ($pid == 0) { $v2writable->atfork_child; $v2writable = undef; close $w; - eval { skeleton_worker_loop($self, $r) }; + close $barrier_wait; + eval { skeleton_worker_loop($self, $r, $barrier_note) }; die "skeleton worker died: $@\n" if $@; exit; } $self->{w} = $w; $self->{pid} = $pid; close $r; + close $barrier_note; + $self->{barrier_wait} = $barrier_wait; $w->autoflush(1); @@ -40,11 +44,13 @@ sub new { } sub skeleton_worker_loop { - my ($self, $r) = @_; + my ($self, $r, $barrier_note) = @_; + $barrier_note->autoflush(1); $0 = 'pi-v2-skeleton'; my $xdb = $self->_xdb_acquire; $xdb->begin_transaction; my $txn = 1; + my $barrier = undef; while (my $line = $r->getline) { if ($line eq "commit\n") { $xdb->commit_transaction if $txn; @@ -52,6 +58,21 @@ sub skeleton_worker_loop { } elsif ($line eq "close\n") { $self->_xdb_release; $xdb = $txn = undef; + } elsif ($line =~ /\Abarrier_init (\d+)\n\z/) { + my $n = $1 - 1; + die "barrier in-progress\n" if defined $barrier; + $barrier = { map { $_ => 1 } (0..$n) }; + } elsif ($line =~ /\Abarrier (\d+)\n\z/) { + my $part = $1; + die "no barrier in-progress\n" unless defined $barrier; + delete $barrier->{$1} or die "unknown barrier: $part\n"; + if ((scalar keys %$barrier) == 0) { + $barrier = undef; + $xdb->commit_transaction if $txn; + $txn = undef; + print $barrier_note "barrier_done\n" or die + "print failed to barrier note: $!"; + } } else { my $len = int($line); my $n = read($r, my $msg, $len) or die "read: $!\n"; @@ -107,4 +128,18 @@ sub index_skeleton_real ($$) { $self->link_and_save($doc, $mids, \@refs, $num, $xpath); } +# write to the subprocess +sub barrier_init { + my ($self, $nparts) = @_; + my $w = $_[0]->{w}; + print $w "barrier_init $nparts\n" or die "failed to write: $!"; + $w->flush or die "failed to flush: $!"; +} + +sub barrier_wait { + my ($self) = @_; + my $l = $self->{barrier_wait}->getline; + $l eq "barrier_done\n" or die "bad response from barrier_wait: $l\n"; +} + 1; diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 7728b91a..6e2a8d68 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -113,7 +113,7 @@ sub num_for { }; # crap, Message-ID is already known, hope somebody just resent: - $self->done; # write barrier, clears $self->{skel} + $self->barrier; foreach my $m (@$mids) { # read-only lookup now safe to do after above barrier my $existing = $self->lookup_content($mime, $m); @@ -228,6 +228,37 @@ sub checkpoint { $self->searchidx_checkpoint(1); } +# issue a write barrier to ensure all data is visible to other processes +# and read-only ops. Order of data importance is: git > SQLite > Xapian +sub barrier { + my ($self) = @_; + + # For safety, we ensure git checkpoint is complete before because + # the data in git is still more important than what is in Xapian. + # Performance may be gained by delaying ->progress call but we + # lose safety + if (my $im = $self->{im}) { + $im->checkpoint; + $im->progress('checkpoint'); + } + my $skel = $self->{skel}; + my $parts = $self->{idx_parts}; + if ($parts && $skel) { + my $dbh = $skel->{mm}->{dbh}; + $dbh->commit; # SQLite data is second in importance + + # Now deal with Xapian + $skel->barrier_init(scalar(@$parts)); + # each partition needs to issue a barrier command to skel: + $_->barrier foreach @$parts; + + $skel->barrier_wait; # wait for each Xapian partition + + $dbh->begin_work; + } + $self->{transact_bytes} = 0; +} + sub searchidx_checkpoint { my ($self, $more) = @_; @@ -349,6 +380,7 @@ sub lookup_content { my $ibx = $self->{-inbox}; my $srch = $ibx->search; + $srch->reopen; my $cid = content_id($mime); my $found; $srch->each_smsg_by_mid($mid, sub { diff --git a/t/v2writable.t b/t/v2writable.t index 404c8651..7d276da7 100644 --- a/t/v2writable.t +++ b/t/v2writable.t @@ -55,7 +55,7 @@ if ('ensure git configs are correct') { { my @warn; local $SIG{__WARN__} = sub { push @warn, @_ }; - is(undef, $im->add($mime), 'obvious duplicate rejected'); + is($im->add($mime), undef, 'obvious duplicate rejected'); like(join(' ', @warn), qr/resent/, 'warned about resent message'); @warn = (); @@ -105,6 +105,7 @@ if ('ensure git configs are correct') { ok($im->add($mime), 'message with multiple Message-ID'); $im->done; my @found; + $ibx->search->reopen; $ibx->search->each_smsg_by_mid('abcde@1', sub { push @found, @_; 1 }); is(scalar(@found), 1, 'message found by first MID'); $ibx->search->each_smsg_by_mid('abcde@2', sub { push @found, @_; 1 });