]> Sergey Matveev's repositories - public-inbox.git/commitdiff
xcpdb: support resharding v2 repos
authorEric Wong <e@80x24.org>
Thu, 13 Jun 2019 00:29:37 +0000 (00:29 +0000)
committerEric Wong <e@80x24.org>
Fri, 14 Jun 2019 01:31:25 +0000 (01:31 +0000)
v2 repos are sometimes created on machines where CPU
parallelization exceeds the capability of the storage devices.

In that case, users may reshard the Xapian DB to any smaller,
positive integer to avoid excessive overhead and contention when
bottlenecked by slow storage.

Resharding can also be used to increase shard count after
hardware upgrades.

Documentation/public-inbox-xcpdb.pod
MANIFEST
lib/PublicInbox/Xapcmd.pm
script/public-inbox-xcpdb
t/xcpdb-reshard.t [new file with mode: 0644]

index a04fd374ce1ba49146d5a14e3932627d7827040d..fd8770a43195c4c05d3b3449acc03305ca8c8c54 100644 (file)
@@ -30,6 +30,17 @@ preferable for gigantic inboxes where the coarse-grained lock
 currently required for L<public-inbox-compact(1)> can cause
 the compaction to take hours at-a-time.
 
+=item --reshard=N / -R N
+
+Repartition the Xapian database on a L<v2|public-inbox-v2-format(5)>
+inbox to C<N> partitions.  Since L<xapian-compact(1)> is not suitable
+for merging, users can rely on this switch to repartition the
+existing Xapian database(s) to any positive value of C<N>.
+
+This is useful in case the Xapian DB was created with too few or
+too many partitions given the capabilities of the current
+hardware.
+
 =item --blocksize / --no-full / --fuller
 
 These options are passed directly to L<xapian-compact(1)> when
index 5085bff818383ee1c1ae7a0dfc794e7db69e8385..a750e09e0661b84ac94cb0e434b6424875a86c3d 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -258,3 +258,4 @@ t/view.t
 t/watch_filter_rubylang.t
 t/watch_maildir.t
 t/watch_maildir_v2.t
