]> Sergey Matveev's repositories - public-inbox.git/commitdiff
v2writable: reduce barriers
authorEric Wong (Contractor, The Linux Foundation) <e@80x24.org>
Sat, 7 Apr 2018 03:41:50 +0000 (03:41 +0000)
committerEric Wong (Contractor, The Linux Foundation) <e@80x24.org>
Sat, 7 Apr 2018 03:42:27 +0000 (03:42 +0000)
Since we handle the overview info synchronously, we only need
barriers in tests, now.  We will use asynchronous checkpoints
to sync less-important Xapian data.

For data deduplication, this requires us to hoist out the
cat-blob support in ::Import for reading uncommitted data
in git.

lib/PublicInbox/Import.pm
lib/PublicInbox/V2Writable.pm
t/v2writable.t

index 2529798fe4b7adc40424dad7b32341ccd9597ac0..9e8900f3e6f86d2ce50576039654e75cad104120 100644 (file)
@@ -95,19 +95,13 @@ sub _check_path ($$$$) {
        $info =~ /\Amissing / ? undef : $info;
 }
 
-sub check_remove_v1 {
-       my ($r, $w, $tip, $path, $mime) = @_;
-
-       my $info = _check_path($r, $w, $tip, $path) or return ('MISSING',undef);
-       $info =~ m!\A100644 blob ([a-f0-9]{40})\t!s or die "not blob: $info";
-       my $blob = $1;
-
-       print $w "cat-blob $blob\n" or wfail;
+sub _cat_blob ($$$) {
+       my ($r, $w, $oid) = @_;
+       print $w "cat-blob $oid\n" or wfail;
        local $/ = "\n";
-       $info = <$r>;
+       my $info = <$r>;
        defined $info or die "EOF from fast-import / cat-blob: $!";
-       $info =~ /\A[a-f0-9]{40} blob (\d+)\n\z/ or
-                               die "unexpected cat-blob response: $info";
+       $info =~ /\A[a-f0-9]{40} blob (\d+)\n\z/ or return;
        my $left = $1;
        my $offset = 0;
        my $buf = '';
@@ -122,7 +116,23 @@ sub check_remove_v1 {
        $n = read($r, my $lf, 1);
        defined($n) or die "read final byte of cat-blob failed: $!";
        die "bad read on final byte: <$lf>" if $lf ne "\n";
-       my $cur = PublicInbox::MIME->new(\$buf);
+       \$buf;
+}
+
+sub cat_blob {
+       my ($self, $oid) = @_;
+       my ($r, $w) = $self->gfi_start;
+       _cat_blob($r, $w, $oid);
+}
+
+sub check_remove_v1 {
+       my ($r, $w, $tip, $path, $mime) = @_;
+
+       my $info = _check_path($r, $w, $tip, $path) or return ('MISSING',undef);
+       $info =~ m!\A100644 blob ([a-f0-9]{40})\t!s or die "not blob: $info";
+       my $oid = $1;
+       my $msg = _cat_blob($r, $w, $oid) or die "BUG: cat-blob $1 failed";
+       my $cur = PublicInbox::MIME->new($msg);
        my $cur_s = $cur->header('Subject');
        $cur_s = '' unless defined $cur_s;
        my $cur_m = $mime->header('Subject');
index 8361d09fab54d2c598d12aa5eec203e4ef0cb42f..53fdb738621964171fa5a5aedb78b7ed17ccebb2 100644 (file)
@@ -139,7 +139,6 @@ sub num_for {
                };
 
                # crap, Message-ID is already known, hope somebody just resent:
-               $self->barrier;
                foreach my $m (@$mids) {
                        # read-only lookup now safe to do after above barrier
                        my $existing = $self->lookup_content($mime, $m);
@@ -259,10 +258,8 @@ sub purge_oids {
 
 sub remove_internal {
        my ($self, $mime, $cmt_msg, $purge) = @_;
-       $self->barrier;
        $self->idx_init;
        my $im = $self->importer unless $purge;
-       my $ibx = $self->{-inbox};
        my $over = $self->{over};
        my $cid = content_id($mime);
        my $parts = $self->{idx_parts};
@@ -280,7 +277,7 @@ sub remove_internal {
                my %gone;
                my ($id, $prev);
                while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
-                       my $msg = $ibx->msg_by_smsg($smsg);
+                       my $msg = get_blob($self, $smsg);
                        if (!defined($msg)) {
                                warn "broken smsg for $mid\n";
                                next; # continue
@@ -313,7 +310,6 @@ sub remove_internal {
                        $orig = undef;
                        $self->unindex_oid_remote($oid, $mid);
                }
-               $self->barrier;
        }
 
        if (defined $mark) {
@@ -359,45 +355,6 @@ sub set_last_commits ($) {
        }
 }
 
-sub done {
-       my ($self) = @_;
-       my $im = delete $self->{im};
-       $im->done if $im; # PublicInbox::Import::done
-
-       my $mm = $self->{mm};
-       $mm->{dbh}->commit if $mm;
-
-       # order matters, we can only close {over} after all partitions
-       # are done because the partitions also write to {over}
-       my $parts = delete $self->{idx_parts};
-       if ($parts) {
-               $_->remote_commit for @$parts;
-               $_->remote_close for @$parts;
-       }
-
-       my $over = $self->{over};
-       $over->commit_lazy;
-       $over->disconnect;
-
-       if ($mm) {
-               $mm->{dbh}->begin_work;
-               set_last_commits($self);
-               $mm->{dbh}->commit;
-               delete $self->{mm};
-       }
-
-       delete $self->{bnote};
-       $self->{transact_bytes} = 0;
-       $self->lock_release if $parts;
-}
-
-sub checkpoint {
-       my ($self) = @_;
-       my $im = $self->{im};
-       $im->checkpoint if $im; # PublicInbox::Import::checkpoint
-       $self->barrier;
-}
-
 sub barrier_init {
        my ($self, $n) = @_;
        $self->{bnote} or return;
@@ -416,13 +373,15 @@ sub barrier_wait {
        }
 }
 
-# issue a write barrier to ensure all data is visible to other processes
-# and read-only ops.  Order of data importance is: git > SQLite > Xapian
-sub barrier {
-       my ($self) = @_;
+sub checkpoint ($;$) {
+       my ($self, $wait) = @_;
 
        if (my $im = $self->{im}) {
-               $im->barrier;
+               if ($wait) {
+                       $im->barrier;
+               } else {
+                       $im->checkpoint;
+               }
        }
        my $parts = $self->{idx_parts};
        if ($parts) {
@@ -435,11 +394,17 @@ sub barrier {
                $self->{over}->commit_lazy;
 
                # Now deal with Xapian
-               my $barrier = $self->barrier_init(scalar @$parts);
+               if ($wait) {
+                       my $barrier = $self->barrier_init(scalar @$parts);
+
+                       # each partition needs to issue a barrier command
+                       $_->remote_barrier for @$parts;
 
-               # each partition needs to issue a barrier command
-               $_->remote_barrier for @$parts;
-               $self->barrier_wait($barrier); # wait for each Xapian partition
+                       # wait for each Xapian partition
+                       $self->barrier_wait($barrier);
+               } else {
+                       $_->remote_commit for @$parts;
+               }
 
                # last_commit is special, don't commit these until
                # remote partitions are done:
@@ -452,6 +417,27 @@ sub barrier {
        $self->{transact_bytes} = 0;
 }
 
+# issue a write barrier to ensure all data is visible to other processes
+# and read-only ops.  Order of data importance is: git > SQLite > Xapian
+sub barrier { checkpoint($_[0], 1) };
+
+sub done {
+       my ($self) = @_;
+       my $im = delete $self->{im};
+       $im->done if $im; # PublicInbox::Import::done
+       checkpoint($self);
+       my $mm = delete $self->{mm};
+       $mm->{dbh}->commit if $mm;
+       my $parts = delete $self->{idx_parts};
+       if ($parts) {
+               $_->remote_close for @$parts;
+       }
+       $self->{over}->disconnect;
+       delete $self->{bnote};
+       $self->{transact_bytes} = 0;
+       $self->lock_release if $parts;
+}
+
 sub git_init {
        my ($self, $epoch) = @_;
        my $pfx = "$self->{-inbox}->{mainrepo}/git";
@@ -512,8 +498,8 @@ sub importer {
                } else {
                        $self->{im} = undef;
                        $im->done;
-                       $self->barrier;
                        $im = undef;
+                       $self->checkpoint;
                        my $git_dir = $self->git_init(++$self->{epoch_max});
                        my $git = PublicInbox::Git->new($git_dir);
                        return $self->import_init($git, 0);
@@ -569,15 +555,25 @@ sub diff ($$$) {
        unlink($an, $bn);
 }
 
+sub get_blob ($$) {
+       my ($self, $smsg) = @_;
+       if (my $im = $self->{im}) {
+               my $msg = $im->cat_blob($smsg->{blob});
+               return $msg if $msg;
+       }
+       # older message, should be in alternates
+       my $ibx = $self->{-inbox};
+       $ibx->msg_by_smsg($smsg);
+}
+
 sub lookup_content {
        my ($self, $mime, $mid) = @_;
-       my $ibx = $self->{-inbox};
        my $over = $self->{over};
        my $cid = content_id($mime);
        my $found;
        my ($id, $prev);
        while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
-               my $msg = $ibx->msg_by_smsg($smsg);
+               my $msg = get_blob($self, $smsg);
                if (!defined($msg)) {
                        warn "broken smsg for $mid\n";
                        next;
@@ -815,7 +811,6 @@ sub unindex_oid {
                }
                $self->{unindexed}->{$_}++ foreach keys %gone;
                $self->unindex_oid_remote($oid, $mid);
-               $self->barrier;
        }
 }
 
@@ -823,7 +818,6 @@ my $x40 = qr/[a-f0-9]{40}/;
 sub unindex {
        my ($self, $opts, $git, $unindex_range) = @_;
        my $un = $self->{unindexed} ||= {}; # num => removal count
-       $self->barrier;
        my $before = scalar keys %$un;
        my @cmd = qw(log --raw -r
                        --no-notes --no-color --no-abbrev --no-renames);
@@ -847,7 +841,6 @@ sub unindex {
 sub index_sync {
        my ($self, $opts) = @_;
        $opts ||= {};
-       my $ibx = $self->{-inbox};
        my $epoch_max;
        my $latest = git_dir_latest($self, \$epoch_max);
        return unless defined $latest;
index 4a42c016538bebc6d1efe5184cfbdd611dbe004c..b543c53fa44e0e5a98b46527146791f92f334336 100644 (file)
@@ -213,8 +213,8 @@ EOF
        $im = PublicInbox::V2Writable->new($ibx, 1);
        is($im->{partitions}, 1, 'detected single partition from previous');
        my $smsg = $im->remove($mime, 'test removal');
-       my @after = $git0->qx(qw(log --pretty=oneline));
        $im->done;
+       my @after = $git0->qx(qw(log --pretty=oneline));
        my $tip = shift @after;
        like($tip, qr/\A[a-f0-9]+ test removal\n\z/s,
                'commit message propagated to git');