]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/V2Writable.pm
searchidx: regenerate and avoid article number gaps on full index
[public-inbox.git] / lib / PublicInbox / V2Writable.pm
index 8361d09fab54d2c598d12aa5eec203e4ef0cb42f..2cc87305668256709f773415f750e5f52653503c 100644 (file)
@@ -15,15 +15,19 @@ use PublicInbox::ContentId qw(content_id content_digest);
 use PublicInbox::Inbox;
 use PublicInbox::OverIdx;
 use PublicInbox::Msgmap;
-use PublicInbox::Spawn;
+use PublicInbox::Spawn qw(spawn);
+use PublicInbox::SearchIdx;
 use IO::Handle;
 
 # an estimate of the post-packed size to the raw uncompressed size
 my $PACKING_FACTOR = 0.4;
 
 # assume 2 cores if GNU nproc(1) is not available
-sub nproc () {
-       int($ENV{NPROC} || `nproc 2>/dev/null` || 2);
+sub nproc_parts () {
+       my $n = int($ENV{NPROC} || `nproc 2>/dev/null` || 2);
+       # subtract for the main process and git-fast-import
+       $n -= 1;
+       $n < 1 ? 1 : $n;
 }
 
 sub count_partitions ($) {
@@ -73,7 +77,7 @@ sub new {
                rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
                last_commit => [], # git repo -> commit
        };
-       $self->{partitions} = count_partitions($self) || nproc();
+       $self->{partitions} = count_partitions($self) || nproc_parts();
        bless $self, $class;
 }
 
@@ -139,7 +143,6 @@ sub num_for {
                };
 
                # crap, Message-ID is already known, hope somebody just resent:
-               $self->barrier;
                foreach my $m (@$mids) {
                        # read-only lookup now safe to do after above barrier
                        my $existing = $self->lookup_content($mime, $m);
@@ -172,19 +175,19 @@ sub num_for_harder {
 
        my $hdr = $mime->header_obj;
        my $dig = content_digest($mime);
-       $$mid0 = PublicInbox::Import::digest2mid($dig);
+       $$mid0 = PublicInbox::Import::digest2mid($dig, $hdr);
        my $num = $self->{mm}->mid_insert($$mid0);
        unless (defined $num) {
                # it's hard to spoof the last Received: header
                my @recvd = $hdr->header_raw('Received');
                $dig->add("Received: $_") foreach (@recvd);
-               $$mid0 = PublicInbox::Import::digest2mid($dig);
+               $$mid0 = PublicInbox::Import::digest2mid($dig, $hdr);
                $num = $self->{mm}->mid_insert($$mid0);
 
                # fall back to a random Message-ID and give up determinism:
                until (defined($num)) {
                        $dig->add(rand);
-                       $$mid0 = PublicInbox::Import::digest2mid($dig);
+                       $$mid0 = PublicInbox::Import::digest2mid($dig, $hdr);
                        warn "using random Message-ID <$$mid0> as fallback\n";
                        $num = $self->{mm}->mid_insert($$mid0);
                }
@@ -257,14 +260,32 @@ sub purge_oids {
        $purges;
 }
 
+sub content_ids ($) {
+       my ($mime) = @_;
+       my @cids = ( content_id($mime) );
+
+       # Email::MIME->as_string doesn't always round-trip, so we may
+       # use a second content_id
+       my $rt = content_id(PublicInbox::MIME->new(\($mime->as_string)));
+       push @cids, $rt if $cids[0] ne $rt;
+       \@cids;
+}
+
+sub content_matches ($$) {
+       my ($cids, $existing) = @_;
+       my $cid = content_id($existing);
+       foreach (@$cids) {
+               return 1 if $_ eq $cid
+       }
+       0
+}
+
 sub remove_internal {
        my ($self, $mime, $cmt_msg, $purge) = @_;
-       $self->barrier;
        $self->idx_init;
        my $im = $self->importer unless $purge;
-       my $ibx = $self->{-inbox};
        my $over = $self->{over};
-       my $cid = content_id($mime);
+       my $cids = content_ids($mime);
        my $parts = $self->{idx_parts};
        my $mm = $self->{mm};
        my $removed;
@@ -280,14 +301,14 @@ sub remove_internal {
                my %gone;
                my ($id, $prev);
                while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
-                       my $msg = $ibx->msg_by_smsg($smsg);
+                       my $msg = get_blob($self, $smsg);
                        if (!defined($msg)) {
                                warn "broken smsg for $mid\n";
                                next; # continue
                        }
                        my $orig = $$msg;
                        my $cur = PublicInbox::MIME->new($msg);
-                       if (content_id($cur) eq $cid) {
+                       if (content_matches($cids, $cur)) {
                                $smsg->{mime} = $cur;
                                $gone{$smsg->{num}} = [ $smsg, \$orig ];
                        }
@@ -313,7 +334,6 @@ sub remove_internal {
                        $orig = undef;
                        $self->unindex_oid_remote($oid, $mid);
                }
-               $self->barrier;
        }
 
        if (defined $mark) {
@@ -359,45 +379,6 @@ sub set_last_commits ($) {
        }
 }
 
-sub done {
-       my ($self) = @_;
-       my $im = delete $self->{im};
-       $im->done if $im; # PublicInbox::Import::done
-
-       my $mm = $self->{mm};
-       $mm->{dbh}->commit if $mm;
-
-       # order matters, we can only close {over} after all partitions
-       # are done because the partitions also write to {over}
-       my $parts = delete $self->{idx_parts};
-       if ($parts) {
-               $_->remote_commit for @$parts;
-               $_->remote_close for @$parts;
-       }
-
-       my $over = $self->{over};
-       $over->commit_lazy;
-       $over->disconnect;
-
-       if ($mm) {
-               $mm->{dbh}->begin_work;
-               set_last_commits($self);
-               $mm->{dbh}->commit;
-               delete $self->{mm};
-       }
-
-       delete $self->{bnote};
-       $self->{transact_bytes} = 0;
-       $self->lock_release if $parts;
-}
-
-sub checkpoint {
-       my ($self) = @_;
-       my $im = $self->{im};
-       $im->checkpoint if $im; # PublicInbox::Import::checkpoint
-       $self->barrier;
-}
-
 sub barrier_init {
        my ($self, $n) = @_;
        $self->{bnote} or return;
@@ -416,13 +397,15 @@ sub barrier_wait {
        }
 }
 
-# issue a write barrier to ensure all data is visible to other processes
-# and read-only ops.  Order of data importance is: git > SQLite > Xapian
-sub barrier {
-       my ($self) = @_;
+sub checkpoint ($;$) {
+       my ($self, $wait) = @_;
 
        if (my $im = $self->{im}) {
-               $im->barrier;
+               if ($wait) {
+                       $im->barrier;
+               } else {
+                       $im->checkpoint;
+               }
        }
        my $parts = $self->{idx_parts};
        if ($parts) {
@@ -435,11 +418,17 @@ sub barrier {
                $self->{over}->commit_lazy;
 
                # Now deal with Xapian
-               my $barrier = $self->barrier_init(scalar @$parts);
+               if ($wait) {
+                       my $barrier = $self->barrier_init(scalar @$parts);
 
-               # each partition needs to issue a barrier command
-               $_->remote_barrier for @$parts;
-               $self->barrier_wait($barrier); # wait for each Xapian partition
+                       # each partition needs to issue a barrier command
+                       $_->remote_barrier for @$parts;
+
+                       # wait for each Xapian partition
+                       $self->barrier_wait($barrier);
+               } else {
+                       $_->remote_commit for @$parts;
+               }
 
                # last_commit is special, don't commit these until
                # remote partitions are done:
@@ -452,6 +441,27 @@ sub barrier {
        $self->{transact_bytes} = 0;
 }
 
+# issue a write barrier to ensure all data is visible to other processes
+# and read-only ops.  Order of data importance is: git > SQLite > Xapian
+sub barrier { checkpoint($_[0], 1) };
+
+sub done {
+       my ($self) = @_;
+       my $im = delete $self->{im};
+       $im->done if $im; # PublicInbox::Import::done
+       checkpoint($self);
+       my $mm = delete $self->{mm};
+       $mm->{dbh}->commit if $mm;
+       my $parts = delete $self->{idx_parts};
+       if ($parts) {
+               $_->remote_close for @$parts;
+       }
+       $self->{over}->disconnect;
+       delete $self->{bnote};
+       $self->{transact_bytes} = 0;
+       $self->lock_release if $parts;
+}
+
 sub git_init {
        my ($self, $epoch) = @_;
        my $pfx = "$self->{-inbox}->{mainrepo}/git";
@@ -512,8 +522,8 @@ sub importer {
                } else {
                        $self->{im} = undef;
                        $im->done;
-                       $self->barrier;
                        $im = undef;
+                       $self->checkpoint;
                        my $git_dir = $self->git_init(++$self->{epoch_max});
                        my $git = PublicInbox::Git->new($git_dir);
                        return $self->import_init($git, 0);
@@ -552,7 +562,6 @@ sub import_init {
 sub diff ($$$) {
        my ($mid, $cur, $new) = @_;
        use File::Temp qw(tempfile);
-       use PublicInbox::Spawn qw(spawn);
 
        my ($ah, $an) = tempfile('email-cur-XXXXXXXX', TMPDIR => 1);
        print $ah $cur->as_string or die "print: $!";
@@ -569,30 +578,39 @@ sub diff ($$$) {
        unlink($an, $bn);
 }
 
+sub get_blob ($$) {
+       my ($self, $smsg) = @_;
+       if (my $im = $self->{im}) {
+               my $msg = $im->cat_blob($smsg->{blob});
+               return $msg if $msg;
+       }
+       # older message, should be in alternates
+       my $ibx = $self->{-inbox};
+       $ibx->msg_by_smsg($smsg);
+}
+
 sub lookup_content {
        my ($self, $mime, $mid) = @_;
-       my $ibx = $self->{-inbox};
        my $over = $self->{over};
-       my $cid = content_id($mime);
-       my $found;
+       my $cids = content_ids($mime);
        my ($id, $prev);
        while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
-               my $msg = $ibx->msg_by_smsg($smsg);
+               my $msg = get_blob($self, $smsg);
                if (!defined($msg)) {
                        warn "broken smsg for $mid\n";
                        next;
                }
                my $cur = PublicInbox::MIME->new($msg);
-               if (content_id($cur) eq $cid) {
+               if (content_matches($cids, $cur)) {
                        $smsg->{mime} = $cur;
-                       $found = $smsg;
-                       last;
+                       return $smsg;
                }
 
+
                # XXX DEBUG_DIFF is experimental and may be removed
                diff($mid, $cur, $mime) if $ENV{DEBUG_DIFF};
        }
-       $found;
+       undef;
 }
 
 sub atfork_child {
@@ -720,16 +738,7 @@ sub last_commits {
        $heads;
 }
 
-sub is_ancestor ($$$) {
-       my ($git, $cur, $tip) = @_;
-       return 0 unless $git->check($cur);
-       my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
-               qw(merge-base --is-ancestor), $cur, $tip ];
-       my $pid = spawn($cmd);
-       defined $pid or die "spawning ".join(' ', @$cmd)." failed: $!";
-       waitpid($pid, 0) == $pid or die join(' ', @$cmd) .' did not finish';
-       $? == 0;
-}
+*is_ancestor = *PublicInbox::SearchIdx::is_ancestor;
 
 sub index_prepare {
        my ($self, $opts, $epoch_max, $ranges) = @_;
@@ -804,7 +813,7 @@ sub unindex_oid {
                my %gone;
                my ($id, $prev);
                while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
-                       $gone{$smsg->num} = 1 if $oid eq $smsg->{blob};
+                       $gone{$smsg->{num}} = 1 if $oid eq $smsg->{blob};
                        1; # continue
                }
                my $n = scalar keys %gone;
@@ -815,7 +824,6 @@ sub unindex_oid {
                }
                $self->{unindexed}->{$_}++ foreach keys %gone;
                $self->unindex_oid_remote($oid, $mid);
-               $self->barrier;
        }
 }
 
@@ -823,7 +831,6 @@ my $x40 = qr/[a-f0-9]{40}/;
 sub unindex {
        my ($self, $opts, $git, $unindex_range) = @_;
        my $un = $self->{unindexed} ||= {}; # num => removal count
-       $self->barrier;
        my $before = scalar keys %$un;
        my @cmd = qw(log --raw -r
                        --no-notes --no-color --no-abbrev --no-renames);
@@ -847,7 +854,6 @@ sub unindex {
 sub index_sync {
        my ($self, $opts) = @_;
        $opts ||= {};
-       my $ibx = $self->{-inbox};
        my $epoch_max;
        my $latest = git_dir_latest($self, \$epoch_max);
        return unless defined $latest;