]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/SearchIdx.pm
searchidx: make v1 indexing closer to v2
[public-inbox.git] / lib / PublicInbox / SearchIdx.pm
index 89c716793278bbbd1baf781832c46d291cbc5d5b..c57a7e1647c11e61d7f45aa23dbe2fd524b439f0 100644 (file)
@@ -14,6 +14,7 @@ use PublicInbox::Eml;
 use PublicInbox::InboxWritable;
 use PublicInbox::MID qw(mid_mime mids_for_index mids);
 use PublicInbox::MsgIter;
+use PublicInbox::IdxStack;
 use Carp qw(croak);
 use POSIX qw(strftime);
 use PublicInbox::OverIdx;
@@ -27,6 +28,10 @@ our $BATCH_BYTES = defined($ENV{XAPIAN_FLUSH_THRESHOLD}) ?
 use constant DEBUG => !!$ENV{DEBUG};
 
 my $xapianlevels = qr/\A(?:full|medium)\z/;
+my $hex = '[a-f0-9]';
+my $OID = $hex .'{40,}';
+my $addmsg = qr!^:000000 100644 \S+ ($OID) A\t${hex}{2}/${hex}{38}$!;
+my $delmsg = qr!^:100644 000000 ($OID) \S+ D\t${hex}{2}/${hex}{38}$!;
 
 sub new {
        my ($class, $ibx, $creat, $shard) = @_;
@@ -385,7 +390,7 @@ sub add_message {
        $smsg->{mid} //= $mids->[0]; # v1 compatibility
        $smsg->{num} //= do { # v1
                _msgmap_init($self);
-               index_mm($self, $mime);
+               index_mm($self, $mime, $smsg->{blob}, $sync);
        };
 
        # v1 and tests only:
@@ -477,34 +482,20 @@ sub unindex_eml {
 }
 
 sub index_mm {
-       my ($self, $mime) = @_;
-       my $mid = mid_mime($mime);
+       my ($self, $mime, $oid, $sync) = @_;
+       my $mids = mids($mime);
        my $mm = $self->{mm};
-       my $num;
-
-       if (defined $self->{regen_down}) {
-               $num = $mm->num_for($mid) and return $num;
-
-               while (($num = $self->{regen_down}--) > 0) {
-                       if ($mm->mid_set($num, $mid) != 0) {
-                               return $num;
-                       }
-               }
-       } elsif (defined $self->{regen_up}) {
-               $num = $mm->num_for($mid) and return $num;
-
-               # this is to fixup old bugs due to add-remove-add
-               while (($num = ++$self->{regen_up})) {
-                       if ($mm->mid_set($num, $mid) != 0) {
-                               return $num;
-                       }
+       if ($sync->{reindex}) {
+               my $over = $self->{over};
+               for my $mid (@$mids) {
+                       my ($num, undef) = $over->num_mid0_for_oid($oid, $mid);
+                       return $num if defined $num;
                }
+               $mm->num_for($mids->[0]) // $mm->mid_insert($mids->[0]);
+       } else {
+               # fallback to num_for since filters like RubyLang set the number
+               $mm->mid_insert($mids->[0]) // $mm->num_for($mids->[0]);
        }
-
-       $num = $mm->mid_insert($mid) and return $num;
-
-       # fallback to num_for since filters like RubyLang set the number
-       $mm->num_for($mid);
 }
 
 sub unindex_mm {
@@ -532,8 +523,8 @@ sub index_both { # git->cat_async callback
        my $smsg = bless { bytes => $size, blob => $oid }, 'PublicInbox::Smsg';
        my $self = $sync->{sidx};
        my $eml = PublicInbox::Eml->new($bref);
-       my $num = index_mm($self, $eml);
-       $smsg->{num} = $num;
+       $smsg->{num} = index_mm($self, $eml, $oid, $sync) or
+               die "E: could not generate NNTP article number for $oid";
        add_message($self, $eml, $smsg, $sync);
 }
 
@@ -549,6 +540,11 @@ sub index_sync {
        my ($self, $opts) = @_;
        delete $self->{lock_path} if $opts->{-skip_lock};
        $self->{ibx}->with_umask(\&_index_sync, $self, $opts);
+       if ($opts->{reindex}) {
+               my %again = %$opts;
+               delete @again{qw(rethread reindex)};
+               index_sync($self, \%again);
+       }
 }
 
 sub too_big ($$) {
@@ -562,110 +558,87 @@ sub too_big ($$) {
 }
 
 # only for v1
-sub read_log {
-       my ($self, $log, $batch_cb) = @_;
-       my $hex = '[a-f0-9]';
-       my $h40 = $hex .'{40}';
-       my $addmsg = qr!^:000000 100644 \S+ ($h40) A\t${hex}{2}/${hex}{38}$!;
-       my $delmsg = qr!^:100644 000000 ($h40) \S+ D\t${hex}{2}/${hex}{38}$!;
+sub process_stack {
+       my ($self, $stk, $sync, $batch_cb) = @_;
        my $git = $self->{ibx}->git;
-       my $latest;
        my $max = $BATCH_BYTES;
-       local $/ = "\n";
-       my %D;
-       my $line;
-       my $newest;
        my $nr = 0;
-       my $sync = { sidx => $self, nr => \$nr, max => \$max };
-       while (defined($line = <$log>)) {
-               if ($line =~ /$addmsg/o) {
-                       my $blob = $1;
-                       if (delete $D{$blob}) {
-                               # make sure pending index writes are done
-                               # before writing to ->mm
-                               $git->cat_async_wait;
-
-                               if (defined $self->{regen_down}) {
-                                       my $num = $self->{regen_down}--;
-                                       $self->{mm}->num_highwater($num);
-                               }
-                               next;
-                       }
-                       next if too_big($self, $blob);
-                       $git->cat_async($blob, \&index_both, { %$sync });
+       $sync->{nr} = \$nr;
+       $sync->{max} = \$max;
+       $sync->{sidx} = $self;
+
+       if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
+               warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
+               for my $oid (@leftovers) {
+                       $oid = unpack('H*', $oid);
+                       $git->cat_async($oid, \&unindex_both, $self);
+               }
+       }
+       while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
+               if ($f eq 'm') {
+                       $sync->{autime} = $at;
+                       $sync->{cotime} = $ct;
+                       next if too_big($self, $oid);
+                       $git->cat_async($oid, \&index_both, { %$sync });
                        if ($max <= 0) {
                                $git->cat_async_wait;
                                $max = $BATCH_BYTES;
-                               $batch_cb->($nr, $latest);
+                               $batch_cb->($nr);
                        }
-               } elsif ($line =~ /$delmsg/o) {
-                       my $blob = $1;
-                       $D{$blob} = 1 unless too_big($self, $blob);
-               } elsif ($line =~ /^commit ($h40)/o) {
-                       $latest = $1;
-                       $newest ||= $latest;
-               } elsif ($line =~ /^author .*? ([0-9]+) [\-\+][0-9]+$/) {
-                       $sync->{autime} = $1;
-               } elsif ($line =~ /^committer .*? ([0-9]+) [\-\+][0-9]+$/) {
-                       $sync->{cotime} = $1;
+               } elsif ($f eq 'd') {
+                       $git->cat_async($oid, \&unindex_both, $self);
                }
        }
-       close($log) or die "git log failed: \$?=$?";
-       # get the leftovers
-       foreach my $blob (keys %D) {
-               $git->cat_async($blob, \&unindex_both, $self);
-       }
        $git->cat_async_wait;
-       $batch_cb->($nr, $latest, $newest);
+       $batch_cb->($nr, $stk);
 }
 
-sub _git_log {
-       my ($self, $opts, $range) = @_;
+sub prepare_stack ($$$) {
+       my ($self, $sync, $range) = @_;
        my $git = $self->{ibx}->git;
 
        if (index($range, '..') < 0) {
                # don't show annoying git errors to users who run -index
                # on empty inboxes
                $git->qx(qw(rev-parse -q --verify), "$range^0");
-               if ($?) {
-                       open my $fh, '<', '/dev/null' or
-                               die "failed to open /dev/null: $!\n";
-                       return $fh;
-               }
+               return PublicInbox::IdxStack->new->read_prepare if $?;
        }
+       my $D = $sync->{D} = $sync->{reindex} ? {} : undef; # OID_BIN => NR
 
        # Count the new files so they can be added newest to oldest
        # and still have numbers increasing from oldest to newest
-       my $fcount = 0;
-       my $pr = $opts->{-progress};
-       $pr->("counting changes\n\t$range ... ") if $pr;
-       # can't use 'rev-list --count' if we use --diff-filter
-       my $fh = $git->popen(qw(log --pretty=tformat:%h
-                            --no-notes --no-color --no-renames
-                            --diff-filter=AM), $range);
-       ++$fcount while <$fh>;
-       close $fh or die "git log failed: \$?=$?";
-       my $high = $self->{mm}->num_highwater;
-       $pr->("$fcount\n") if $pr; # continue previous line
-       $self->{ntodo} = $fcount;
-
-       if (index($range, '..') < 0) {
-               if ($high && $high == $fcount) {
-                       # fix up old bugs in full indexes which caused messages to
-                       # not appear in Msgmap
-                       $self->{regen_up} = $high;
-               } else {
-                       # normal regen is for for fresh data
-                       $self->{regen_down} = $fcount;
-                       $self->{regen_down} += $high unless $opts->{reindex};
+       my $fh = $git->popen(qw(log --raw -r --pretty=tformat:%at-%ct-%H
+                               --no-notes --no-color --no-renames --no-abbrev),
+                               $range);
+       my ($at, $ct, $stk);
+       while (<$fh>) {
+               if (/\A([0-9]+)-([0-9]+)-($OID)$/o) {
+                       ($at, $ct) = ($1 + 0, $2 + 0);
+                       $stk //= PublicInbox::IdxStack->new($3);
+               } elsif (/$delmsg/) {
+                       my $oid = $1;
+                       if ($D) { # reindex case
+                               $D->{pack('H*', $oid)}++;
+                       } else { # non-reindex case:
+                               $stk->push_rec('d', $at, $ct, $oid);
+                       }
+               } elsif (/$addmsg/) {
+                       my $oid = $1;
+                       if ($D) {
+                               my $oid_bin = pack('H*', $oid);
+                               my $nr = --$D->{$oid_bin};
+                               delete($D->{$oid_bin}) if $nr <= 0;
+
+                               # nr < 0 (-1) means it never existed
+                               $stk->push_rec('m', $at, $ct, $oid) if $nr < 0;
+                       } else {
+                               $stk->push_rec('m', $at, $ct, $oid);
+                       }
                }
-       } else {
-               # Give oldest messages the smallest numbers
-               $self->{regen_down} = $high + $fcount;
        }
-
-       $git->popen(qw/log --pretty=raw --no-notes --no-color --no-renames
-                               --raw -r --no-abbrev/, $range);
+       close $fh or die "git log failed: \$?=$?";
+       $stk //= PublicInbox::IdxStack->new;
+       $stk->read_prepare;
 }
 
 # --is-ancestor requires git 1.8.0+
@@ -717,45 +690,43 @@ sub reindex_from ($$) {
 sub _index_sync {
        my ($self, $opts) = @_;
        my $tip = $opts->{ref} || 'HEAD';
-       my ($last_commit, $lx, $xlog);
        my $git = $self->{ibx}->git;
        $git->batch_prepare;
        my $pr = $opts->{-progress};
-
+       my $sync = { reindex => $opts->{reindex} };
        my $xdb = $self->begin_txn_lazy;
        $self->{over}->rethread_prepare($opts);
        my $mm = _msgmap_init($self);
-       do {
-               $xlog = undef; # stop previous git-log via SIGPIPE
-               $last_commit = _last_x_commit($self, $mm);
-               $lx = reindex_from($opts->{reindex}, $last_commit);
-
-               $self->{over}->rollback_lazy;
-               $self->{over}->disconnect;
-               $git->cleanup;
-               delete $self->{txn};
-               $xdb->cancel_transaction if $xdb;
-               $xdb = idx_release($self);
-
-               # ensure we leak no FDs to "git log" with Xapian <= 1.2
-               my $range = $lx eq '' ? $tip : "$lx..$tip";
-               $xlog = _git_log($self, $opts, $range);
-
-               $xdb = $self->begin_txn_lazy;
-       } while (_last_x_commit($self, $mm) ne $last_commit);
+       if ($sync->{reindex}) {
+               my $last = $mm->last_commit;
+               if ($last) {
+                       $tip = $last;
+               } else {
+                       # somebody just blindly added --reindex when indexing
+                       # for the first time, allow it:
+                       undef $sync->{reindex};
+               }
+       }
+       my $last_commit = _last_x_commit($self, $mm);
+       my $lx = reindex_from($sync->{reindex}, $last_commit);
+       my $range = $lx eq '' ? $tip : "$lx..$tip";
+       $pr->("counting changes\n\t$range ... ") if $pr;
+       my $stk = prepare_stack($self, $sync, $range);
+       $sync->{ntodo} = $stk ? $stk->num_records : 0;
+       $pr->("$sync->{ntodo}\n") if $pr; # continue previous line
 
-       my $dbh = $mm->{dbh} if $mm;
+       my $dbh = $mm->{dbh};
        my $batch_cb = sub {
-               my ($nr, $commit, $newest) = @_;
-               if ($dbh) {
-                       if ($newest) {
-                               my $cur = $mm->last_commit || '';
-                               if (need_update($self, $cur, $newest)) {
-                                       $mm->last_commit($newest);
-                               }
+               my ($nr, $stk) = @_;
+               # latest_cmt may be undef
+               my $newest = $stk ? $stk->{latest_cmt} : undef;
+               if ($newest) {
+                       my $cur = $mm->last_commit || '';
+                       if (need_update($self, $cur, $newest)) {
+                               $mm->last_commit($newest);
                        }
-                       $dbh->commit;
                }
+               $dbh->commit;
                if ($newest && need_xapian($self)) {
                        my $cur = $xdb->get_metadata('last_commit');
                        if (need_update($self, $cur, $newest)) {
@@ -768,15 +739,15 @@ sub _index_sync {
                $git->cleanup;
                $xdb = idx_release($self, $nr);
                # let another process do some work...
-               $pr->("indexed $nr/$self->{ntodo}\n") if $pr && $nr;
-               if (!$newest) { # more to come
+               $pr->("indexed $nr/$sync->{ntodo}\n") if $pr && $nr;
+               if (!$stk) { # more to come
                        $xdb = $self->begin_txn_lazy;
-                       $dbh->begin_work if $dbh;
+                       $dbh->begin_work;
                }
        };
 
        $dbh->begin_work;
-       read_log($self, $xlog, $batch_cb);
+       process_stack($self, $stk, $sync, $batch_cb);
 }
 
 sub DESTROY {