use PublicInbox::InboxWritable;
use PublicInbox::MID qw(mid_mime mids_for_index mids);
use PublicInbox::MsgIter;
+use PublicInbox::IdxStack;
use Carp qw(croak);
use POSIX qw(strftime);
use PublicInbox::OverIdx;
use constant DEBUG => !!$ENV{DEBUG};
# Index levels that need a Xapian DB; need_xapian() matches
# $self->{indexlevel} against this.
my $xapianlevels = qr/\A(?:full|medium)\z/;
# Pattern fragments for git object IDs: lowercase hex, 40 or more digits.
+my $hex = '[a-f0-9]';
+my $OID = $hex .'{40,}';
# Match "git log --raw" entries for a mode-100644 file that is added to, or
# deleted from, a path made of two hex digits, a slash, and 38 hex digits.
# $1 is the blob OID: the new side for an add, the old side for a delete.
+my $addmsg = qr!^:000000 100644 \S+ ($OID) A\t${hex}{2}/${hex}{38}$!;
+my $delmsg = qr!^:100644 000000 ($OID) \S+ D\t${hex}{2}/${hex}{38}$!;
sub new {
my ($class, $ibx, $creat, $shard) = @_;
# True when the {indexlevel} of the object passed as the only argument
# matches $xapianlevels (i.e. is exactly "full" or "medium").
sub need_xapian ($) { $_[0]->{indexlevel} =~ $xapianlevels }
-sub _xdb_release {
+sub idx_release {
my ($self, $wake) = @_;
if (need_xapian($self)) {
my $xdb = delete $self->{xdb} or croak 'not acquired';
1;
}
-sub _xdb_acquire {
+sub idx_acquire {
my ($self) = @_;
my $flag;
my $dir = $self->xdir;
$smsg->{mid} //= $mids->[0]; # v1 compatibility
$smsg->{num} //= do { # v1
_msgmap_init($self);
- index_mm($self, $mime);
+ index_mm($self, $mime, $smsg->{blob}, $sync);
};
# v1 and tests only:
}
# Find or assign the article number for the message $mime in $self->{mm}.
#
# Removed version: keyed on the single Message-ID from mid_mime() and used
# the regen_down/regen_up counters on $self to pick a number.
# Added version: also takes the blob $oid and the $sync state hash, and
# works from the list of Message-IDs returned by mids().
#
# Returns the value of the last expression evaluated (or the early-return
# $num in the reindex branch).  The index_both caller treats a false result
# as failure to generate an NNTP article number.
sub index_mm {
- my ($self, $mime) = @_;
- my $mid = mid_mime($mime);
+ my ($self, $mime, $oid, $sync) = @_;
+ my $mids = mids($mime);
my $mm = $self->{mm};
- my $num;
-
- if (defined $self->{regen_down}) {
- $num = $mm->num_for($mid) and return $num;
-
- while (($num = $self->{regen_down}--) > 0) {
- if ($mm->mid_set($num, $mid) != 0) {
- return $num;
- }
- }
- } elsif (defined $self->{regen_up}) {
- $num = $mm->num_for($mid) and return $num;
-
- # this is to fixup old bugs due to add-remove-add
- while (($num = ++$self->{regen_up})) {
- if ($mm->mid_set($num, $mid) != 0) {
- return $num;
- }
# reindex: ask {over} (num_mid0_for_oid) for a number recorded for this
# blob OID under each Message-ID in turn; the first defined one wins.
+ if ($sync->{reindex}) {
+ my $over = $self->{over};
+ for my $mid (@$mids) {
+ my ($num, undef) = $over->num_mid0_for_oid($oid, $mid);
+ return $num if defined $num;
}
# nothing defined came back from {over}: prefer whatever $mm->num_for
# returns for the first Message-ID, and only call mid_insert when that
# is undef (defined-or).
+ $mm->num_for($mids->[0]) // $mm->mid_insert($mids->[0]);
+ } else {
# normal indexing tries the two calls in the opposite order: mid_insert
# first, num_for only when mid_insert returns undef.
+ # fallback to num_for since filters like RubyLang set the number
+ $mm->mid_insert($mids->[0]) // $mm->num_for($mids->[0]);
}
-
- $num = $mm->mid_insert($mid) and return $num;
-
- # fallback to num_for since filters like RubyLang set the number
- $mm->num_for($mid);
}
sub unindex_mm {
my $smsg = bless { bytes => $size, blob => $oid }, 'PublicInbox::Smsg';
my $self = $sync->{sidx};
my $eml = PublicInbox::Eml->new($bref);
- my $num = index_mm($self, $eml);
- $smsg->{num} = $num;
+ $smsg->{num} = index_mm($self, $eml, $oid, $sync) or
+ die "E: could not generate NNTP article number for $oid";
add_message($self, $eml, $smsg, $sync);
}
my ($self, $opts) = @_;
delete $self->{lock_path} if $opts->{-skip_lock};
$self->{ibx}->with_umask(\&_index_sync, $self, $opts);
+ if ($opts->{reindex}) {
+ my %again = %$opts;
+ delete @again{qw(rethread reindex)};
+ index_sync($self, \%again);
+ }
}
sub too_big ($$) {
}
# only for v1
#
# Removed version (read_log): parsed "git log" output from the $log handle
# line by line and queued index/unindex work as it went.
# Added version (process_stack): consumes records from the already-built
# stack $stk (see prepare_stack) instead of parsing git output here.
#
# Arguments of the added version:
#   $self     - the indexer; its {ibx}->git object performs the async reads
#   $stk      - object providing pop_rec; also handed to the final callback
#   $sync     - shared state hash; {nr}, {max}, {sidx}, {autime}, {cotime}
#               are written here and a pending {D} entry is consumed
#   $batch_cb - called as ($nr) after each full batch and as ($nr, $stk)
#               once everything has been processed
-sub read_log {
- my ($self, $log, $batch_cb) = @_;
- my $hex = '[a-f0-9]';
- my $h40 = $hex .'{40}';
- my $addmsg = qr!^:000000 100644 \S+ ($h40) A\t${hex}{2}/${hex}{38}$!;
- my $delmsg = qr!^:100644 000000 ($h40) \S+ D\t${hex}{2}/${hex}{38}$!;
+sub process_stack {
+ my ($self, $stk, $sync, $batch_cb) = @_;
my $git = $self->{ibx}->git;
- my $latest;
my $max = $BATCH_BYTES;
- local $/ = "\n";
- my %D;
- my $line;
- my $newest;
my $nr = 0;
- my $sync = { sidx => $self, nr => \$nr, max => \$max };
- while (defined($line = <$log>)) {
- if ($line =~ /$addmsg/o) {
- my $blob = $1;
- if (delete $D{$blob}) {
- # make sure pending index writes are done
- # before writing to ->mm
- $git->cat_async_wait;
-
- if (defined $self->{regen_down}) {
- my $num = $self->{regen_down}--;
- $self->{mm}->num_highwater($num);
- }
- next;
- }
- next if too_big($self, $blob);
- $git->cat_async($blob, \&index_both, { %$sync });
# publish $nr and $max by reference, plus this indexer, through $sync so
# the async callbacks can reach them
+ $sync->{nr} = \$nr;
+ $sync->{max} = \$max;
+ $sync->{sidx} = $self;
+
# {D} is filled by prepare_stack when reindexing; its keys are OIDs packed
# with pack('H*', ...).  Entries still present here are deletions that no
# add cancelled, so convert each key back to hex and unindex that blob.
+ if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
+ warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
+ for my $oid (@leftovers) {
+ $oid = unpack('H*', $oid);
+ $git->cat_async($oid, \&unindex_both, $self);
+ }
+ }
# each record is (type, author time, committer time, blob OID) as pushed
# by prepare_stack: 'm' records get indexed, 'd' records get unindexed
+ while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
+ if ($f eq 'm') {
+ $sync->{autime} = $at;
+ $sync->{cotime} = $ct;
+ next if too_big($self, $oid);
# the callback receives a shallow copy of $sync, so the autime/cotime
# values it sees are the ones set for this record
+ $git->cat_async($oid, \&index_both, { %$sync });
# batch budget exhausted ($max is presumably decremented through
# $sync->{max} by the callback -- TODO confirm): wait for queued reads,
# reset the budget and report progress
if ($max <= 0) {
$git->cat_async_wait;
$max = $BATCH_BYTES;
- $batch_cb->($nr, $latest);
+ $batch_cb->($nr);
}
- } elsif ($line =~ /$delmsg/o) {
- my $blob = $1;
- $D{$blob} = 1 unless too_big($self, $blob);
- } elsif ($line =~ /^commit ($h40)/o) {
- $latest = $1;
- $newest ||= $latest;
- } elsif ($line =~ /^author .*? ([0-9]+) [\-\+][0-9]+$/) {
- $sync->{autime} = $1;
- } elsif ($line =~ /^committer .*? ([0-9]+) [\-\+][0-9]+$/) {
- $sync->{cotime} = $1;
+ } elsif ($f eq 'd') {
+ $git->cat_async($oid, \&unindex_both, $self);
}
}
- close($log) or die "git log failed: \$?=$?";
- # get the leftovers
- foreach my $blob (keys %D) {
- $git->cat_async($blob, \&unindex_both, $self);
- }
# flush whatever is still queued, then make the final callback; passing
# $stk is what distinguishes it from the per-batch calls above
$git->cat_async_wait;
- $batch_cb->($nr, $latest, $newest);
+ $batch_cb->($nr, $stk);
}
# Removed version (_git_log): counted the commits in $range, set up the
# regen_up/regen_down counters on $self and returned a "git log" pipe.
# Added version (prepare_stack): reads "git log --raw" for $range itself and
# records every message add ('m') and delete ('d') in a
# PublicInbox::IdxStack.
#
# Arguments of the added version:
#   $self  - the indexer; {ibx}->git runs the git commands
#   $sync  - shared state hash; {reindex} is read and {D} is (re)set
#   $range - a single revision, or an "A..B" range
# Returns the result of calling read_prepare on the stack (the caller uses
# it as the stack object).  Dies if closing the git log pipe fails.
-sub _git_log {
- my ($self, $opts, $range) = @_;
+sub prepare_stack ($$$) {
+ my ($self, $sync, $range) = @_;
my $git = $self->{ibx}->git;
if (index($range, '..') < 0) {
# don't show annoying git errors to users who run -index
# on empty inboxes
$git->qx(qw(rev-parse -q --verify), "$range^0");
- if ($?) {
- open my $fh, '<', '/dev/null' or
- die "failed to open /dev/null: $!\n";
- return $fh;
- }
+ return PublicInbox::IdxStack->new->read_prepare if $?;
}
# {D} is only a hash when reindexing; otherwise it is undef and deletes are
# pushed onto the stack directly
+ my $D = $sync->{D} = $sync->{reindex} ? {} : undef; # OID_BIN => NR
# Count the new files so they can be added newest to oldest
# and still have numbers increasing from oldest to newest
- my $fcount = 0;
- my $pr = $opts->{-progress};
- $pr->("counting changes\n\t$range ... ") if $pr;
- # can't use 'rev-list --count' if we use --diff-filter
- my $fh = $git->popen(qw(log --pretty=tformat:%h
- --no-notes --no-color --no-renames
- --diff-filter=AM), $range);
- ++$fcount while <$fh>;
- close $fh or die "git log failed: \$?=$?";
- my $high = $self->{mm}->num_highwater;
- $pr->("$fcount\n") if $pr; # continue previous line
- $self->{ntodo} = $fcount;
-
- if (index($range, '..') < 0) {
- if ($high && $high == $fcount) {
- # fix up old bugs in full indexes which caused messages to
- # not appear in Msgmap
- $self->{regen_up} = $high;
- } else {
- # normal regen is for for fresh data
- $self->{regen_down} = $fcount;
- $self->{regen_down} += $high unless $opts->{reindex};
# each commit prints one "<author time>-<committer time>-<commit OID>" line
# (tformat:%at-%ct-%H) followed by its raw diff entries
+ my $fh = $git->popen(qw(log --raw -r --pretty=tformat:%at-%ct-%H
+ --no-notes --no-color --no-renames --no-abbrev),
+ $range);
+ my ($at, $ct, $stk);
+ while (<$fh>) {
# commit header: remember its timestamps for the entries that follow; the
# stack is created once, with the OID of the first commit seen
+ if (/\A([0-9]+)-([0-9]+)-($OID)$/o) {
+ ($at, $ct) = ($1 + 0, $2 + 0);
+ $stk //= PublicInbox::IdxStack->new($3);
+ } elsif (/$delmsg/) {
+ my $oid = $1;
+ if ($D) { # reindex case
+ $D->{pack('H*', $oid)}++;
+ } else { # non-reindex case:
+ $stk->push_rec('d', $at, $ct, $oid);
+ }
+ } elsif (/$addmsg/) {
+ my $oid = $1;
+ if ($D) {
# an add cancels one delete counted earlier for the same blob; the entry
# is dropped from {D} once its count is no longer positive
+ my $oid_bin = pack('H*', $oid);
+ my $nr = --$D->{$oid_bin};
+ delete($D->{$oid_bin}) if $nr <= 0;
+
+ # nr < 0 (-1) means it never existed
+ $stk->push_rec('m', $at, $ct, $oid) if $nr < 0;
+ } else {
+ $stk->push_rec('m', $at, $ct, $oid);
+ }
}
- } else {
- # Give oldest messages the smallest numbers
- $self->{regen_down} = $high + $fcount;
}
-
- $git->popen(qw/log --pretty=raw --no-notes --no-color --no-renames
- --raw -r --no-abbrev/, $range);
+ close $fh or die "git log failed: \$?=$?";
# no commit header was read: fall back to a stack created without a commit
+ $stk //= PublicInbox::IdxStack->new;
+ $stk->read_prepare;
}
# --is-ancestor requires git 1.8.0+
sub _index_sync {
my ($self, $opts) = @_;
my $tip = $opts->{ref} || 'HEAD';
- my ($last_commit, $lx, $xlog);
my $git = $self->{ibx}->git;
$git->batch_prepare;
my $pr = $opts->{-progress};
-
+ my $sync = { reindex => $opts->{reindex} };
my $xdb = $self->begin_txn_lazy;
$self->{over}->rethread_prepare($opts);
my $mm = _msgmap_init($self);
- do {
- $xlog = undef; # stop previous git-log via SIGPIPE
- $last_commit = _last_x_commit($self, $mm);
- $lx = reindex_from($opts->{reindex}, $last_commit);
-
- $self->{over}->rollback_lazy;
- $self->{over}->disconnect;
- $git->cleanup;
- delete $self->{txn};
- $xdb->cancel_transaction if $xdb;
- $xdb = _xdb_release($self);
-
- # ensure we leak no FDs to "git log" with Xapian <= 1.2
- my $range = $lx eq '' ? $tip : "$lx..$tip";
- $xlog = _git_log($self, $opts, $range);
-
- $xdb = $self->begin_txn_lazy;
- } while (_last_x_commit($self, $mm) ne $last_commit);
+ if ($sync->{reindex}) {
+ my $last = $mm->last_commit;
+ if ($last) {
+ $tip = $last;
+ } else {
+ # somebody just blindly added --reindex when indexing
+ # for the first time, allow it:
+ undef $sync->{reindex};
+ }
+ }
+ my $last_commit = _last_x_commit($self, $mm);
+ my $lx = reindex_from($sync->{reindex}, $last_commit);
+ my $range = $lx eq '' ? $tip : "$lx..$tip";
+ $pr->("counting changes\n\t$range ... ") if $pr;
+ my $stk = prepare_stack($self, $sync, $range);
+ $sync->{ntodo} = $stk ? $stk->num_records : 0;
+ $pr->("$sync->{ntodo}\n") if $pr; # continue previous line
- my $dbh = $mm->{dbh} if $mm;
+ my $dbh = $mm->{dbh};
my $batch_cb = sub {
- my ($nr, $commit, $newest) = @_;
- if ($dbh) {
- if ($newest) {
- my $cur = $mm->last_commit || '';
- if (need_update($self, $cur, $newest)) {
- $mm->last_commit($newest);
- }
+ my ($nr, $stk) = @_;
+ # latest_cmt may be undef
+ my $newest = $stk ? $stk->{latest_cmt} : undef;
+ if ($newest) {
+ my $cur = $mm->last_commit || '';
+ if (need_update($self, $cur, $newest)) {
+ $mm->last_commit($newest);
}
- $dbh->commit;
}
+ $dbh->commit;
if ($newest && need_xapian($self)) {
my $cur = $xdb->get_metadata('last_commit');
if (need_update($self, $cur, $newest)) {
$self->{over}->rethread_done($opts) if $newest; # all done
$self->commit_txn_lazy;
$git->cleanup;
- $xdb = _xdb_release($self, $nr);
+ $xdb = idx_release($self, $nr);
# let another process do some work...
- $pr->("indexed $nr/$self->{ntodo}\n") if $pr && $nr;
- if (!$newest) { # more to come
+ $pr->("indexed $nr/$sync->{ntodo}\n") if $pr && $nr;
+ if (!$stk) { # more to come
$xdb = $self->begin_txn_lazy;
- $dbh->begin_work if $dbh;
+ $dbh->begin_work;
}
};
$dbh->begin_work;
- read_log($self, $xlog, $batch_cb);
+ process_stack($self, $stk, $sync, $batch_cb);
}
sub DESTROY {
$? == 0 or die ref($self)." pid:$pid exited with: $?";
} else {
die "transaction in progress $self\n" if $self->{txn};
- $self->_xdb_release if $self->{xdb};
+ idx_release($self) if $self->{xdb};
}
}
sub _begin_txn {
my ($self) = @_;
- my $xdb = $self->{xdb} || $self->_xdb_acquire;
+ my $xdb = $self->{xdb} || idx_acquire($self);
$self->{over}->begin_lazy if $self->{over};
$xdb->begin_transaction if $xdb;
$self->{txn} = 1;