]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/V2Writable.pm
index+xcpdb: support --no-sync flag
[public-inbox.git] / lib / PublicInbox / V2Writable.pm
index c04ea5d77a443baa6fdcb4a88811f710e4156ea4..3dc200956679916053181a8de400f886026ab641 100644 (file)
@@ -8,6 +8,7 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::Lock);
 use PublicInbox::SearchIdxShard;
+use PublicInbox::IdxStack;
 use PublicInbox::Eml;
 use PublicInbox::Git;
 use PublicInbox::Import;
@@ -21,7 +22,7 @@ use PublicInbox::SearchIdx;
 use IO::Handle; # ->autoflush
 use File::Temp qw(tempfile);
 
-my $x40 = qr/[a-f0-9]{40}/;
+my $OID = qr/[a-f0-9]{40,}/;
 # an estimate of the post-packed size to the raw uncompressed size
 my $PACKING_FACTOR = 0.4;
 
@@ -108,19 +109,20 @@ sub new {
 
        my $xpfx = "$dir/xap" . PublicInbox::Search::SCHEMA_VERSION;
        my $self = {
-               -inbox => $v2ibx,
+               ibx => $v2ibx,
                im => undef, #  PublicInbox::Import
                parallel => 1,
                transact_bytes => 0,
                total_bytes => 0,
                current_info => '',
                xpfx => $xpfx,
-               over => PublicInbox::OverIdx->new("$xpfx/over.sqlite3", 1),
+               over => PublicInbox::OverIdx->new("$xpfx/over.sqlite3"),
                lock_path => "$dir/inbox.lock",
                # limit each git repo (epoch) to 1GB or so
                rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
-               last_commit => [], # git repo -> commit
+               last_commit => [], # git epoch -> commit
        };
+       $self->{over}->{-no_sync} = 1 if $v2ibx->{-no_sync};
        $self->{shards} = count_shards($self) || nproc_shards($creat);
        $self->{index_max_size} = $v2ibx->{index_max_size};
        bless $self, $class;
@@ -148,7 +150,7 @@ sub init_inbox {
 # mimics Import::add and wraps it for v2
 sub add {
        my ($self, $eml, $check_cb) = @_;
-       $self->{-inbox}->with_umask(\&_add, $self, $eml, $check_cb);
+       $self->{ibx}->with_umask(\&_add, $self, $eml, $check_cb);
 }
 
 # indexes a message, returns true if checkpointing is needed
@@ -168,7 +170,7 @@ sub _add {
 
        # spam check:
        if ($check_cb) {
-               $mime = $check_cb->($mime, $self->{-inbox}) or return;
+               $mime = $check_cb->($mime, $self->{ibx}) or return;
        }
 
        # All pipes (> $^F) known to Perl 5.6+ have FD_CLOEXEC set,
@@ -217,7 +219,7 @@ sub v2_num_for {
                # AltId may pre-populate article numbers (e.g. X-Mail-Count
                # or NNTP article number), use that article number if it's
                # not in Over.
-               my $altid = $self->{-inbox}->{altid};
+               my $altid = $self->{ibx}->{altid};
                if ($altid && grep(/:file=msgmap\.sqlite3\z/, @$altid)) {
                        my $num = $self->{mm}->num_for($mid);
 
@@ -292,7 +294,8 @@ sub _idx_init { # with_umask callback
        # Now that all subprocesses are up, we can open the FDs
        # for SQLite:
        my $mm = $self->{mm} = PublicInbox::Msgmap->new_file(
-               "$self->{-inbox}->{inboxdir}/msgmap.sqlite3", 1);
+                               "$self->{ibx}->{inboxdir}/msgmap.sqlite3",
+                               $self->{ibx}->{-no_sync} ? 2 : 1);
        $mm->{dbh}->begin_work;
 }
 
@@ -300,7 +303,7 @@ sub _idx_init { # with_umask callback
 sub idx_init {
        my ($self, $opt) = @_;
        return if $self->{idx_shards};
-       my $ibx = $self->{-inbox};
+       my $ibx = $self->{ibx};
 
        # do not leak read-only FDs to child processes, we only have these
        # FDs for duplicate detection so they should not be
@@ -328,7 +331,7 @@ sub idx_init {
 sub _replace_oids ($$$) {
        my ($self, $mime, $replace_map) = @_;
        $self->done;
-       my $pfx = "$self->{-inbox}->{inboxdir}/git";
+       my $pfx = "$self->{ibx}->{inboxdir}/git";
        my $rewrites = []; # epoch => commit
        my $max = $self->{epoch_max};
 
@@ -449,7 +452,7 @@ sub rewrite_internal ($$;$$$) {
 # (retval[2]) is not part of the stable API shared with Import->remove
 sub remove {
        my ($self, $eml, $cmt_msg) = @_;
-       my $r = $self->{-inbox}->with_umask(\&rewrite_internal,
+       my $r = $self->{ibx}->with_umask(\&rewrite_internal,
                                                $self, $eml, $cmt_msg);
        defined($r) && defined($r->[0]) ? @$r: undef;
 }
@@ -457,7 +460,7 @@ sub remove {
 sub _replace ($$;$$) {
        my ($self, $old_eml, $new_eml, $sref) = @_;
        my $arg = [ $self, $old_eml, undef, $new_eml, $sref ];
-       my $rewritten = $self->{-inbox}->with_umask(\&rewrite_internal,
+       my $rewritten = $self->{ibx}->with_umask(\&rewrite_internal,
                        $self, $old_eml, undef, $new_eml, $sref) or return;
 
        my $rewrites = $rewritten->{rewrites};
@@ -483,7 +486,7 @@ sub git_hash_raw ($$) {
        my ($self, $raw) = @_;
        # grab the expected OID we have to reindex:
        pipe(my($in, $w)) or die "pipe: $!";
-       my $git_dir = $self->{-inbox}->git->{git_dir};
+       my $git_dir = $self->{ibx}->git->{git_dir};
        my $cmd = ['git', "--git-dir=$git_dir", qw(hash-object --stdin)];
        my $r = popen_rd($cmd, undef, { 0 => $in });
        print $w $$raw or die "print \$w: $!";
@@ -491,7 +494,7 @@ sub git_hash_raw ($$) {
        local $/ = "\n";
        chomp(my $oid = <$r>);
        close $r or die "git hash-object failed: $?";
-       $oid =~ /\A[a-f0-9]{40}\z/ or die "OID not expected: $oid";
+       $oid =~ /\A$OID\z/ or die "OID not expected: $oid";
        $oid;
 }
 
@@ -549,11 +552,11 @@ W: $list
        }
 
        # make sure we really got the OID:
-       my ($blob, $type, $bytes) = $self->{-inbox}->git->check($expect_oid);
+       my ($blob, $type, $bytes) = $self->{ibx}->git->check($expect_oid);
        $blob eq $expect_oid or die "BUG: $expect_oid not found after replace";
 
        # don't leak FDs to Xapian:
-       $self->{-inbox}->git->cleanup;
+       $self->{ibx}->git->cleanup;
 
        # reindex modified messages:
        for my $smsg (@$need_reindex) {
@@ -563,8 +566,8 @@ W: $list
                        num => $smsg->{num},
                        mid => $smsg->{mid},
                }, 'PublicInbox::Smsg';
-               my $v2w = { autime => $smsg->{ds}, cotime => $smsg->{ts} };
-               $new_smsg->populate($new_mime, $v2w);
+               my $sync = { autime => $smsg->{ds}, cotime => $smsg->{ts} };
+               $new_smsg->populate($new_mime, $sync);
                do_idx($self, \$raw, $new_mime, $new_smsg);
        }
        $rewritten->{rewrites};
@@ -673,18 +676,15 @@ sub done {
        my $nbytes = $self->{total_bytes};
        $self->{total_bytes} = 0;
        $self->lock_release(!!$nbytes) if $shards;
-       $self->{-inbox}->git->cleanup;
+       $self->{ibx}->git->cleanup;
 }
 
 sub fill_alternates ($$) {
        my ($self, $epoch) = @_;
 
-       my $pfx = "$self->{-inbox}->{inboxdir}/git";
-       my $all = "$self->{-inbox}->{inboxdir}/all.git";
-
-       unless (-d $all) {
-               PublicInbox::Import::init_bare($all);
-       }
+       my $pfx = "$self->{ibx}->{inboxdir}/git";
+       my $all = "$self->{ibx}->{inboxdir}/all.git";
+       PublicInbox::Import::init_bare($all) unless -d $all;
        my $info_dir = "$all/objects/info";
        my $alt = "$info_dir/alternates";
        my (%alt, $new);
@@ -694,7 +694,9 @@ sub fill_alternates ($$) {
                $mode = (stat($fh))[2] & 07777;
 
                # we assign a sort score to every alternate and favor
-               # the newest (highest numbered) one when we
+               # the newest (highest numbered) one because loose objects
+               # require scanning epochs and only the latest epoch is
+               # expected to see loose objects
                my $score;
                my $other = 0; # in case admin adds non-epoch repos
                %alt = map {;
@@ -726,7 +728,7 @@ sub fill_alternates ($$) {
 
 sub git_init {
        my ($self, $epoch) = @_;
-       my $git_dir = "$self->{-inbox}->{inboxdir}/git/$epoch.git";
+       my $git_dir = "$self->{ibx}->{inboxdir}/git/$epoch.git";
        PublicInbox::Import::init_bare($git_dir);
        my @cmd = (qw/git config/, "--file=$git_dir/config",
                        'include.path', '../../all.git/config');
@@ -738,7 +740,7 @@ sub git_init {
 sub git_dir_latest {
        my ($self, $max) = @_;
        $$max = -1;
-       my $pfx = "$self->{-inbox}->{inboxdir}/git";
+       my $pfx = "$self->{ibx}->{inboxdir}/git";
        return unless -d $pfx;
        my $latest;
        opendir my $dh, $pfx or die "opendir $pfx: $!\n";
@@ -790,7 +792,7 @@ sub importer {
 
 sub import_init {
        my ($self, $git, $packed_bytes, $tmp) = @_;
-       my $im = PublicInbox::Import->new($git, undef, undef, $self->{-inbox});
+       my $im = PublicInbox::Import->new($git, undef, undef, $self->{ibx});
        $im->{bytes_added} = int($packed_bytes / $PACKING_FACTOR);
        $im->{lock_path} = undef;
        $im->{path_type} = 'v2';
@@ -823,8 +825,7 @@ sub get_blob ($$) {
                return $msg if $msg;
        }
        # older message, should be in alternates
-       my $ibx = $self->{-inbox};
-       $ibx->msg_by_smsg($smsg);
+       $self->{ibx}->msg_by_smsg($smsg);
 }
 
 sub content_exists ($$$) {
@@ -849,8 +850,6 @@ sub content_exists ($$$) {
 
 sub atfork_child {
        my ($self) = @_;
-       my $fh = delete $self->{reindex_pipe};
-       close $fh if $fh;
        if (my $shards = $self->{idx_shards}) {
                $_->atfork_child foreach @$shards;
        }
@@ -862,16 +861,14 @@ sub atfork_child {
        $self->{bnote}->[1];
 }
 
-sub reindex_checkpoint ($$$) {
-       my ($self, $sync, $git) = @_;
+sub reindex_checkpoint ($$) {
+       my ($self, $sync) = @_;
 
-       $git->cleanup;
        $sync->{mm_tmp}->atfork_prepare;
        $self->done; # release lock
 
        if (my $pr = $sync->{-opt}->{-progress}) {
-               my ($bn) = (split('/', $git->{git_dir}))[-1];
-               $pr->("$bn ".sprintf($sync->{-regen_fmt}, $sync->{nr}));
+               $pr->(sprintf($sync->{-regen_fmt}, $sync->{nr}));
        }
 
        # allow -watch or -mda to write...
@@ -879,14 +876,11 @@ sub reindex_checkpoint ($$$) {
        $sync->{mm_tmp}->atfork_parent;
 }
 
-sub reindex_oid ($$$$) {
-       my ($self, $sync, $git, $oid) = @_;
-       if (my $D = $sync->{D}) { # don't waste I/O on deletes
-               return if $D->{pack('H*', $oid)};
-       }
-       return if PublicInbox::SearchIdx::too_big($self, $git, $oid);
+sub reindex_oid ($$$) {
+       my ($self, $sync, $oid) = @_;
+       return if PublicInbox::SearchIdx::too_big($self, $oid);
        my ($num, $mid0, $len);
-       my $msgref = $git->cat_file($oid, \$len);
+       my $msgref = $self->{ibx}->git->cat_file($oid, \$len);
        return if $len == 0; # purged
        my $mime = PublicInbox::Eml->new($$msgref);
        my $mids = mids($mime->header_obj);
@@ -953,9 +947,9 @@ sub reindex_oid ($$$$) {
                blob => $oid,
                mid => $mid0,
        }, 'PublicInbox::Smsg';
-       $smsg->populate($mime, $self);
+       $smsg->populate($mime, $sync);
        if (do_idx($self, $msgref, $mime, $smsg)) {
-               reindex_checkpoint($self, $sync, $git);
+               reindex_checkpoint($self, $sync);
        }
 }
 
@@ -971,7 +965,7 @@ sub update_last_commit ($$$$) {
        last_epoch_commit($self, $i, $cmt);
 }
 
-sub git_dir_n ($$) { "$_[0]->{-inbox}->{inboxdir}/git/$_[1].git" }
+sub git_dir_n ($$) { "$_[0]->{ibx}->{inboxdir}/git/$_[1].git" }
 
 sub last_commits ($$) {
        my ($self, $epoch_max) = @_;
@@ -1035,34 +1029,58 @@ $range
        $range;
 }
 
-# don't bump num_highwater on --reindex
-sub mark_deleted ($$$) {
+sub prepare_range_stack {
        my ($git, $sync, $range) = @_;
-       my $D = $sync->{D} //= {}; # pack("H*", $oid) => NR
-       my $fh = $git->popen(qw(log --raw --no-abbrev
-                       --pretty=tformat:%H
-                       --no-notes --no-color --no-renames
-                       --diff-filter=AM), $range, '--', 'd');
+       # Don't bump num_highwater on --reindex by using {D}.
+       # We intentionally do NOT use {D} in the non-reindex case because
+       # we want NNTP article number gaps from unindexed messages to
+       # show up in mirrors, too.
+       my $D = $sync->{D} //= $sync->{reindex} ? {} : undef; # OID_BIN => NR
+
+       my $fh = $git->popen(qw(log --raw -r --pretty=tformat:%at-%ct-%H
+                               --no-notes --no-color --no-renames --no-abbrev),
+                               $range);
+       my ($at, $ct, $stk);
        while (<$fh>) {
-               if (/\A:\d{6} 100644 $x40 ($x40) [AM]\td$/o) {
-                       $D->{pack('H*', $1)}++;
+               if (/\A([0-9]+)-([0-9]+)-($OID)$/o) {
+                       ($at, $ct) = ($1 + 0, $2 + 0);
+                       $stk //= PublicInbox::IdxStack->new($3);
+               } elsif (/\A:\d{6} 100644 $OID ($OID) [AM]\td$/o) {
+                       my $oid = $1;
+                       if ($D) { # reindex case
+                               $D->{pack('H*', $oid)}++;
+                       } else { # non-reindex case:
+                               $stk->push_rec('d', $at, $ct, $oid);
+                       }
+               } elsif (/\A:\d{6} 100644 $OID ($OID) [AM]\tm$/o) {
+                       my $oid = $1;
+                       if ($D) {
+                               my $oid_bin = pack('H*', $oid);
+                               my $nr = --$D->{$oid_bin};
+                               delete($D->{$oid_bin}) if $nr <= 0;
+
+                               # nr < 0 (-1) means it never existed
+                               $stk->push_rec('m', $at, $ct, $oid) if $nr < 0;
+                       } else {
+                               $stk->push_rec('m', $at, $ct, $oid);
+                       }
                }
        }
        close $fh or die "git log failed: \$?=$?";
+       $stk ? $stk->read_prepare : undef;
 }
 
 sub sync_prepare ($$$) {
        my ($self, $sync, $epoch_max) = @_;
        my $pr = $sync->{-opt}->{-progress};
        my $regen_max = 0;
-       my $head = $self->{-inbox}->{ref_head} || 'refs/heads/master';
+       my $head = $self->{ibx}->{ref_head} || 'refs/heads/master';
 
        # reindex stops at the current heads and we later rerun index_sync
        # without {reindex}
        my $reindex_heads = last_commits($self, $epoch_max) if $sync->{reindex};
 
-       for my $i (0..$epoch_max) {
-               die 'BUG: already indexing!' if $self->{reindex_pipe};
+       for (my $i = $epoch_max; $i >= 0; $i--) {
                my $git_dir = git_dir_n($self, $i);
                -d $git_dir or next; # missing epochs are fine
                my $git = PublicInbox::Git->new($git_dir);
@@ -1073,19 +1091,24 @@ sub sync_prepare ($$$) {
 
                next if $?; # new repo
                my $range = log_range($self, $sync, $git, $i, $tip) or next;
-               $sync->{ranges}->[$i] = $range;
-
                # can't use 'rev-list --count' if we use --diff-filter
                $pr->("$i.git counting $range ... ") if $pr;
-               my $n = 0;
-               my $fh = $git->popen(qw(log --pretty=tformat:%H
-                               --no-notes --no-color --no-renames
-                               --diff-filter=AM), $range, '--', 'm');
-               ++$n while <$fh>;
-               close $fh or die "git log failed: \$?=$?";
-               $pr->("$n\n") if $pr;
-               $regen_max += $n;
-               mark_deleted($git, $sync, $range) if $sync->{reindex};
+               my $stk = prepare_range_stack($git, $sync, $range);
+               my $nr = $stk ? $stk->num_records : 0;
+               $pr->("$nr\n") if $pr;
+               $sync->{stacks}->[$i] = $stk if $stk;
+               $regen_max += $nr;
+       }
+
+       # XXX this should not happen unless somebody bypasses checks in
+       # our code and blindly injects "d" file history into git repos
+       if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
+               warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
+               for my $oid (@leftovers) {
+                       $oid = unpack('H*', $oid);
+                       $self->{current_info} = "leftover $oid";
+                       unindex_oid($self, $oid);
+               }
        }
        return 0 if (!$regen_max && !keys(%{$self->{unindex_range}}));
 
@@ -1107,10 +1130,10 @@ sub unindex_oid_remote ($$$) {
        }
 }
 
-sub unindex_oid ($$$;$) {
-       my ($self, $git, $oid, $unindexed) = @_;
+sub unindex_oid ($$;$) {
+       my ($self, $oid, $unindexed) = @_;
        my $mm = $self->{mm};
-       my $msgref = $git->cat_file($oid);
+       my $msgref = $self->{ibx}->git->cat_file($oid);
        my $mime = PublicInbox::Eml->new($msgref);
        my $mids = mids($mime->header_obj);
        $mime = $msgref = undef;
@@ -1137,6 +1160,8 @@ sub unindex_oid ($$$;$) {
        }
 }
 
+# this is rare, it only happens when we get discontiguous history in
+# a mirror because the source used -purge or -edit
 sub unindex ($$$$) {
        my ($self, $sync, $git, $unindex_range) = @_;
        my $unindexed = $self->{unindexed} ||= {}; # $mid0 => $num
@@ -1144,12 +1169,11 @@ sub unindex ($$$$) {
        # order does not matter, here:
        my @cmd = qw(log --raw -r
                        --no-notes --no-color --no-abbrev --no-renames);
-       my $fh = $self->{reindex_pipe} = $git->popen(@cmd, $unindex_range);
+       my $fh = $git->popen(@cmd, $unindex_range);
        while (<$fh>) {
-               /\A:\d{6} 100644 $x40 ($x40) [AM]\tm$/o or next;
-               unindex_oid($self, $git, $1, $unindexed);
+               /\A:\d{6} 100644 $OID ($OID) [AM]\tm$/o or next;
+               unindex_oid($self, $1, $unindexed);
        }
-       delete $self->{reindex_pipe};
        close $fh or die "git log failed: \$?=$?";
 
        return unless $sync->{-opt}->{prune};
@@ -1179,45 +1203,25 @@ sub index_epoch ($$$) {
        my ($self, $sync, $i) = @_;
 
        my $git_dir = git_dir_n($self, $i);
-       die 'BUG: already reindexing!' if $self->{reindex_pipe};
        -d $git_dir or return; # missing epochs are fine
-       fill_alternates($self, $i);
        my $git = PublicInbox::Git->new($git_dir);
-       if (my $unindex_range = delete $sync->{unindex_range}->{$i}) {
+       if (my $unindex_range = delete $sync->{unindex_range}->{$i}) { # rare
                unindex($self, $sync, $git, $unindex_range);
        }
-       defined(my $range = $sync->{ranges}->[$i]) or return;
-       if (my $pr = $sync->{-opt}->{-progress}) {
-               $pr->("$i.git indexing $range\n");
-       }
-       my @cmd = qw(log --reverse --raw -r --pretty=tformat:%H.%at.%ct
-                       --no-notes --no-color --no-abbrev --no-renames);
-       my $fh = $self->{reindex_pipe} = $git->popen(@cmd, $range);
-       my $cmt;
-       my $D = $sync->{D};
-       while (<$fh>) {
-               chomp;
-               $self->{current_info} = "$i.git $_";
-               if (/\A($x40)\.([0-9]+)\.([0-9]+)$/o) {
-                       $cmt = $1;
-                       $self->{autime} = $2;
-                       $self->{cotime} = $3;
-               } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\tm$/o) {
-                       reindex_oid($self, $sync, $git, $1);
-               } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\td$/o) {
-                       # allow re-add if there was user error
-                       my $oid = $1;
-                       if ($D) {
-                               my $oid_bin = pack('H*', $oid);
-                               my $nr = --$D->{$oid_bin};
-                               delete($D->{$oid_bin}) if $nr <= 0;
-                       }
-                       unindex_oid($self, $git, $oid);
+       defined(my $stk = $sync->{stacks}->[$i]) or return;
+       $sync->{stacks}->[$i] = undef;
+       while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
+               $self->{current_info} = "$i.git $oid";
+               if ($f eq 'm') {
+                       $sync->{autime} = $at;
+                       $sync->{cotime} = $ct;
+                       reindex_oid($self, $sync, $oid);
+               } elsif ($f eq 'd') {
+                       unindex_oid($self, $oid);
                }
        }
-       close $fh or die "git log failed: \$?=$?";
-       delete @$self{qw(reindex_pipe autime cotime)};
-       update_last_commit($self, $git, $i, $cmt) if defined $cmt;
+       delete @$sync{qw(autime cotime)};
+       update_last_commit($self, $git, $i, $stk->{latest_cmt});
 }
 
 # public, called by public-inbox-index
@@ -1229,6 +1233,7 @@ sub index_sync {
        my $latest = git_dir_latest($self, \$epoch_max);
        return unless defined $latest;
        $self->idx_init($opt); # acquire lock
+       fill_alternates($self, $epoch_max);
        $self->{over}->rethread_prepare($opt);
        my $sync = {
                unindex_range => {}, # EPOCH => oid_old..oid_new