]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/V2Writable.pm
v2writable: reduce barriers
[public-inbox.git] / lib / PublicInbox / V2Writable.pm
index 8361d09fab54d2c598d12aa5eec203e4ef0cb42f..53fdb738621964171fa5a5aedb78b7ed17ccebb2 100644 (file)
@@ -139,7 +139,6 @@ sub num_for {
                };
 
                # crap, Message-ID is already known, hope somebody just resent:
-               $self->barrier;
                foreach my $m (@$mids) {
                        # read-only lookup now safe to do after above barrier
                        my $existing = $self->lookup_content($mime, $m);
@@ -259,10 +258,8 @@ sub purge_oids {
 
 sub remove_internal {
        my ($self, $mime, $cmt_msg, $purge) = @_;
-       $self->barrier;
        $self->idx_init;
        my $im = $self->importer unless $purge;
-       my $ibx = $self->{-inbox};
        my $over = $self->{over};
        my $cid = content_id($mime);
        my $parts = $self->{idx_parts};
@@ -280,7 +277,7 @@ sub remove_internal {
                my %gone;
                my ($id, $prev);
                while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
-                       my $msg = $ibx->msg_by_smsg($smsg);
+                       my $msg = get_blob($self, $smsg);
                        if (!defined($msg)) {
                                warn "broken smsg for $mid\n";
                                next; # continue
@@ -313,7 +310,6 @@ sub remove_internal {
                        $orig = undef;
                        $self->unindex_oid_remote($oid, $mid);
                }
-               $self->barrier;
        }
 
        if (defined $mark) {
@@ -359,45 +355,6 @@ sub set_last_commits ($) {
        }
 }
 
-sub done {
-       my ($self) = @_;
-       my $im = delete $self->{im};
-       $im->done if $im; # PublicInbox::Import::done
-
-       my $mm = $self->{mm};
-       $mm->{dbh}->commit if $mm;
-
-       # order matters, we can only close {over} after all partitions
-       # are done because the partitions also write to {over}
-       my $parts = delete $self->{idx_parts};
-       if ($parts) {
-               $_->remote_commit for @$parts;
-               $_->remote_close for @$parts;
-       }
-
-       my $over = $self->{over};
-       $over->commit_lazy;
-       $over->disconnect;
-
-       if ($mm) {
-               $mm->{dbh}->begin_work;
-               set_last_commits($self);
-               $mm->{dbh}->commit;
-               delete $self->{mm};
-       }
-
-       delete $self->{bnote};
-       $self->{transact_bytes} = 0;
-       $self->lock_release if $parts;
-}
-
-sub checkpoint {
-       my ($self) = @_;
-       my $im = $self->{im};
-       $im->checkpoint if $im; # PublicInbox::Import::checkpoint
-       $self->barrier;
-}
-
 sub barrier_init {
        my ($self, $n) = @_;
        $self->{bnote} or return;
@@ -416,13 +373,15 @@ sub barrier_wait {
        }
 }
 
-# issue a write barrier to ensure all data is visible to other processes
-# and read-only ops.  Order of data importance is: git > SQLite > Xapian
-sub barrier {
-       my ($self) = @_;
+sub checkpoint ($;$) {
+       my ($self, $wait) = @_;
 
        if (my $im = $self->{im}) {
-               $im->barrier;
+               if ($wait) {
+                       $im->barrier;
+               } else {
+                       $im->checkpoint;
+               }
        }
        my $parts = $self->{idx_parts};
        if ($parts) {
@@ -435,11 +394,17 @@ sub barrier {
                $self->{over}->commit_lazy;
 
                # Now deal with Xapian
-               my $barrier = $self->barrier_init(scalar @$parts);
+               if ($wait) {
+                       my $barrier = $self->barrier_init(scalar @$parts);
+
+                       # each partition needs to issue a barrier command
+                       $_->remote_barrier for @$parts;
 
-               # each partition needs to issue a barrier command
-               $_->remote_barrier for @$parts;
-               $self->barrier_wait($barrier); # wait for each Xapian partition
+                       # wait for each Xapian partition
+                       $self->barrier_wait($barrier);
+               } else {
+                       $_->remote_commit for @$parts;
+               }
 
                # last_commit is special, don't commit these until
                # remote partitions are done:
@@ -452,6 +417,27 @@ sub barrier {
        $self->{transact_bytes} = 0;
 }
 
+# issue a write barrier to ensure all data is visible to other processes
+# and read-only ops.  Order of data importance is: git > SQLite > Xapian
+sub barrier { checkpoint($_[0], 1) };
+
+sub done {
+       my ($self) = @_;
+       my $im = delete $self->{im};
+       $im->done if $im; # PublicInbox::Import::done
+       checkpoint($self);
+       my $mm = delete $self->{mm};
+       $mm->{dbh}->commit if $mm;
+       my $parts = delete $self->{idx_parts};
+       if ($parts) {
+               $_->remote_close for @$parts;
+       }
+       $self->{over}->disconnect;
+       delete $self->{bnote};
+       $self->{transact_bytes} = 0;
+       $self->lock_release if $parts;
+}
+
 sub git_init {
        my ($self, $epoch) = @_;
        my $pfx = "$self->{-inbox}->{mainrepo}/git";
@@ -512,8 +498,8 @@ sub importer {
                } else {
                        $self->{im} = undef;
                        $im->done;
-                       $self->barrier;
                        $im = undef;
+                       $self->checkpoint;
                        my $git_dir = $self->git_init(++$self->{epoch_max});
                        my $git = PublicInbox::Git->new($git_dir);
                        return $self->import_init($git, 0);
@@ -569,15 +555,25 @@ sub diff ($$$) {
        unlink($an, $bn);
 }
 
+sub get_blob ($$) {
+       my ($self, $smsg) = @_;
+       if (my $im = $self->{im}) {
+               my $msg = $im->cat_blob($smsg->{blob});
+               return $msg if $msg;
+       }
+       # older message, should be in alternates
+       my $ibx = $self->{-inbox};
+       $ibx->msg_by_smsg($smsg);
+}
+
 sub lookup_content {
        my ($self, $mime, $mid) = @_;
-       my $ibx = $self->{-inbox};
        my $over = $self->{over};
        my $cid = content_id($mime);
        my $found;
        my ($id, $prev);
        while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
-               my $msg = $ibx->msg_by_smsg($smsg);
+               my $msg = get_blob($self, $smsg);
                if (!defined($msg)) {
                        warn "broken smsg for $mid\n";
                        next;
@@ -815,7 +811,6 @@ sub unindex_oid {
                }
                $self->{unindexed}->{$_}++ foreach keys %gone;
                $self->unindex_oid_remote($oid, $mid);
-               $self->barrier;
        }
 }
 
@@ -823,7 +818,6 @@ my $x40 = qr/[a-f0-9]{40}/;
 sub unindex {
        my ($self, $opts, $git, $unindex_range) = @_;
        my $un = $self->{unindexed} ||= {}; # num => removal count
-       $self->barrier;
        my $before = scalar keys %$un;
        my @cmd = qw(log --raw -r
                        --no-notes --no-color --no-abbrev --no-renames);
@@ -847,7 +841,6 @@ sub unindex {
 sub index_sync {
        my ($self, $opts) = @_;
        $opts ||= {};
-       my $ibx = $self->{-inbox};
        my $epoch_max;
        my $latest = git_dir_latest($self, \$epoch_max);
        return unless defined $latest;