]> Sergey Matveev's repositories - public-inbox.git/commitdiff
v2writable: support "barrier" operation to avoid reforking
authorEric Wong (Contractor, The Linux Foundation) <e@80x24.org>
Mon, 19 Mar 2018 08:14:34 +0000 (08:14 +0000)
committerEric Wong (Contractor, The Linux Foundation) <e@80x24.org>
Mon, 19 Mar 2018 08:16:34 +0000 (08:16 +0000)
Stopping and starting a bunch of processes to look up duplicates
or removals is inefficient.  Take advantage of checkpointing
in "git fast-import" and transactions in Xapian and SQLite.

lib/PublicInbox/Import.pm
lib/PublicInbox/SearchIdxPart.pm
lib/PublicInbox/SearchIdxSkeleton.pm
lib/PublicInbox/V2Writable.pm
t/v2writable.t

index 664bec6910e5f4d70c6c10e21a1ff2de2da6bd09..8406c9e268866fc23f9c4758f511994a964cfeea 100644 (file)
@@ -133,7 +133,6 @@ sub check_remove_v1 {
        (undef, $cur);
 }
 
-# used for v2 (maybe)
 sub checkpoint {
        my ($self) = @_;
        return unless $self->{pid};
@@ -141,6 +140,15 @@ sub checkpoint {
        undef;
 }
 
+sub progress {
+       my ($self, $msg) = @_;
+       return unless $self->{pid};
+       print { $self->{out} } "progress $msg\n" or wfail;
+       $self->{in}->getline eq "progress $msg\n" or die
+               "progress $msg not received\n";
+       undef;
+}
+
 # used for v2
 sub get_mark {
        my ($self, $mark) = @_;
index 6d8cb2a7e93748814eb0a24932735dc736f294ff..dd7ace67adcf04b09abb9d217676c4d53f8da3ba 100644 (file)
@@ -49,6 +49,11 @@ sub partition_worker_loop ($$$) {
                } elsif ($line eq "close\n") {
                        $self->_xdb_release;
                        $xdb = $txn = undef;
+               } elsif ($line eq "barrier\n") {
+                       $xdb->commit_transaction if $txn;
+                       $txn = undef;
+                       print { $self->{skeleton}->{w} } "barrier $part\n" or
+                                       die "write failed to skeleton: $!\n";
                } else {
                        chomp $line;
                        my ($len, $artnum, $oid, $mid0) = split(/ /, $line);
@@ -81,4 +86,11 @@ sub atfork_child {
        close $_[0]->{w} or die "failed to close write pipe: $!\n";
 }
 
+# called by V2Writable:
+sub barrier {
+       my $w = $_[0]->{w};
+       print $w "barrier\n" or die "failed to print: $!";
+       $w->flush or die "failed to flush: $!";
+}
+
 1;
index 40b28c519bfe8b2c92ea0db99b61b9f98cc49e01..4cb10f59136cf0fe3f1365f642060a6f2a339f30 100644 (file)
@@ -15,21 +15,25 @@ sub new {
 
        my ($r, $w);
        pipe($r, $w) or die "pipe failed: $!\n";
-       binmode $r, ':raw';
-       binmode $w, ':raw';
+       my ($barrier_wait, $barrier_note);
+       pipe($barrier_wait, $barrier_note) or die "pipe failed: $!\n";
+       binmode $_, ':raw' foreach ($r, $w, $barrier_wait, $barrier_note);
        my $pid = fork;
        defined $pid or die "fork failed: $!\n";
        if ($pid == 0) {
                $v2writable->atfork_child;
                $v2writable = undef;
                close $w;
-               eval { skeleton_worker_loop($self, $r) };
+               close $barrier_wait;
+               eval { skeleton_worker_loop($self, $r, $barrier_note) };
                die "skeleton worker died: $@\n" if $@;
                exit;
        }
        $self->{w} = $w;
        $self->{pid} = $pid;
        close $r;
+       close $barrier_note;
+       $self->{barrier_wait} = $barrier_wait;
 
        $w->autoflush(1);
 
@@ -40,11 +44,13 @@ sub new {
 }
 
 sub skeleton_worker_loop {
-       my ($self, $r) = @_;
+       my ($self, $r, $barrier_note) = @_;
+       $barrier_note->autoflush(1);
        $0 = 'pi-v2-skeleton';
        my $xdb = $self->_xdb_acquire;
        $xdb->begin_transaction;
        my $txn = 1;
+       my $barrier = undef;
        while (my $line = $r->getline) {
                if ($line eq "commit\n") {
                        $xdb->commit_transaction if $txn;
@@ -52,6 +58,21 @@ sub skeleton_worker_loop {
                } elsif ($line eq "close\n") {
                        $self->_xdb_release;
                        $xdb = $txn = undef;
+               } elsif ($line =~ /\Abarrier_init (\d+)\n\z/) {
+                       my $n = $1 - 1;
+                       die "barrier in-progress\n" if defined $barrier;
+                       $barrier = { map { $_ => 1 } (0..$n) };
+               } elsif ($line =~ /\Abarrier (\d+)\n\z/) {
+                       my $part = $1;
+                       die "no barrier in-progress\n" unless defined $barrier;
+                       delete $barrier->{$1} or die "unknown barrier: $part\n";
+                       if ((scalar keys %$barrier) == 0) {
+                               $barrier = undef;
+                               $xdb->commit_transaction if $txn;
+                               $txn = undef;
+                               print $barrier_note "barrier_done\n" or die
+                                       "print failed to barrier note: $!";
+                       }
                } else {
                        my $len = int($line);
                        my $n = read($r, my $msg, $len) or die "read: $!\n";
@@ -107,4 +128,18 @@ sub index_skeleton_real ($$) {
        $self->link_and_save($doc, $mids, \@refs, $num, $xpath);
 }
 
+# write to the subprocess
+sub barrier_init {
+       my ($self, $nparts) = @_;
+       my $w = $_[0]->{w};
+       print $w "barrier_init $nparts\n" or die "failed to write: $!";
+       $w->flush or die "failed to flush: $!";
+}
+
+sub barrier_wait {
+       my ($self) = @_;
+       my $l = $self->{barrier_wait}->getline;
+       $l eq "barrier_done\n" or die "bad response from barrier_wait: $l\n";
+}
+
 1;
index 7728b91a542a79727eaed2f627639fdad080cbb0..6e2a8d68cc45f10e129d8f072c4dca45d3914746 100644 (file)
@@ -113,7 +113,7 @@ sub num_for {
                };
 
                # crap, Message-ID is already known, hope somebody just resent:
-               $self->done; # write barrier, clears $self->{skel}
+               $self->barrier;
                foreach my $m (@$mids) {
                        # read-only lookup now safe to do after above barrier
                        my $existing = $self->lookup_content($mime, $m);
@@ -228,6 +228,37 @@ sub checkpoint {
        $self->searchidx_checkpoint(1);
 }
 
+# issue a write barrier to ensure all data is visible to other processes
+# and read-only ops.  Order of data importance is: git > SQLite > Xapian
+sub barrier {
+       my ($self) = @_;
+
+       # For safety, we ensure git checkpoint is complete before because
+       # the data in git is still more important than what is in Xapian.
+       # Performance may be gained by delaying ->progress call but we
+       # lose safety
+       if (my $im = $self->{im}) {
+               $im->checkpoint;
+               $im->progress('checkpoint');
+       }
+       my $skel = $self->{skel};
+       my $parts = $self->{idx_parts};
+       if ($parts && $skel) {
+               my $dbh = $skel->{mm}->{dbh};
+               $dbh->commit; # SQLite data is second in importance
+
+               # Now deal with Xapian
+               $skel->barrier_init(scalar(@$parts));
+               # each partition needs to issue a barrier command to skel:
+               $_->barrier foreach @$parts;
+
+               $skel->barrier_wait; # wait for each Xapian partition
+
+               $dbh->begin_work;
+       }
+       $self->{transact_bytes} = 0;
+}
+
 sub searchidx_checkpoint {
        my ($self, $more) = @_;
 
@@ -349,6 +380,7 @@ sub lookup_content {
        my $ibx = $self->{-inbox};
 
        my $srch = $ibx->search;
+       $srch->reopen;
        my $cid = content_id($mime);
        my $found;
        $srch->each_smsg_by_mid($mid, sub {
index 404c86518fd0382769afa3833c57dbdb90fa67da..7d276da7091fb18f9db73930e4e36a3e89634960 100644 (file)
@@ -55,7 +55,7 @@ if ('ensure git configs are correct') {
 {
        my @warn;
        local $SIG{__WARN__} = sub { push @warn, @_ };
-       is(undef, $im->add($mime), 'obvious duplicate rejected');
+       is($im->add($mime), undef, 'obvious duplicate rejected');
        like(join(' ', @warn), qr/resent/, 'warned about resent message');
 
        @warn = ();
@@ -105,6 +105,7 @@ if ('ensure git configs are correct') {
        ok($im->add($mime), 'message with multiple Message-ID');
        $im->done;
        my @found;
+       $ibx->search->reopen;
        $ibx->search->each_smsg_by_mid('abcde@1', sub { push @found, @_; 1 });
        is(scalar(@found), 1, 'message found by first MID');
        $ibx->search->each_smsg_by_mid('abcde@2', sub { push @found, @_; 1 });