+t/xcpdb-reshard.t
index 5d8c35f4e750a5bcdac94a2ee7f703fed7e4776d..e1c6fe3a9c9326a14d1dde5f31d31a811884f843 100644 (file)
@@ -17,34 +17,66 @@ our @COMPACT_OPT = qw(jobs|j=i quiet|q blocksize|b=s no-full|n fuller|F);
 
 sub commit_changes ($$$) {
        my ($ibx, $tmp, $opt) = @_;
-
+       my $new_parts = $opt->{reshard};
        my $reindex = $opt->{reindex};
        my $im = $ibx->importer(0);
        $im->lock_acquire if !$opt->{-coarse_lock};
 
        $SIG{INT} or die 'BUG: $SIG{INT} not handled';
+       my @old_part;
 
        while (my ($old, $new) = each %$tmp) {
-               my @st = stat($old) or die "failed to stat($old): $!\n";
+               my @st = stat($old);
+               if (!@st && !defined($opt->{reshard})) {
+                       die "failed to stat($old): $!";
+               }
 
                my $over = "$old/over.sqlite3";
                if (-f $over) { # only for v1, v2 over is untouched
+                       defined $new or die "BUG: $over exists when culling v2";
                        $over = PublicInbox::Over->new($over);
                        my $tmp_over = "$new/over.sqlite3";
                        $over->connect->sqlite_backup_to_file($tmp_over);
                        $over = undef;
                }
-               chmod($st[2] & 07777, $new) or die "chmod $old: $!\n";
 
+               if (!defined($new)) { # culled partition
+                       push @old_part, $old;
+                       next;
+               }
+
+               if (@st) {
+                       chmod($st[2] & 07777, $new) or die "chmod $old: $!\n";
+                       rename($old, "$new/old") or
+                                       die "rename $old => $new/old: $!\n";
+               }
                # Xtmpdir->DESTROY won't remove $new after this:
-               rename($old, "$new/old") or die "rename $old => $new/old: $!\n";
                rename($new, $old) or die "rename $new => $old: $!\n";
-               my $prev = "$old/old";
-               remove_tree($prev) or die "failed to remove $prev: $!\n";
+               if (@st) {
+                       my $prev = "$old/old";
+                       remove_tree($prev) or
+                               die "failed to remove $prev: $!\n";
+               }
        }
+       remove_tree(@old_part);
        $tmp->done;
        if (!$opt->{-coarse_lock}) {
                $opt->{-skip_lock} = 1;
+
+               if ($im->can('count_partitions')) {
+                       my $pr = $opt->{-progress};
+                       my $n = $im->count_partitions;
+                       if (defined $new_parts && $n != $new_parts) {
+                               die
+"BUG: counted $n partitions after repartioning to $new_parts";
+                       }
+                       my $prev = $im->{partitions};
+                       if ($pr && $prev != $n) {
+                               $pr->("partition count changed: $prev => $n\n");
+                               $im->{partitions} = $n;
+                       }
+               }
+
                PublicInbox::Admin::index_inbox($ibx, $opt);
                # implicit lock_release
        } else {
@@ -139,32 +171,59 @@ sub run {
        my $tmp = PublicInbox::Xtmpdirs->new;
        my $v = $ibx->{version} ||= 1;
        my @q;
+       my $new_parts = $opt->{reshard};
+       if (defined $new_parts && $new_parts <= 0) {
+               die "--reshard must be a positive number\n";
+       }
 
        # we want temporary directories to be as deep as possible,
        # so v2 partitions can keep "xap$SCHEMA_VERSION" on a separate FS.
        if ($v == 1) {
+               if (defined $new_parts) {
+                       warn
+"--reshard=$new_parts ignored for v1 $ibx->{mainrepo}\n";
+               }
                my $old_parent = dirname($old);
                same_fs_or_die($old_parent, $old);
                my $v = PublicInbox::Search::SCHEMA_VERSION();
-               $tmp->{$old} = tempdir("xapian$v-XXXXXXXX", DIR => $old_parent);
-               push @q, [ $old, $tmp->{$old} ];
+               my $wip = tempdir("xapian$v-XXXXXXXX", DIR => $old_parent);
+               $tmp->{$old} = $wip;
+               push @q, [ $old, $wip ];
        } else {
                opendir my $dh, $old or die "Failed to opendir $old: $!\n";
+               my @old_parts;
                while (defined(my $dn = readdir($dh))) {
                        if ($dn =~ /\A[0-9]+\z/) {
-                               my $tmpl = "$dn-XXXXXXXX";
-                               my $dst = tempdir($tmpl, DIR => $old);
-                               same_fs_or_die($old, $dst);
-                               my $cur = "$old/$dn";
-                               push @q, [ $cur, $dst ];
-                               $tmp->{$cur} = $dst;
+                               push @old_parts, $dn;
                        } elsif ($dn eq '.' || $dn eq '..') {
                        } elsif ($dn =~ /\Aover\.sqlite3/) {
                        } else {
                                warn "W: skipping unknown dir: $old/$dn\n"
                        }
                }
-               die "No Xapian parts found in $old\n" unless @q;
+               die "No Xapian parts found in $old\n" unless @old_parts;
+
+               my ($src, $max_part);
+               if (!defined($new_parts) || $new_parts == scalar(@old_parts)) {
+                       # 1:1 copy
+                       $max_part = scalar(@old_parts) - 1;
+               } else {
+                       # M:N copy
+                       $max_part = $new_parts - 1;
+                       $src = [ map { "$old/$_" } @old_parts ];
+               }
+               foreach my $dn (0..$max_part) {
+                       my $tmpl = "$dn-XXXXXXXX";
+                       my $wip = tempdir($tmpl, DIR => $old);
+                       same_fs_or_die($old, $wip);
+                       my $cur = "$old/$dn";
+                       push @q, [ $src // $cur , $wip ];
+                       $tmp->{$cur} = $wip;
+               }
+               # mark old parts to be unlinked
+               if ($src) {
+                       $tmp->{$_} ||= undef for @$src;
+               }
        }
        my $im = $ibx->importer(0);
        my $max = $opt->{jobs} || scalar(@q);
@@ -245,12 +304,74 @@ sub compact ($$) {
        }
 }
 
+sub cpdb_loop ($$$;$$) {
+       my ($src, $dst, $pr_data, $cur_part, $new_parts) = @_;
+       my ($pr, $fmt, $nr, $pfx);
+       if ($pr_data) {
+               $pr = $pr_data->{pr};
+               $fmt = $pr_data->{fmt};
+               $nr = \($pr_data->{nr});
+               $pfx = $pr_data->{pfx};
+       }
+
+       my ($it, $end);
+       do {
+               eval {
+                       $it = $src->postlist_begin('');
+                       $end = $src->postlist_end('');
+               };
+       } while (cpdb_retryable($src, $pfx));
+
+       do {
+               eval {
+                       for (; $it != $end; $it++) {
+                               my $docid = $it->get_docid;
+                               if (defined $new_parts) {
+                                       my $dst_part = $docid % $new_parts;
+                                       next if $dst_part != $cur_part;
+                               }
+                               my $doc = $src->get_document($docid);
+                               $dst->replace_document($docid, $doc);
+                               if ($pr_data && !(++$$nr  & 1023)) {
+                                       $pr->(sprintf($fmt, $$nr));
+                               }
+                       }
+
+                       # unlike copydatabase(1), we don't copy spelling
+                       # and synonym data (or other user metadata) since
+                       # the Perl APIs don't expose iterators for them
+                       # (and public-inbox does not use those features)
+               };
+       } while (cpdb_retryable($src, $pfx));
+}
+
 # Like copydatabase(1), this is horribly slow; and it doesn't seem due
 # to the overhead of Perl.
 sub cpdb ($$) {
        my ($args, $opt) = @_;
        my ($old, $new) = @$args;
-       my $src = Search::Xapian::Database->new($old);
+       my ($src, $cur_part);
+       my $new_parts;
+       if (ref($old) eq 'ARRAY') {
+               ($cur_part) = ($new =~ m!xap[0-9]+/([0-9]+)\b!);
+               defined $cur_part or
+                       die "BUG: could not extract partition # from $new";
+               $new_parts = $opt->{reshard};
+               defined $new_parts or die 'BUG: got array src w/o --partition';
+
+               # repartitioning, M:N copy means have full read access
+               foreach (@$old) {
+                       if ($src) {
+                               my $sub = Search::Xapian::Database->new($_);
+                               $src->add_database($sub);
+                       } else {
+                               $src = Search::Xapian::Database->new($_);
+                       }
+               }
+       } else {
+               $src = Search::Xapian::Database->new($old);
+       }
+
        my ($xtmp, $tmp);
        if ($opt->{compact}) {
                my $newdir = dirname($new);
@@ -266,10 +387,9 @@ sub cpdb ($$) {
        # of other bugs:
        my $creat = Search::Xapian::DB_CREATE();
        my $dst = Search::Xapian::WritableDatabase->new($tmp, $creat);
-       my ($it, $end);
-       my ($nr, $tot, $fmt); # progress output
        my $pr = $opt->{-progress};
        my $pfx = $opt->{-progress_pfx} = progress_pfx($new);
+       my $pr_data = { pr => $pr, pfx => $pfx, nr => 0 } if $pr;
 
        do {
                eval {
@@ -284,38 +404,39 @@ sub cpdb ($$) {
                                        $dst->set_metadata('indexlevel', $l);
                                }
                        }
-
-                       $it = $src->postlist_begin('');
-                       $end = $src->postlist_end('');
-                       if ($pr) {
-                               $nr = 0;
-                               $tot = $src->get_doccount;
-                               $fmt = "$pfx % ".length($tot)."u/$tot\n";
-                               $pr->("$pfx copying $tot documents\n");
-                       }
-               };
-       } while (cpdb_retryable($src, $pfx));
-
-       do {
-               eval {
-                       while ($it != $end) {
-                               my $docid = $it->get_docid;
-                               my $doc = $src->get_document($docid);
-                               $dst->replace_document($docid, $doc);
-                               $it->inc;
-                               if ($pr && !(++$nr & 1023)) {
-                                       $pr->(sprintf($fmt, $nr));
+                       if ($pr_data) {
+                               my $tot = $src->get_doccount;
+
+                               # we can only estimate when repartitioning,
+                               # because removed spam causes slight imbalance
+                               my $est = '';
+                               if (defined $cur_part && $new_parts > 1) {
+                                       $tot = int($tot/$new_parts);
+                                       $est = 'around ';
                                }
+                               my $fmt = "$pfx % ".length($tot)."u/$tot\n";
+                               $pr->("$pfx copying $est$tot documents\n");
+                               $pr_data->{fmt} = $fmt;
+                               $pr_data->{total} = $tot;
                        }
-
-                       # unlike copydatabase(1), we don't copy spelling
-                       # and synonym data (or other user metadata) since
-                       # the Perl APIs don't expose iterators for them
-                       # (and public-inbox does not use those features)
                };
        } while (cpdb_retryable($src, $pfx));
 
-       $pr->(sprintf($fmt, $nr)) if $pr;
+       if (defined $new_parts) {
+               # we rely on document IDs matching NNTP article number,
+               # so we can't have the combined DB support rewriting
+               # document IDs.  Thus we iterate through each shard
+               # individually.
+               $src = undef;
+               foreach (@$old) {
+                       my $old = Search::Xapian::Database->new($_);
+                       cpdb_loop($old, $dst, $pr_data, $cur_part, $new_parts);
+               }
+       } else {
+               cpdb_loop($src, $dst, $pr_data);
+       }
+
+       $pr->(sprintf($pr_data->{fmt}, $pr_data->{nr})) if $pr;
        return unless $xtmp;
 
        $src = $dst = undef; # flushes and closes
@@ -360,6 +481,7 @@ sub DESTROY {
        my $owner_pid = delete $owner{"$self"} or return;
        return if $owner_pid != $$;
        foreach my $new (values %$self) {
+               defined $new or next; # may be undef if repartitioning
                remove_tree($new) unless -d "$new/old";
        }
        done($self);
index ef64e58f54675a30091f10befe1e7cd4321bd32a..b41c683bb86267d8c595b8bb6b3cbee8e98c0028 100755 (executable)
@@ -9,8 +9,8 @@ use PublicInbox::Admin;
 PublicInbox::Admin::require_or_die('-search');
 my $usage = "Usage: public-inbox-xcpdb [--compact] INBOX_DIR\n";
 my $opt = {};
-GetOptions($opt, qw(compact), @PublicInbox::Xapcmd::COMPACT_OPT) or
-       die "bad command-line args\n$usage";
+my @opt = (qw(compact reshard|R=i), @PublicInbox::Xapcmd::COMPACT_OPT);
+GetOptions($opt, @opt) or die "bad command-line args\n$usage";
 my @ibxs = PublicInbox::Admin::resolve_inboxes(\@ARGV) or die $usage;
 foreach (@ibxs) {
        my $ibx = PublicInbox::InboxWritable->new($_);
diff --git a/t/xcpdb-reshard.t b/t/xcpdb-reshard.t
new file mode 100644 (file)
index 0000000..ce552f5
--- /dev/null
@@ -0,0 +1,83 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use Test::More;
+my @mods = qw(DBI DBD::SQLite Search::Xapian);
+foreach my $mod (@mods) {
+       eval "require $mod";
+       plan skip_all => "missing $mod for $0" if $@;
+};
+require './t/common.perl';
+require_git('2.6');
+use File::Temp qw/tempdir/;
+use PublicInbox::MIME;
+use PublicInbox::InboxWritable;
+
+my $mime = PublicInbox::MIME->create(
+       header => [
+               From => 'a@example.com',
+               To => 'test@example.com',
+               Subject => 'this is a subject',
+               Date => 'Fri, 02 Oct 1993 00:00:00 +0000',
+       ],
+       body => '',
+);
+
+my ($this) = (split('/', $0))[-1];
+my $tmpdir = tempdir($this.'-XXXXXX', TMPDIR => 1, CLEANUP => 1);
+my $ibx = PublicInbox::Inbox->new({
+       mainrepo => "$tmpdir/testbox",
+       name => $this,
+       version => 2,
+       -primary_address => 'test@example.com',
+       indexlevel => 'medium',
+});
+my $path = 'blib/script';
+my @xcpdb = ("$path/public-inbox-xcpdb", '-q');
+my $nproc = 8;
+my $ndoc = 13;
+my $im = PublicInbox::InboxWritable->new($ibx, {nproc => $nproc})->importer(1);
+for my $i (1..$ndoc) {
+       $mime->header_set('Message-ID', "<m$i\@example.com>");
+       ok($im->add($mime), "message $i added");
+}
+$im->done;
+my @parts = grep(m!/\d+\z!, glob("$ibx->{mainrepo}/xap*/*"));
+is(scalar(@parts), $nproc, 'got expected parts');
+my $orig = $ibx->over->query_xover(1, $ndoc);
+my %nums = map {; "$_->{num}" => 1 } @$orig;
+
+# ensure we can go up or down in partitions, or stay the same:
+for my $R (qw(2 4 1 3 3)) {
+       delete $ibx->{search}; # release old handles
+       is(system(@xcpdb, "-R$R", $ibx->{mainrepo}), 0, "xcpdb -R$R");
+       my @new_parts = grep(m!/\d+\z!, glob("$ibx->{mainrepo}/xap*/*"));
+       is(scalar(@new_parts), $R, 'repartitioned to two parts');
+       my $msgs = $ibx->search->query('s:this');
+       is(scalar(@$msgs), $ndoc, 'got expected docs after repartitioning');
+       my %by_mid = map {; "$_->{mid}" => $_ } @$msgs;
+       ok($by_mid{"m$_\@example.com"}, "$_ exists") for (1..$ndoc);
+
+       delete $ibx->{search}; # release old handles
+
+       # ensure docids in Xapian match NNTP article numbers
+       my $tot = 0;
+       my %tmp = %nums;
+       foreach my $d (@new_parts) {
+               my $xdb = Search::Xapian::Database->new($d);
+               $tot += $xdb->get_doccount;
+               my $it = $xdb->postlist_begin('');
+               my $end = $xdb->postlist_end('');
+               for (; $it != $end; $it++) {
+                       my $docid = $it->get_docid;
+                       if ($xdb->get_document($docid)) {
+                               ok(delete($tmp{$docid}), "saw #$docid");
+                       }
+               }
+       }
+       is(scalar keys %tmp, 0, 'all docids seen');
+}
+
+done_testing();
+1;