]> Sergey Matveev's repositories - public-inbox.git/commitdiff
*index: checkpoints write last_commit metadata
authorEric Wong <e@80x24.org>
Fri, 13 Nov 2020 11:11:41 +0000 (11:11 +0000)
committerEric Wong <e@80x24.org>
Sun, 15 Nov 2020 02:52:24 +0000 (02:52 +0000)
This will set us up for supporting graceful shutdown
on -index without repeating any work.

lib/PublicInbox/ExtSearchIdx.pm
lib/PublicInbox/IdxStack.pm
lib/PublicInbox/SearchIdx.pm
lib/PublicInbox/V2Writable.pm
t/idx_stack.t

index 7aaf8291d061909877495313481662429ddea69c..14ffdadb36eebff00606beae037f0838b116d87d 100644 (file)
@@ -168,6 +168,10 @@ sub do_finalize ($) {
                # `d' message was already unindexed in the v1/v2 inboxes,
                # so it's too noisy to warn, here.
        }
+       # cur_cmt may be undef for unindex_oid, set by V2Writable::index_todo
+       if (defined(my $cur_cmt = $req->{cur_cmt})) {
+               ${$req->{latest_cmt}} = $cur_cmt;
+       }
 }
 
 sub do_step ($) { # main iterator for adding messages to the index
@@ -337,10 +341,10 @@ sub eidx_sync { # main entry point
 }
 
 sub update_last_commit { # overrides V2Writable
-       my ($self, $sync, $unit, $latest_cmt) = @_;
-       return unless defined $latest_cmt;
-
-       $self->git->async_wait_all;
+       my ($self, $sync, $stk) = @_;
+       my $unit = $sync->{unit} // return;
+       my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+       defined($latest_cmt) or return;
        my $ibx = $sync->{ibx} or die 'BUG: {ibx} missing';
        my $ekey = $ibx->eidx_key;
        my $uv = $ibx->uidvalidity;
index ce75b46afe99604ec2542bb01634a1f7628c2ea8..e7e10de94da8cc3a9aa97322e052c9834a944146 100644 (file)
@@ -6,7 +6,7 @@ package PublicInbox::IdxStack;
 use v5.10.1;
 use strict;
 use Fcntl qw(:seek);
-use constant FMT => eval { pack('Q', 1) } ? 'A1QQH*' : 'A1IIH*';
+use constant PACK_FMT => eval { pack('Q', 1) } ? 'A1QQH*H*' : 'A1IIH*H*';
 
 # start off in write-only mode
 sub new {
@@ -16,9 +16,15 @@ sub new {
 
 # file_char = [d|m]
 sub push_rec {
-       my ($self, $file_char, $at, $ct, $blob_oid) = @_;
-       my $rec = pack(FMT, $file_char, $at, $ct, $blob_oid);
-       $self->{rec_size} //= length($rec);
+       my ($self, $file_char, $at, $ct, $blob_oid, $cmt_oid) = @_;
+       my $rec = pack(PACK_FMT, $file_char, $at, $ct, $blob_oid, $cmt_oid);
+       $self->{unpack_fmt} //= do {
+               my $len = length($cmt_oid);
+               my $fmt = PACK_FMT;
+               $fmt =~ s/H\*/H$len/g;
+               $self->{rec_size} = length($rec);
+               $fmt;
+       };
        print { $self->{wr} } $rec or die "print: $!";
        $self->{tot_size} += length($rec);
 }
@@ -46,7 +52,7 @@ sub pop_rec {
        my $r = read($io, my $buf, $sz);
        defined($r) or die "read: $!";
        $r == $sz or die "read($r != $sz)";
-       unpack(FMT, $buf);
+       unpack($self->{unpack_fmt}, $buf);
 }
 
 1;
index 662055c6595fcbcdb156253356612ee6e4a353fc..90d8c8b33db860f927fe9b4d120479f0181f2f21 100644 (file)
@@ -608,11 +608,17 @@ sub index_both { # git->cat_async callback
        $smsg->{num} = index_mm($self, $eml, $oid, $sync) or
                die "E: could not generate NNTP article number for $oid";
        add_message($self, $eml, $smsg, $sync);
+       my $cur_cmt = $sync->{cur_cmt} // die 'BUG: {cur_cmt} missing';
+       ${$sync->{latest_cmt}} = $cur_cmt;
 }
 
 sub unindex_both { # git->cat_async callback
-       my ($bref, $oid, $type, $size, $self) = @_;
-       unindex_eml($self, $oid, PublicInbox::Eml->new($bref));
+       my ($bref, $oid, $type, $size, $sync) = @_;
+       unindex_eml($sync->{sidx}, $oid, PublicInbox::Eml->new($bref));
+       # may be undef if leftover
+       if (defined(my $cur_cmt = $sync->{cur_cmt})) {
+               ${$sync->{latest_cmt}} = $cur_cmt;
+       }
 }
 
 sub with_umask {
@@ -646,34 +652,33 @@ sub v1_checkpoint ($$;$) {
        my ($self, $sync, $stk) = @_;
        $self->{ibx}->git->async_wait_all;
 
-       # latest_cmt may be undef
-       my $newest = $stk ? $stk->{latest_cmt} : undef;
-       if ($newest) {
+       # $newest may be undef
+       my $newest = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+       if (defined($newest)) {
                my $cur = $self->{mm}->last_commit || '';
                if (need_update($self, $cur, $newest)) {
                        $self->{mm}->last_commit($newest);
                }
-       } else {
-               ${$sync->{max}} = $self->{batch_bytes};
        }
+       ${$sync->{max}} = $self->{batch_bytes};
 
        $self->{mm}->{dbh}->commit;
-       if ($newest && need_xapian($self)) {
-               my $xdb = $self->{xdb};
+       my $xdb = need_xapian($self) ? $self->{xdb} : undef;
+       if ($newest && $xdb) {
                my $cur = $xdb->get_metadata('last_commit');
                if (need_update($self, $cur, $newest)) {
                        $xdb->set_metadata('last_commit', $newest);
                }
-
+       }
+       if ($stk) { # all done if $stk is passed
                # let SearchView know a full --reindex was done so it can
                # generate ->has_threadid-dependent links
-               if ($sync->{reindex} && !ref($sync->{reindex})) {
+               if ($xdb && $sync->{reindex} && !ref($sync->{reindex})) {
                        my $n = $xdb->get_metadata('has_threadid');
                        $xdb->set_metadata('has_threadid', '1') if $n ne '1';
                }
+               $self->{oidx}->rethread_done($sync->{-opt}); # all done
        }
-
-       $self->{oidx}->rethread_done($sync->{-opt}) if $newest; # all done
        commit_txn_lazy($self);
        $sync->{ibx}->git->cleanup;
        my $nr = ${$sync->{nr}};
@@ -697,21 +702,24 @@ sub process_stack {
        $sync->{nr} = \$nr;
        $sync->{max} = \$max;
        $sync->{sidx} = $self;
+       $sync->{latest_cmt} = \(my $latest_cmt);
 
        $self->{mm}->{dbh}->begin_work;
        if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
                warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
                for my $oid (@leftovers) {
                        $oid = unpack('H*', $oid);
-                       $git->cat_async($oid, \&unindex_both, $self);
+                       $git->cat_async($oid, \&unindex_both, $sync);
                }
        }
        if ($sync->{max_size} = $sync->{-opt}->{max_size}) {
                $sync->{index_oid} = \&index_both;
        }
-       while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
+       while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) {
+               my $arg = { %$sync, cur_cmt => $cur_cmt };
                if ($f eq 'm') {
-                       my $arg = { %$sync, autime => $at, cotime => $ct };
+                       $arg->{autime} = $at;
+                       $arg->{cotime} = $ct;
                        if ($sync->{max_size}) {
                                $git->check_async($oid, \&check_size, $arg);
                        } else {
@@ -719,7 +727,7 @@ sub process_stack {
                        }
                        v1_checkpoint($self, $sync) if $max <= 0;
                } elsif ($f eq 'd') {
-                       $git->cat_async($oid, \&unindex_both, $self);
+                       $git->cat_async($oid, \&unindex_both, $arg);
                }
        }
        v1_checkpoint($self, $sync, $stk);
@@ -743,17 +751,17 @@ sub log2stack ($$$) {
        my $fh = $git->popen(qw(log --raw -r --pretty=tformat:%at-%ct-%H
                                --no-notes --no-color --no-renames --no-abbrev),
                                $range);
-       my ($at, $ct, $stk);
+       my ($at, $ct, $stk, $cmt);
        while (<$fh>) {
                if (/\A([0-9]+)-([0-9]+)-($OID)$/o) {
-                       ($at, $ct) = ($1 + 0, $2 + 0);
-                       $stk //= PublicInbox::IdxStack->new($3);
+                       ($at, $ct, $cmt) = ($1 + 0, $2 + 0, $3);
+                       $stk //= PublicInbox::IdxStack->new($cmt);
                } elsif (/$del/) {
                        my $oid = $1;
                        if ($D) { # reindex case
                                $D->{pack('H*', $oid)}++;
                        } else { # non-reindex case:
-                               $stk->push_rec('d', $at, $ct, $oid);
+                               $stk->push_rec('d', $at, $ct, $oid, $cmt);
                        }
                } elsif (/$add/) {
                        my $oid = $1;
@@ -761,12 +769,10 @@ sub log2stack ($$$) {
                                my $oid_bin = pack('H*', $oid);
                                my $nr = --$D->{$oid_bin};
                                delete($D->{$oid_bin}) if $nr <= 0;
-
                                # nr < 0 (-1) means it never existed
-                               $stk->push_rec('m', $at, $ct, $oid) if $nr < 0;
-                       } else {
-                               $stk->push_rec('m', $at, $ct, $oid);
+                               next if $nr >= 0;
                        }
+                       $stk->push_rec('m', $at, $ct, $oid, $cmt);
                }
        }
        close $fh or die "git log failed: \$?=$?";
index 18f33655385b1814e0bce571ea94b30dd3593af8..87b76501a5e1fb984acc6eab0c13197cd1015694 100644 (file)
@@ -861,6 +861,7 @@ sub reindex_checkpoint ($$) {
        my ($self, $sync) = @_;
 
        $self->git->async_wait_all;
+       $self->update_last_commit($sync);
        ${$sync->{need_checkpoint}} = 0;
        my $mm_tmp = $sync->{mm_tmp};
        $mm_tmp->atfork_prepare if $mm_tmp;
@@ -955,19 +956,22 @@ sub index_oid { # cat_async callback
        if (do_idx($self, $bref, $eml, $smsg)) {
                ${$arg->{need_checkpoint}} = 1;
        }
+       ${$arg->{latest_cmt}} = $arg->{cur_cmt} // die 'BUG: {cur_cmt} missing';
 }
 
 # only update last_commit for $i on reindex iff newer than current
-# $sync will be used by subclasses
 sub update_last_commit {
-       my ($self, $sync, $unit, $cmt) = @_;
+       my ($self, $sync, $stk) = @_;
+       my $unit = $sync->{unit} // return;
+       my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+       defined($latest_cmt) or return;
        my $last = last_epoch_commit($self, $unit->{epoch});
-       if (defined $last && is_ancestor($unit->{git}, $last, $cmt)) {
-               my @cmd = (qw(rev-list --count), "$last..$cmt");
+       if (defined $last && is_ancestor($unit->{git}, $last, $latest_cmt)) {
+               my @cmd = (qw(rev-list --count), "$last..$latest_cmt");
                chomp(my $n = $unit->{git}->qx(@cmd));
                return if $n ne '' && $n == 0;
        }
-       last_epoch_commit($self, $unit->{epoch}, $cmt);
+       last_epoch_commit($self, $unit->{epoch}, $latest_cmt);
 }
 
 sub last_commits {
@@ -1245,8 +1249,16 @@ sub index_todo ($$$) {
                $pfx //= $unit->{git}->{git_dir};
        }
        local $self->{current_info} = "$pfx ";
-       while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
-               my $req = { %$sync, autime => $at, cotime => $ct, oid => $oid };
+       local $sync->{latest_cmt} = \(my $latest_cmt);
+       local $sync->{unit} = $unit;
+       while (my ($f, $at, $ct, $oid, $cmt) = $stk->pop_rec) {
+               my $req = {
+                       %$sync,
+                       autime => $at,
+                       cotime => $ct,
+                       oid => $oid,
+                       cur_cmt => $cmt
+               };
                if ($f eq 'm') {
                        if ($sync->{max_size}) {
                                $all->check_async($oid, \&check_size, $req);
@@ -1261,7 +1273,7 @@ sub index_todo ($$$) {
                }
        }
        $all->async_wait_all;
-       $self->update_last_commit($sync, $unit, $stk->{latest_cmt});
+       $self->update_last_commit($sync, $stk);
 }
 
 sub xapian_only {
index 35aff37b739f264e2f1d5106dbf14feacf2a25cd..e0474fa4258342f5a3131cd3666c6acd0ffbb009 100644 (file)
@@ -6,6 +6,8 @@ use Test::More;
 use_ok 'PublicInbox::IdxStack';
 my $oid_a = '03c21563cf15c241687966b5b2a3f37cdc193316';
 my $oid_b = '963caad026055ab9bcbe3ee9550247f9d8840feb';
+my $cmt_a = 'df8e4a0612545d53672036641e9f076efc94c2f6';
+my $cmt_b = '3ba7c9fa4a083c439e768882c571c2026a981ca5';
 
 my $stk = PublicInbox::IdxStack->new;
 is($stk->read_prepare, $stk, 'nothing');
@@ -13,19 +15,19 @@ is($stk->num_records, 0, 'no records');
 is($stk->pop_rec, undef, 'undef on empty');
 
 $stk = PublicInbox::IdxStack->new;
-$stk->push_rec('m', 1234, 5678, $oid_a);
+$stk->push_rec('m', 1234, 5678, $oid_a, $cmt_a);
 is($stk->read_prepare, $stk, 'read_prepare');
 is($stk->num_records, 1, 'num_records');
-is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a], 'pop once');
+is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a, $cmt_a], 'pop once');
 is($stk->pop_rec, undef, 'undef on empty');
 
 $stk = PublicInbox::IdxStack->new;
-$stk->push_rec('m', 1234, 5678, $oid_a);
-$stk->push_rec('d', 1234, 5678, $oid_b);
+$stk->push_rec('m', 1234, 5678, $oid_a, $cmt_a);
+$stk->push_rec('d', 1234, 5678, $oid_b, $cmt_b);
 is($stk->read_prepare, $stk, 'read_prepare');
 is($stk->num_records, 2, 'num_records');
-is_deeply([$stk->pop_rec], ['d', 1234, 5678, $oid_b], 'pop');
-is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a], 'pop-pop');
+is_deeply([$stk->pop_rec], ['d', 1234, 5678, $oid_b, $cmt_b], 'pop');
+is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a, $cmt_a], 'pop-pop');
 is($stk->pop_rec, undef, 'empty');
 
 SKIP: {
@@ -37,11 +39,11 @@ SKIP: {
        while (<$fh>) {
                chomp;
                my ($at, $ct, $H) = split(/\./);
-               $stk //= PublicInbox::IdxStack->new($H);
+               $stk //= PublicInbox::IdxStack->new;
                # not bothering to parse blobs here, just using commit OID
                # as a blob OID since they're the same size + format
-               $stk->push_rec('m', $at + 0, $ct + 0, $H);
-               push(@expect, [ 'm', $at, $ct, $H ]);
+               $stk->push_rec('m', $at + 0, $ct + 0, $H, $H);
+               push(@expect, [ 'm', $at, $ct, $H, $H ]);
        }
        $stk or skip('nothing from git log', 3);
        is($stk->read_prepare, $stk, 'read_prepare');