+
+ # fine-grained locking if we prepare for reindex
+ if (!$opt->{-coarse_lock}) {
+ prepare_reindex($ibx, $im, $reindex);
+ $im->lock_release;
+ }
+
+ $ibx->cleanup;
+ process_queue(\@q, $cb, $max, $opt);
+ $im->lock_acquire if !$opt->{-coarse_lock};
+ commit_changes($ibx, $im, $tmp, $opt);
+ });
+}
+
+sub cpdb_retryable ($$) {
+ my ($src, $pfx) = @_;
+ if (ref($@) =~ /\bDatabaseModifiedError\b/) {
+ warn "$pfx Xapian DB modified, reopening and retrying\n";
+ $src->reopen;
+ return 1;
+ }
+ if ($@) {
+ warn "$pfx E: ", ref($@), "\n";
+ die;
+ }
+ 0;
+}
+
+sub progress_pfx ($) {
+ my ($wip) = @_; # tempdir v2: ([0-9])+-XXXXXXXX
+ my @p = split('/', $wip);
+
+ # return "xap15/0" for v2, or "xapian15" for v1:
+ ($p[-1] =~ /\A([0-9]+)/) ? "$p[-2]/$1" : $p[-1];
+}
+
+# xapian-compact wrapper
+sub compact ($$) {
+ my ($args, $opt) = @_;
+ my ($src, $newdir) = @$args;
+ my $dst = ref($newdir) ? $newdir->dirname : $newdir;
+ my ($r, $w);
+ my $pfx = $opt->{-progress_pfx} ||= progress_pfx($src);
+ my $pr = $opt->{-progress};
+ my $rdr = {};
+
+ foreach my $fd (0..2) {
+ defined(my $dfd = $opt->{$fd}) or next;
+ $rdr->{$fd} = $dfd;
+ }
+ $rdr->{1} = $w if $pr && pipe($r, $w);
+
+ # we rely on --no-renumber to keep docids synched to NNTP
+ my $cmd = [ $XAPIAN_COMPACT, '--no-renumber' ];
+ for my $sw (qw(no-full fuller)) {
+ push @$cmd, "--$sw" if $opt->{$sw};
+ }
+ for my $sw (qw(blocksize)) {
+ defined(my $v = $opt->{$sw}) or next;
+ push @$cmd, "--$sw", $v;
+ }
+ $pr->("$pfx `".join(' ', @$cmd)."'\n") if $pr;
+ push @$cmd, $src, $dst;
+ my $pid = spawn($cmd, undef, $rdr);
+ if ($pr) {
+ close $w or die "close: \$w: $!";
+ foreach (<$r>) {
+ s/\r/\r$pfx /g;
+ $pr->("$pfx $_");
+ }
+ }
+ my $rp = waitpid($pid, 0);
+ if ($? || $rp != $pid) {
+ die join(' ', @$cmd)." failed: $? (pid=$pid, reaped=$rp)\n";
+ }
+}
+
+sub cpdb_loop ($$$;$$) {
+ my ($src, $dst, $pr_data, $cur_shard, $reshard) = @_;
+ my ($pr, $fmt, $nr, $pfx);
+ if ($pr_data) {
+ $pr = $pr_data->{pr};
+ $fmt = $pr_data->{fmt};
+ $nr = \($pr_data->{nr});
+ $pfx = $pr_data->{pfx};
+ }
+
+ my ($it, $end);
+ do {
+ eval {
+ $it = $src->postlist_begin('');
+ $end = $src->postlist_end('');
+ };
+ } while (cpdb_retryable($src, $pfx));
+
+ do {
+ eval {
+ for (; $it != $end; $it++) {
+ my $docid = $it->get_docid;
+ if (defined $reshard) {
+ my $dst_shard = $docid % $reshard;
+ next if $dst_shard != $cur_shard;
+ }
+ my $doc = $src->get_document($docid);
+ $dst->replace_document($docid, $doc);
+ if ($pr_data && !(++$$nr & 1023)) {
+ $pr->(sprintf($fmt, $$nr));
+ }