# support testing with dev versions of Xapian which installs
# commands with a version number suffix (e.g. "xapian-compact-1.5")
our $XAPIAN_COMPACT = $ENV{XAPIAN_COMPACT} || 'xapian-compact';
-our @COMPACT_OPT = qw(quiet|q blocksize|b=s no-full|n fuller|F);
+our @COMPACT_OPT = qw(jobs|j=i quiet|q blocksize|b=s no-full|n fuller|F);
sub commit_changes ($$$) {
my ($ibx, $tmp, $opt) = @_;
my ($cb, $args, $opt) = @_; # $cb = cpdb() or compact()
defined(my $pid = fork) or die "fork: $!";
return $pid if $pid > 0;
- eval { $cb->($args, $opt) };
- die $@ if $@;
+ $cb->($args, $opt);
exit 0;
}
die "$x and $y reside on different filesystems\n";
}
+sub process_queue {
+ my ($queue, $cb, $max, $opt) = @_;
+ if ($max <= 1) {
+ while (defined(my $args = shift @$queue)) {
+ $cb->($args, $opt);
+ }
+ return;
+ }
+
+ # run in parallel:
+ my %pids;
+ while (@$queue) {
+ while (scalar(keys(%pids)) < $max && scalar(@$queue)) {
+ my $args = shift @$queue;
+ $pids{cb_spawn($cb, $args, $opt)} = $args;
+ }
+
+ while (scalar keys %pids) {
+ my $pid = waitpid(-1, 0);
+ my $args = delete $pids{$pid};
+ die join(' ', @$args)." failed: $?\n" if $?;
+ }
+ }
+}
+
sub run {
my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact'
my $cb = \&${\"PublicInbox::Xapcmd::$task"};
}
delete($ibx->{$_}) for (qw(mm over search)); # cleanup
- my %pids;
- while (@q) {
- while (scalar(keys(%pids)) < $max && scalar(@q)) {
- my $args = shift @q;
- $pids{cb_spawn($cb, $args, $opt)} = $args;
- }
-
- while (scalar keys %pids) {
- my $pid = waitpid(-1, 0);
- my $args = delete $pids{$pid};
- die join(' ', @$args)." failed: $?\n" if $?;
- }
- }
+ process_queue(\@q, $cb, $max, $opt);
commit_changes($ibx, $tmp, $opt);
});
}