]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/V2Writable.pm
index: --sequential-shard works incrementally
[public-inbox.git] / lib / PublicInbox / V2Writable.pm
index 6b1effe58e133cebf3a74f107698b40952818fad..8148439aca5e6dd3dc015f33849e4d890460eea3 100644 (file)
@@ -35,14 +35,13 @@ my $PACKING_FACTOR = 0.4;
 our $NPROC_MAX_DEFAULT = 4;
 
 sub detect_nproc () {
-       for my $nproc (qw(nproc gnproc)) { # GNU coreutils nproc
-               `$nproc 2>/dev/null` =~ /^(\d+)$/ and return $1;
-       }
-
        # getconf(1) is POSIX, but *NPROCESSORS* vars are not
        for (qw(_NPROCESSORS_ONLN NPROCESSORS_ONLN)) {
                `getconf $_ 2>/dev/null` =~ /^(\d+)$/ and return $1;
        }
+       for my $nproc (qw(nproc gnproc)) { # GNU coreutils nproc
+               `$nproc 2>/dev/null` =~ /^(\d+)$/ and return $1;
+       }
 
        # should we bother with `sysctl hw.ncpu`?  Those only give
        # us total processor count, not online processor count.
@@ -122,7 +121,7 @@ sub new {
                rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
                last_commit => [], # git epoch -> commit
        };
-       $self->{over}->{-no_sync} = 1 if $v2ibx->{-no_sync};
+       $self->{over}->{-no_fsync} = 1 if $v2ibx->{-no_fsync};
        $self->{shards} = count_shards($self) || nproc_shards($creat);
        bless $self, $class;
 }
@@ -152,6 +151,12 @@ sub add {
        $self->{ibx}->with_umask(\&_add, $self, $eml, $check_cb);
 }
 
+sub batch_bytes ($) {
+       my ($self) = @_;
+       ($self->{parallel} ? $self->{shards} : 1) *
+               $PublicInbox::SearchIdx::BATCH_BYTES;
+}
+
 # indexes a message, returns true if checkpointing is needed
 sub do_idx ($$$$) {
        my ($self, $msgref, $mime, $smsg) = @_;
@@ -160,7 +165,7 @@ sub do_idx ($$$$) {
        my $idx = idx_shard($self, $smsg->{num} % $self->{shards});
        $idx->index_raw($msgref, $mime, $smsg);
        my $n = $self->{transact_bytes} += $smsg->{raw_bytes};
-       $n >= ($PublicInbox::SearchIdx::BATCH_BYTES * $self->{shards});
+       $n >= batch_bytes($self);
 }
 
 sub _add {
@@ -292,7 +297,7 @@ sub _idx_init { # with_umask callback
        # for SQLite:
        my $mm = $self->{mm} = PublicInbox::Msgmap->new_file(
                                "$self->{ibx}->{inboxdir}/msgmap.sqlite3",
-                               $self->{ibx}->{-no_sync} ? 2 : 1);
+                               $self->{ibx}->{-no_fsync} ? 2 : 1);
        $mm->{dbh}->begin_work;
 }
 
@@ -1192,20 +1197,20 @@ sub index_xap_only { # git->cat_async callback
 
 sub index_xap_step ($$$;$) {
        my ($self, $sync, $beg, $step) = @_;
-       my $ibx = $self->{ibx};
-       my $all = $ibx->git;
-       my $over = $ibx->over;
-       my $batch_bytes = $PublicInbox::SearchIdx::BATCH_BYTES;
-       $step //= $self->{shards};
        my $end = $sync->{art_end};
+       return if $beg > $end; # nothing to do
+
+       $step //= $self->{shards};
+       my $ibx = $self->{ibx};
        if (my $pr = $sync->{-opt}->{-progress}) {
                $pr->("Xapian indexlevel=$ibx->{indexlevel} ".
                        "$beg..$end (% $step)\n");
        }
+       my $batch_bytes = batch_bytes($self);
        for (my $num = $beg; $num <= $end; $num += $step) {
-               my $smsg = $over->get_art($num) or next;
+               my $smsg = $ibx->over->get_art($num) or next;
                $smsg->{v2w} = $self;
-               $all->cat_async($smsg->{blob}, \&index_xap_only, $smsg);
+               $ibx->git->cat_async($smsg->{blob}, \&index_xap_only, $smsg);
                if ($self->{transact_bytes} >= $batch_bytes) {
                        ${$sync->{nr}} = $num;
                        reindex_checkpoint($self, $sync);
@@ -1247,8 +1252,9 @@ sub index_epoch ($$$) {
 }
 
 sub xapian_only {
-       my ($self, $opt, $sync) = @_;
+       my ($self, $opt, $sync, $art_beg) = @_;
        my $seq = $opt->{sequentialshard};
+       $art_beg //= 0;
        local $self->{parallel} = 0 if $seq;
        $self->idx_init($opt); # acquire lock
        if (my $art_end = $self->{ibx}->mm->max) {
@@ -1262,9 +1268,11 @@ sub xapian_only {
                $sync->{art_end} = $art_end;
                if ($seq || !$self->{parallel}) {
                        my $shard_end = $self->{shards} - 1;
-                       index_xap_step($self, $sync, $_) for (0..$shard_end);
+                       for (0..$shard_end) {
+                               index_xap_step($self, $sync, $art_beg + $_)
+                       }
                } else { # parallel (maybe)
-                       index_xap_step($self, $sync, 0, 1);
+                       index_xap_step($self, $sync, $art_beg, 1);
                }
        }
        $self->{ibx}->git->cat_async_wait;
@@ -1283,6 +1291,7 @@ sub index_sync {
        return unless defined $latest;
 
        my $seq = $opt->{sequentialshard};
+       my $art_beg; # the NNTP article number we start xapian_only at
        my $idxlevel = $self->{ibx}->{indexlevel};
        local $self->{ibx}->{indexlevel} = 'basic' if $seq;
 
@@ -1306,6 +1315,12 @@ sub index_sync {
                $self->{mm}->{dbh}->begin_work;
                $sync->{mm_tmp} =
                        $self->{mm}->tmp_clone($self->{ibx}->{inboxdir});
+
+               # xapian_only works incrementally w/o --reindex
+               if ($seq && !$opt->{reindex}) {
+                       $art_beg = $sync->{mm_tmp}->max;
+                       $art_beg++ if defined($art_beg);
+               }
        }
        if ($sync->{index_max_size} = $self->{ibx}->{index_max_size}) {
                $sync->{index_oid} = \&index_oid;
@@ -1320,10 +1335,10 @@ sub index_sync {
                $pr->('all.git '.sprintf($sync->{-regen_fmt}, $$nr)) if $pr;
        }
 
-       if ($seq) { # deal with Xapian shards sequentially
+       # deal with Xapian shards sequentially
+       if ($seq && delete($sync->{mm_tmp})) {
                $self->{ibx}->{indexlevel} = $idxlevel;
-               delete $sync->{mm_tmp};
-               xapian_only($self, $opt, $sync);
+               xapian_only($self, $opt, $sync, $art_beg);
        }
 
        # reindex does not pick up new changes, so we rerun w/o it: