]> Sergey Matveev's repositories - public-inbox.git/commitdiff
xcpdb|compact: support --jobs/-j flag like gmake(1)
authorEric Wong <e@80x24.org>
Thu, 23 May 2019 09:37:03 +0000 (09:37 +0000)
committerEric Wong <e@80x24.org>
Thu, 23 May 2019 17:43:51 +0000 (17:43 +0000)
We don't have to be tied to the number of partitions in case
we made a bad choice at initialization.  This doesn't affect
reindexing, but the copying phase is already intensive.

And optimize away the extra process when we only have a single
job which won't parallelize.

The wording for the (v2) reindexing phase could be improved,
later.  I also plan to allow repartitioning of existing
Xapian DBs.

lib/PublicInbox/Xapcmd.pm

index 5b6d06b8eaed32bdf2b1afdd6a18d6abc9cb338a..a294d539d292e564ef7d9733da90fe80430c2343 100644 (file)
@@ -13,7 +13,7 @@ use File::Basename qw(dirname);
 # support testing with dev versions of Xapian which installs
 # commands with a version number suffix (e.g. "xapian-compact-1.5")
 our $XAPIAN_COMPACT = $ENV{XAPIAN_COMPACT} || 'xapian-compact';
-our @COMPACT_OPT = qw(quiet|q blocksize|b=s no-full|n fuller|F);
+our @COMPACT_OPT = qw(jobs|j=i quiet|q blocksize|b=s no-full|n fuller|F);
 
 sub commit_changes ($$$) {
        my ($ibx, $tmp, $opt) = @_;
@@ -54,8 +54,7 @@ sub cb_spawn {
        my ($cb, $args, $opt) = @_; # $cb = cpdb() or compact()
        defined(my $pid = fork) or die "fork: $!";
        return $pid if $pid > 0;
-       eval { $cb->($args, $opt) };
-       die $@ if $@;
+       $cb->($args, $opt);
        exit 0;
 }
 
@@ -103,6 +102,31 @@ sub same_fs_or_die ($$) {
        die "$x and $y reside on different filesystems\n";
 }
 
+sub process_queue {
+       my ($queue, $cb, $max, $opt) = @_;
+       if ($max <= 1) {
+               while (defined(my $args = shift @$queue)) {
+                       $cb->($args, $opt);
+               }
+               return;
+       }
+
+       # run in parallel:
+       my %pids;
+       while (@$queue) {
+               while (scalar(keys(%pids)) < $max && scalar(@$queue)) {
+                       my $args = shift @$queue;
+                       $pids{cb_spawn($cb, $args, $opt)} = $args;
+               }
+
+               while (scalar keys %pids) {
+                       my $pid = waitpid(-1, 0);
+                       my $args = delete $pids{$pid};
+                       die join(' ', @$args)." failed: $?\n" if $?;
+               }
+       }
+}
+
 sub run {
        my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact'
        my $cb = \&${\"PublicInbox::Xapcmd::$task"};
@@ -163,19 +187,7 @@ sub run {
                }
 
                delete($ibx->{$_}) for (qw(mm over search)); # cleanup
-               my %pids;
-               while (@q) {
-                       while (scalar(keys(%pids)) < $max && scalar(@q)) {
-                               my $args = shift @q;
-                               $pids{cb_spawn($cb, $args, $opt)} = $args;
-                       }
-
-                       while (scalar keys %pids) {
-                               my $pid = waitpid(-1, 0);
-                               my $args = delete $pids{$pid};
-                               die join(' ', @$args)." failed: $?\n" if $?;
-                       }
-               }
+               process_queue(\@q, $cb, $max, $opt);
                commit_changes($ibx, $tmp, $opt);
        });
 }