The key change in v2 is that the inbox is no longer a bare git
repository, but a directory with two or more git repositories.
v2 divides git repositories by time "epochs" and Xapian
-databases for parallelism by "partitions".
+databases for parallelism by "shards".
=head2 INBOX OVERVIEW AND DEFINITIONS
- inbox.lock # lock file (flock) to protect global state
- git/$EPOCH.git # normal git repositories
- all.git # empty git repo, alternates to git/$EPOCH.git
-- xap$SCHEMA_VERSION/$PART # per-partition Xapian DB
+- xap$SCHEMA_VERSION/$SHARD # per-shard Xapian DB
- xap$SCHEMA_VERSION/over.sqlite3 # OVER-view DB for NNTP and threading
- msgmap.sqlite3 # same as the v1 msgmap
L<https://public-inbox.org/meta/20180209205140.GA11047@dcvr/>
-=head2 XAPIAN PARTITIONS
+=head2 XAPIAN SHARDS
A second scalability problem in v1 was the inability to
utilize multiple CPU cores for Xapian indexing. This is
-addressed by using partitions in Xapian to perform import
+addressed by using shards in Xapian to perform import
indexing in parallel.
As with git alternates, Xapian natively supports a read-only
interface which transparently abstracts away the knowledge of
-multiple partitions. This allows us to simplify our read-only
+multiple shards. This allows us to simplify our read-only
code paths.
The performance of the storage device is now the bottleneck on
=item --compact
In addition to performing the copy operation, run L<xapian-compact(1)>
-on each Xapian partition after copying but before finalizing it.
+on each Xapian shard after copying but before finalizing it.
Compared to the cost of copying a Xapian database, compacting a
Xapian database takes only around 5% of the time required to copy.
=item --reshard=N / -R N
-Repartition the Xapian database on a L<v2|public-inbox-v2-format(5)>
-inbox to C<N> partitions. Since L<xapian-compact(1)> is not suitable
-for merging, users can rely on this switch to repartition the
+Reshard the Xapian database on a L<v2|public-inbox-v2-format(5)>
+inbox to C<N> shards. Since L<xapian-compact(1)> is not suitable
+for merging, users can rely on this switch to reshard the
existing Xapian database(s) to any positive value of C<N>.
This is useful in case the Xapian DB was created with too few or
-too many partitions given the capabilities of the current
-hardware.
+too many shards given the capabilities of the current hardware.
=item --blocksize / --no-full / --fuller
lib/PublicInbox/SaPlugin/ListMirror.pm
lib/PublicInbox/Search.pm
lib/PublicInbox/SearchIdx.pm
-lib/PublicInbox/SearchIdxPart.pm
+lib/PublicInbox/SearchIdxShard.pm
lib/PublicInbox/SearchMsg.pm
lib/PublicInbox/SearchThread.pm
lib/PublicInbox/SearchView.pm
if ($jobs == 0) {
$v2w->{parallel} = 0;
} else {
- my $n = $v2w->{partitions};
+ my $n = $v2w->{shards};
if ($jobs != ($n + 1)) {
warn
-"Unable to respect --jobs=$jobs, inbox was created with $n partitions\n";
+"Unable to respect --jobs=$jobs, inbox was created with $n shards\n";
}
}
}
# $ibx->{search} is populated by $ibx->over call
my $xdir_ro = $ibx->{search}->xdir(1);
- my $npart = 0;
- foreach my $part (<$xdir_ro/*>) {
- if (-d $part && $part =~ m!/[0-9]+\z!) {
+ my $nshard = 0;
+ foreach my $shard (<$xdir_ro/*>) {
+ if (-d $shard && $shard =~ m!/[0-9]+\z!) {
my $bytes = 0;
- $bytes += -s $_ foreach glob("$part/*");
- $npart++ if $bytes;
+ $bytes += -s $_ foreach glob("$shard/*");
+ $nshard++ if $bytes;
}
}
- if ($npart) {
+ if ($nshard) {
PublicInbox::Admin::require_or_die('-search');
} else {
# somebody could "rm -r" all the Xapian directories;
bless $opts, $class;
}
-sub git_part {
- my ($self, $part) = @_;
+sub git_epoch {
+ my ($self, $epoch) = @_;
($self->{version} || 1) == 2 or return;
- $self->{"$part.git"} ||= eval {
- my $git_dir = "$self->{mainrepo}/git/$part.git";
+ $self->{"$epoch.git"} ||= eval {
+ my $git_dir = "$self->{mainrepo}/git/$epoch.git";
my $g = PublicInbox::Git->new($git_dir);
$g->{-httpbackend_limiter} = $self->{-httpbackend_limiter};
# no cleanup needed, we never cat-file off this, only clone
};
}
-sub max_git_part {
+sub max_git_epoch {
my ($self) = @_;
my $v = $self->{version};
return unless defined($v) && $v == 2;
- my $part = $self->{-max_git_part};
+ my $cur = $self->{-max_git_epoch};
my $changed = git($self)->alternates_changed;
- if (!defined($part) || $changed) {
+ if (!defined($cur) || $changed) {
$self->git->cleanup if $changed;
my $gits = "$self->{mainrepo}/git";
if (opendir my $dh, $gits) {
$git_dir =~ m!\A([0-9]+)\.git\z! or next;
$max = $1 if $1 > $max;
}
- $part = $self->{-max_git_part} = $max if $max >= 0;
+ $cur = $self->{-max_git_epoch} = $max if $max >= 0;
} else {
warn "opendir $gits failed: $!\n";
}
}
- $part;
+ $cur;
}
sub mm {
}
sub init_inbox {
- my ($self, $partitions, $skip_epoch, $skip_artnum) = @_;
+ my ($self, $shards, $skip_epoch, $skip_artnum) = @_;
# TODO: honor skip_artnum
my $v = $self->{version} || 1;
if ($v == 1) {
PublicInbox::Import::init_bare($dir);
} else {
my $v2w = importer($self);
- $v2w->init_inbox($partitions, $skip_epoch, $skip_artnum);
+ $v2w->init_inbox($shards, $skip_epoch, $skip_artnum);
}
}
my $dir = "$self->{mainrepo}/xap" . SCHEMA_VERSION;
return $dir if $rdonly;
- my $part = $self->{partition};
- defined $part or die "partition not given";
- $dir .= "/$part";
+ my $shard = $self->{shard};
+ defined $shard or die "shard not given";
+ $dir .= "/$shard";
}
}
my ($xdb, $slow_phrase);
my $qpf = \($self->{qp_flags} ||= $QP_FLAGS);
if ($self->{version} >= 2) {
- foreach my $part (<$dir/*>) {
- -d $part && $part =~ m!/[0-9]+\z! or next;
- my $sub = Search::Xapian::Database->new($part);
+ foreach my $shard (<$dir/*>) {
+ -d $shard && $shard =~ m!/[0-9]+\z! or next;
+ my $sub = Search::Xapian::Database->new($shard);
if ($xdb) {
$xdb->add_database($sub);
} else {
$xdb = $sub;
}
- $slow_phrase ||= -f "$part/iamchert";
+ $slow_phrase ||= -f "$shard/iamchert";
}
} else {
$slow_phrase = -f "$dir/iamchert";
my $xapianlevels = qr/\A(?:full|medium)\z/;
sub new {
- my ($class, $ibx, $creat, $part) = @_;
+ my ($class, $ibx, $creat, $shard) = @_;
ref $ibx or die "BUG: expected PublicInbox::Inbox object: $ibx";
my $levels = qr/\A(?:full|medium|basic)\z/;
my $mainrepo = $ibx->{mainrepo};
my $dir = $self->xdir;
$self->{over} = PublicInbox::OverIdx->new("$dir/over.sqlite3");
} elsif ($version == 2) {
- defined $part or die "partition is required for v2\n";
- # partition is a number
- $self->{partition} = $part;
+ defined $shard or die "shard is required for v2\n";
+ # shard is a number
+ $self->{shard} = $shard;
$self->{lock_path} = undef;
} else {
die "unsupported inbox version=$version\n";
$self->lock_acquire;
# don't create empty Xapian directories if we don't need Xapian
- my $is_part = defined($self->{partition});
- if (!$is_part || ($is_part && need_xapian($self))) {
+ my $is_shard = defined($self->{shard});
+ if (!$is_shard || ($is_shard && need_xapian($self))) {
File::Path::mkpath($dir);
}
}
sub remote_remove {
my ($self, $oid, $mid) = @_;
if (my $w = $self->{w}) {
- # triggers remove_by_oid in a partition
+ # triggers remove_by_oid in a shard
print $w "D $oid $mid\n" or die "failed to write remove $!";
} else {
$self->begin_txn_lazy;
$self->{-inbox}->with_umask(sub {
if (my $xdb = $self->{xdb}) {
- # store 'indexlevel=medium' in v2 part=0 and v1 (only part)
+ # store 'indexlevel=medium' in v2 shard=0 and
+ # v1 (only one shard)
# This metadata is read by Admin::detect_indexlevel:
- if (!$self->{partition} # undef or 0, not >0
+ if (!$self->{shard} # undef or 0, not >0
&& $self->{indexlevel} eq 'medium') {
$xdb->set_metadata('indexlevel', 'medium');
}
# Copyright (C) 2018 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-# used to interface with a single Xapian partition in V2 repos.
-# See L<public-inbox-v2-format(5)> for more info on how we partition Xapian
-package PublicInbox::SearchIdxPart;
+# used to interface with a single Xapian shard in V2 repos.
+# See L<public-inbox-v2-format(5)> for more info on how we shard Xapian
+package PublicInbox::SearchIdxShard;
use strict;
use warnings;
use base qw(PublicInbox::SearchIdx);
sub new {
- my ($class, $v2writable, $part) = @_;
- my $self = $class->SUPER::new($v2writable->{-inbox}, 1, $part);
+ my ($class, $v2writable, $shard) = @_;
+ my $self = $class->SUPER::new($v2writable->{-inbox}, 1, $shard);
# create the DB before forking:
$self->_xdb_acquire;
$self->_xdb_release;
- $self->spawn_worker($v2writable, $part) if $v2writable->{parallel};
+ $self->spawn_worker($v2writable, $shard) if $v2writable->{parallel};
$self;
}
sub spawn_worker {
- my ($self, $v2writable, $part) = @_;
+ my ($self, $v2writable, $shard) = @_;
my ($r, $w);
pipe($r, $w) or die "pipe failed: $!\n";
binmode $r, ':raw';
# speeds V2Writable batch imports across 8 cores by nearly 20%
fcntl($r, 1031, 1048576) if $^O eq 'linux';
- eval { partition_worker_loop($self, $r, $part, $bnote) };
- die "worker $part died: $@\n" if $@;
+ eval { shard_worker_loop($self, $r, $shard, $bnote) };
+ die "worker $shard died: $@\n" if $@;
die "unexpected MM $self->{mm}" if $self->{mm};
exit;
}
close $r or die "failed to close: $!";
}
-sub partition_worker_loop ($$$$) {
- my ($self, $r, $part, $bnote) = @_;
- $0 = "pi-v2-partition[$part]";
+sub shard_worker_loop ($$$$) {
+ my ($self, $r, $shard, $bnote) = @_;
+ $0 = "pi-v2-shard[$shard]";
my $current_info = '';
my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ };
local $SIG{__WARN__} = sub {
chomp $current_info;
- $warn_cb->("[$part] $current_info: ", @_);
+ $warn_cb->("[$shard] $current_info: ", @_);
};
$self->begin_txn_lazy;
while (my $line = $r->getline) {
} elsif ($line eq "barrier\n") {
$self->commit_txn_lazy;
# no need to lock < 512 bytes is atomic under POSIX
- print $bnote "barrier $part\n" or
+ print $bnote "barrier $shard\n" or
die "write failed for barrier $!\n";
} elsif ($line =~ /\AD ([a-f0-9]{40,}) (.+)\n\z/s) {
my ($oid, $mid) = ($1, $2);
my ($self, $bytes, $msgref, $artnum, $oid, $mid0, $mime) = @_;
if (my $w = $self->{w}) {
print $w "$bytes $artnum $oid $mid0\n", $$msgref or die
- "failed to write partition $!\n";
+ "failed to write shard $!\n";
$w->flush or die "failed to flush: $!\n";
} else {
$$msgref = undef;
use strict;
use warnings;
use base qw(PublicInbox::Lock);
-use PublicInbox::SearchIdxPart;
+use PublicInbox::SearchIdxShard;
use PublicInbox::MIME;
use PublicInbox::Git;
use PublicInbox::Import;
my $PACKING_FACTOR = 0.4;
# SATA storage lags behind what CPUs are capable of, so relying on
-# nproc(1) can be misleading and having extra Xapian partions is a
+# nproc(1) can be misleading and having extra Xapian shards is a
# waste of FDs and space. It can also lead to excessive IO latency
# and slow things down. Users on NVME or other fast storage can
# use the NPROC env or switches in our script/public-inbox-* programs
-# to increase Xapian partitions.
+# to increase Xapian shards.
our $NPROC_MAX_DEFAULT = 4;
-sub nproc_parts ($) {
+sub nproc_shards ($) {
my ($creat_opt) = @_;
if (ref($creat_opt) eq 'HASH') {
if (defined(my $n = $creat_opt->{nproc})) {
$n < 1 ? 1 : $n;
}
-sub count_partitions ($) {
+sub count_shards ($) {
my ($self) = @_;
- my $nparts = 0;
+ my $n = 0;
my $xpfx = $self->{xpfx};
- # always load existing partitions in case core count changes:
- # Also, partition count may change while -watch is running
- # due to -compact
+ # always load existing shards in case core count changes:
+ # Also, shard count may change while -watch is running
+ # due to "xcpdb --reshard"
if (-d $xpfx) {
- foreach my $part (<$xpfx/*>) {
- -d $part && $part =~ m!/[0-9]+\z! or next;
+ foreach my $shard (<$xpfx/*>) {
+ -d $shard && $shard =~ m!/[0-9]+\z! or next;
eval {
- Search::Xapian::Database->new($part)->close;
- $nparts++;
+ Search::Xapian::Database->new($shard)->close;
+ $n++;
};
}
}
- $nparts;
+ $n;
}
sub new {
rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
last_commit => [], # git repo -> commit
};
- $self->{partitions} = count_partitions($self) || nproc_parts($creat);
+ $self->{shards} = count_shards($self) || nproc_shards($creat);
bless $self, $class;
}
sub do_idx ($$$$$$$) {
my ($self, $msgref, $mime, $len, $num, $oid, $mid0) = @_;
$self->{over}->add_overview($mime, $len, $num, $oid, $mid0);
- my $npart = $self->{partitions};
- my $part = $num % $npart;
- my $idx = idx_part($self, $part);
+ my $idx = idx_shard($self, $num % $self->{shards});
$idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime);
my $n = $self->{transact_bytes} += $len;
- $n >= (PublicInbox::SearchIdx::BATCH_BYTES * $npart);
+ $n >= (PublicInbox::SearchIdx::BATCH_BYTES * $self->{shards});
}
sub _add {
$num;
}
-sub idx_part {
- my ($self, $part) = @_;
- $self->{idx_parts}->[$part];
+sub idx_shard {
+ my ($self, $shard_i) = @_;
+ $self->{idx_shards}->[$shard_i];
}
# idempotent
sub idx_init {
my ($self, $opt) = @_;
- return if $self->{idx_parts};
+ return if $self->{idx_shards};
my $ibx = $self->{-inbox};
# do not leak read-only FDs to child processes, we only have these
$self->lock_acquire unless ($opt && $opt->{-skip_lock});
$over->create;
- # -compact can change partition count while -watch is idle
- my $nparts = count_partitions($self);
- if ($nparts && $nparts != $self->{partitions}) {
- $self->{partitions} = $nparts;
+ # xcpdb can change shard count while -watch is idle
+ my $nshards = count_shards($self);
+ if ($nshards && $nshards != $self->{shards}) {
+ $self->{shards} = $nshards;
}
- # need to create all parts before initializing msgmap FD
- my $max = $self->{partitions} - 1;
+ # need to create all shards before initializing msgmap FD
+ my $max = $self->{shards} - 1;
- # idx_parts must be visible to all forked processes
- my $idx = $self->{idx_parts} = [];
+ # idx_shards must be visible to all forked processes
+ my $idx = $self->{idx_shards} = [];
for my $i (0..$max) {
- push @$idx, PublicInbox::SearchIdxPart->new($self, $i);
+ push @$idx, PublicInbox::SearchIdxShard->new($self, $i);
}
# Now that all subprocesses are up, we can open the FDs
}
my $over = $self->{over};
my $cids = content_ids($old_mime);
- my $parts = $self->{idx_parts};
my $removed;
my $mids = mids($old_mime->header_obj);
$rewritten->{rewrites};
}
-sub last_commit_part ($$;$) {
+sub last_epoch_commit ($$;$) {
my ($self, $i, $cmt) = @_;
my $v = PublicInbox::Search::SCHEMA_VERSION();
$self->{mm}->last_commit_xap($v, $i, $cmt);
foreach my $i (0..$epoch_max) {
defined(my $cmt = $last_commit->[$i]) or next;
$last_commit->[$i] = undef;
- last_commit_part($self, $i, $cmt);
+ last_epoch_commit($self, $i, $cmt);
}
}
while (scalar keys %$barrier) {
defined(my $l = $r->getline) or die "EOF on barrier_wait: $!";
$l =~ /\Abarrier (\d+)/ or die "bad line on barrier_wait: $l";
- delete $barrier->{$1} or die "bad part[$1] on barrier wait";
+ delete $barrier->{$1} or die "bad shard[$1] on barrier wait";
}
}
$im->checkpoint;
}
}
- my $parts = $self->{idx_parts};
- if ($parts) {
+ my $shards = $self->{idx_shards};
+ if ($shards) {
my $dbh = $self->{mm}->{dbh};
# SQLite msgmap data is second in importance
# Now deal with Xapian
if ($wait) {
- my $barrier = $self->barrier_init(scalar @$parts);
+ my $barrier = $self->barrier_init(scalar @$shards);
- # each partition needs to issue a barrier command
- $_->remote_barrier for @$parts;
+ # each shard needs to issue a barrier command
+ $_->remote_barrier for @$shards;
- # wait for each Xapian partition
+ # wait for each Xapian shard
$self->barrier_wait($barrier);
} else {
- $_->remote_commit for @$parts;
+ $_->remote_commit for @$shards;
}
# last_commit is special, don't commit these until
- # remote partitions are done:
+ # remote shards are done:
$dbh->begin_work;
set_last_commits($self);
$dbh->commit;
checkpoint($self);
my $mm = delete $self->{mm};
$mm->{dbh}->commit if $mm;
- my $parts = delete $self->{idx_parts};
- if ($parts) {
- $_->remote_close for @$parts;
+ my $shards = delete $self->{idx_shards};
+ if ($shards) {
+ $_->remote_close for @$shards;
}
$self->{over}->disconnect;
delete $self->{bnote};
$self->{transact_bytes} = 0;
- $self->lock_release if $parts;
+ $self->lock_release if $shards;
$self->{-inbox}->git->cleanup;
}
my ($self) = @_;
my $fh = delete $self->{reindex_pipe};
close $fh if $fh;
- if (my $parts = $self->{idx_parts}) {
- $_->atfork_child foreach @$parts;
+ if (my $shards = $self->{idx_shards}) {
+ $_->atfork_child foreach @$shards;
}
if (my $im = $self->{im}) {
$im->atfork_child;
# only update last_commit for $i on reindex iff newer than current
sub update_last_commit ($$$$) {
my ($self, $git, $i, $cmt) = @_;
- my $last = last_commit_part($self, $i);
+ my $last = last_epoch_commit($self, $i);
if (defined $last && is_ancestor($git, $last, $cmt)) {
my @cmd = (qw(rev-list --count), "$last..$cmt");
chomp(my $n = $git->qx(@cmd));
return if $n ne '' && $n == 0;
}
- last_commit_part($self, $i, $cmt);
+ last_epoch_commit($self, $i, $cmt);
}
sub git_dir_n ($$) { "$_[0]->{-inbox}->{mainrepo}/git/$_[1].git" }
my ($self, $epoch_max) = @_;
my $heads = [];
for (my $i = $epoch_max; $i >= 0; $i--) {
- $heads->[$i] = last_commit_part($self, $i);
+ $heads->[$i] = last_epoch_commit($self, $i);
}
$heads;
}
for (my $i = $epoch_max; $i >= 0; $i--) {
die 'BUG: already indexing!' if $self->{reindex_pipe};
my $git_dir = git_dir_n($self, $i);
- -d $git_dir or next; # missing parts are fine
+ -d $git_dir or next; # missing epochs are fine
my $git = PublicInbox::Git->new($git_dir);
if ($reindex_heads) {
$head = $reindex_heads->[$i] or next;
sub unindex_oid_remote ($$$) {
my ($self, $oid, $mid) = @_;
- $_->remote_remove($oid, $mid) foreach @{$self->{idx_parts}};
+ $_->remote_remove($oid, $mid) foreach @{$self->{idx_shards}};
$self->{over}->remove_oid($oid, $mid);
}
my $git_dir = git_dir_n($self, $i);
die 'BUG: already reindexing!' if $self->{reindex_pipe};
- -d $git_dir or return; # missing parts are fine
+ -d $git_dir or return; # missing epochs are fine
fill_alternates($self, $i);
my $git = PublicInbox::Git->new($git_dir);
if (my $unindex_range = delete $sync->{unindex_range}->{$i}) {
if ($method eq 'POST') {
if ($path_info =~ m!$INBOX_RE/(?:(?:git/)?([0-9]+)(?:\.git)?/)?
(git-upload-pack)\z!x) {
- my ($part, $path) = ($2, $3);
+ my ($epoch, $path) = ($2, $3);
return invalid_inbox($ctx, $1) ||
- serve_git($ctx, $part, $path);
+ serve_git($ctx, $epoch, $path);
} elsif ($path_info =~ m!$INBOX_RE/!o) {
return invalid_inbox($ctx, $1) || mbox_results($ctx);
}
invalid_inbox($ctx, $1) || get_new($ctx);
} elsif ($path_info =~ m!$INBOX_RE/(?:(?:git/)?([0-9]+)(?:\.git)?/)?
($PublicInbox::GitHTTPBackend::ANY)\z!ox) {
- my ($part, $path) = ($2, $3);
- invalid_inbox($ctx, $1) || serve_git($ctx, $part, $path);
+ my ($epoch, $path) = ($2, $3);
+ invalid_inbox($ctx, $1) || serve_git($ctx, $epoch, $path);
} elsif ($path_info =~ m!$INBOX_RE/([a-zA-Z0-9_\-]+).mbox\.gz\z!o) {
serve_mbox_range($ctx, $1, $2);
} elsif ($path_info =~ m!$INBOX_RE/$MID_RE/$END_RE\z!o) {
}
sub serve_git {
- my ($ctx, $part, $path) = @_;
+ my ($ctx, $epoch, $path) = @_;
my $env = $ctx->{env};
my $ibx = $ctx->{-inbox};
- my $git = defined $part ? $ibx->git_part($part) : $ibx->git;
+ my $git = defined $epoch ? $ibx->git_epoch($epoch) : $ibx->git;
$git ? PublicInbox::GitHTTPBackend::serve($env, $git, $path) : r404();
}
my $manifest = { -abs2urlpath => {}, -mtime => 0 };
for my $ibx (@$list) {
- if (defined(my $max = $ibx->max_git_part)) {
+ if (defined(my $max = $ibx->max_git_epoch)) {
for my $epoch (0..$max) {
manifest_add($manifest, $ibx, $epoch);
}
my (%seen, @urls);
my $http = $ibx->base_url($ctx->{env});
chop $http; # no trailing slash for clone
- my $part = $ibx->max_git_part;
+ my $max = $ibx->max_git_epoch;
my $dir = (split(m!/!, $http))[-1];
- if (defined($part)) { # v2
+ if (defined($max)) { # v2
$seen{$http} = 1;
- for my $i (0..$part) {
+ for my $i (0..$max) {
# old epochs may be deleted:
-d "$ibx->{mainrepo}/git/$i.git" or next;
my $url = "$http/$i";
push @urls, $http;
}
- # FIXME: partitioning in can be different in other repositories,
+ # FIXME: epoch splits can be different in other repositories,
# use the "cloneurl" file as-is for now:
foreach my $u (@{$ibx->cloneurl}) {
next if $seen{$u};
push @urls, $u =~ /\Ahttps?:/ ? qq(<a\nhref="$u">$u</a>) : $u;
}
- if (defined($part) || scalar(@urls) > 1) {
+ if (defined($max) || scalar(@urls) > 1) {
$urls .= "\n" .
join("\n", map { "\tgit clone --mirror $_" } @urls);
} else {
$urls .= " git clone --mirror $urls[0]";
}
- if (defined $part) {
+ if (defined $max) {
my $addrs = $ibx->{address};
$addrs = join(' ', @$addrs) if ref($addrs) eq 'ARRAY';
$urls .= <<EOF
sub commit_changes ($$$) {
my ($ibx, $tmp, $opt) = @_;
- my $new_parts = $opt->{reshard};
+ my $reshard = $opt->{reshard};
my $reindex = $opt->{reindex};
my $im = $ibx->importer(0);
$im->lock_acquire if !$opt->{-coarse_lock};
$SIG{INT} or die 'BUG: $SIG{INT} not handled';
- my @old_part;
+ my @old_shard;
while (my ($old, $new) = each %$tmp) {
my @st = stat($old);
$over = undef;
}
- if (!defined($new)) { # culled partition
- push @old_part, $old;
+ if (!defined($new)) { # culled shard
+ push @old_shard, $old;
next;
}
die "failed to remove $prev: $!\n";
}
}
- remove_tree(@old_part);
+ remove_tree(@old_shard);
$tmp->done;
if (!$opt->{-coarse_lock}) {
$opt->{-skip_lock} = 1;
- if ($im->can('count_partitions')) {
+ if ($im->can('count_shards')) {
my $pr = $opt->{-progress};
- my $n = $im->count_partitions;
- if (defined $new_parts && $n != $new_parts) {
+ my $n = $im->count_shards;
+ if (defined $reshard && $n != $reshard) {
die
-"BUG: counted $n partitions after repartioning to $new_parts";
+"BUG: counted $n shards after resharding to $reshard";
}
- my $prev = $im->{partitions};
+ my $prev = $im->{shards};
if ($pr && $prev != $n) {
- $pr->("partition count changed: $prev => $n\n");
- $im->{partitions} = $n;
+ $pr->("shard count changed: $prev => $n\n");
+ $im->{shards} = $n;
}
}
my $tmp = PublicInbox::Xtmpdirs->new;
my $v = $ibx->{version} ||= 1;
my @q;
- my $new_parts = $opt->{reshard};
- if (defined $new_parts && $new_parts <= 0) {
+ my $reshard = $opt->{reshard};
+ if (defined $reshard && $reshard <= 0) {
die "--reshard must be a positive number\n";
}
# we want temporary directories to be as deep as possible,
- # so v2 partitions can keep "xap$SCHEMA_VERSION" on a separate FS.
+ # so v2 shards can keep "xap$SCHEMA_VERSION" on a separate FS.
if ($v == 1) {
- if (defined $new_parts) {
+ if (defined $reshard) {
warn
-"--reshard=$new_parts ignored for v1 $ibx->{mainrepo}\n";
+"--reshard=$reshard ignored for v1 $ibx->{mainrepo}\n";
}
my $old_parent = dirname($old);
same_fs_or_die($old_parent, $old);
push @q, [ $old, $wip ];
} else {
opendir my $dh, $old or die "Failed to opendir $old: $!\n";
- my @old_parts;
+ my @old_shards;
while (defined(my $dn = readdir($dh))) {
if ($dn =~ /\A[0-9]+\z/) {
- push @old_parts, $dn;
+ push @old_shards, $dn;
} elsif ($dn eq '.' || $dn eq '..') {
} elsif ($dn =~ /\Aover\.sqlite3/) {
} else {
warn "W: skipping unknown dir: $old/$dn\n"
}
}
- die "No Xapian parts found in $old\n" unless @old_parts;
+ die "No Xapian shards found in $old\n" unless @old_shards;
- my ($src, $max_part);
- if (!defined($new_parts) || $new_parts == scalar(@old_parts)) {
+ my ($src, $max_shard);
+ if (!defined($reshard) || $reshard == scalar(@old_shards)) {
# 1:1 copy
- $max_part = scalar(@old_parts) - 1;
+ $max_shard = scalar(@old_shards) - 1;
} else {
# M:N copy
- $max_part = $new_parts - 1;
- $src = [ map { "$old/$_" } @old_parts ];
+ $max_shard = $reshard - 1;
+ $src = [ map { "$old/$_" } @old_shards ];
}
- foreach my $dn (0..$max_part) {
+ foreach my $dn (0..$max_shard) {
my $tmpl = "$dn-XXXXXXXX";
my $wip = tempdir($tmpl, DIR => $old);
same_fs_or_die($old, $wip);
push @q, [ $src // $cur , $wip ];
$tmp->{$cur} = $wip;
}
- # mark old parts to be unlinked
+ # mark old shards to be unlinked
if ($src) {
$tmp->{$_} ||= undef for @$src;
}
}
sub cpdb_loop ($$$;$$) {
- my ($src, $dst, $pr_data, $cur_part, $new_parts) = @_;
+ my ($src, $dst, $pr_data, $cur_shard, $reshard) = @_;
my ($pr, $fmt, $nr, $pfx);
if ($pr_data) {
$pr = $pr_data->{pr};
eval {
for (; $it != $end; $it++) {
my $docid = $it->get_docid;
- if (defined $new_parts) {
- my $dst_part = $docid % $new_parts;
- next if $dst_part != $cur_part;
+ if (defined $reshard) {
+ my $dst_shard = $docid % $reshard;
+ next if $dst_shard != $cur_shard;
}
my $doc = $src->get_document($docid);
$dst->replace_document($docid, $doc);
sub cpdb ($$) {
my ($args, $opt) = @_;
my ($old, $new) = @$args;
- my ($src, $cur_part);
- my $new_parts;
+ my ($src, $cur_shard);
+ my $reshard;
if (ref($old) eq 'ARRAY') {
- ($cur_part) = ($new =~ m!xap[0-9]+/([0-9]+)\b!);
- defined $cur_part or
- die "BUG: could not extract partition # from $new";
- $new_parts = $opt->{reshard};
- defined $new_parts or die 'BUG: got array src w/o --partition';
+ ($cur_shard) = ($new =~ m!xap[0-9]+/([0-9]+)\b!);
+ defined $cur_shard or
+ die "BUG: could not extract shard # from $new";
+ $reshard = $opt->{reshard};
+ defined $reshard or die 'BUG: got array src w/o --reshard';
- # repartitioning, M:N copy means have full read access
+ # resharding, M:N copy means have full read access
foreach (@$old) {
if ($src) {
my $sub = Search::Xapian::Database->new($_);
my $lc = $src->get_metadata('last_commit');
$dst->set_metadata('last_commit', $lc) if $lc;
- # only the first xapian partition (0) gets 'indexlevel'
+ # only the first xapian shard (0) gets 'indexlevel'
if ($new =~ m!(?:xapian[0-9]+|xap[0-9]+/0)\b!) {
my $l = $src->get_metadata('indexlevel');
if ($l eq 'medium') {
if ($pr_data) {
my $tot = $src->get_doccount;
- # we can only estimate when repartitioning,
+ # we can only estimate when resharding,
# because removed spam causes slight imbalance
my $est = '';
- if (defined $cur_part && $new_parts > 1) {
- $tot = int($tot/$new_parts);
+ if (defined $cur_shard && $reshard > 1) {
+ $tot = int($tot/$reshard);
$est = 'around ';
}
my $fmt = "$pfx % ".length($tot)."u/$tot\n";
};
} while (cpdb_retryable($src, $pfx));
- if (defined $new_parts) {
+ if (defined $reshard) {
# we rely on document IDs matching NNTP article number,
- # so we can't have the combined DB support rewriting
+ # so we can't have the sharded Xapian DB support rewriting
# document IDs. Thus we iterate through each shard
# individually.
$src = undef;
foreach (@$old) {
my $old = Search::Xapian::Database->new($_);
- cpdb_loop($old, $dst, $pr_data, $cur_part, $new_parts);
+ cpdb_loop($old, $dst, $pr_data, $cur_shard, $reshard);
}
} else {
cpdb_loop($src, $dst, $pr_data);
# http://www.tldp.org/LDP/abs/html/exitcodes.html
$SIG{INT} = sub { exit(130) };
$SIG{HUP} = $SIG{PIPE} = $SIG{TERM} = sub { exit(1) };
- my $self = bless {}, $_[0]; # old partition => new (tmp) partition
+ my $self = bless {}, $_[0]; # old shard => new (WIP) shard
$owner{"$self"} = $$;
$self;
}
my $owner_pid = delete $owner{"$self"} or return;
return if $owner_pid != $$;
foreach my $new (values %$self) {
- defined $new or next; # may be undef if repartitioning
+ defined $new or next; # may be undef if resharding
remove_tree($new) unless -d "$new/old";
}
done($self);
if ($v == 2 && $level eq 'basic') {
is_deeply([glob("$ibx->{mainrepo}/xap*/?/")], [],
- 'no Xapian partition directories for v2 basic');
+ 'no Xapian shard directories for v2 basic');
}
if ($level ne 'basic') {
($nr, $msgs) = $ro_mirror->search->reopen->query('m:m@2');
$res = $cb->(GET('/v2test/0.git/info/refs'));
is($res->code, 200, 'got info refs for dumb clones w/ .git suffix');
$res = $cb->(GET('/v2test/info/refs'));
- is($res->code, 404, 'unpartitioned git URL fails');
+ is($res->code, 404, 'v2 git URL w/o shard fails');
# ensure conflicted attachments can be resolved
foreach my $body (qw(old new)) {
);
my $im = PublicInbox::V2Writable->new($ibx, {nproc => 1});
-is($im->{partitions}, 1, 'one partition when forced');
+is($im->{shards}, 1, 'one shard when forced');
ok($im->add($mime), 'ordinary message added');
foreach my $f ("$mainrepo/msgmap.sqlite3",
glob("$mainrepo/xap*/*"),
my @before = $git0->qx(@log, qw(--pretty=oneline));
my $before = $git0->qx(@log, qw(--pretty=raw --raw -r));
$im = PublicInbox::V2Writable->new($ibx, {nproc => 2});
- is($im->{partitions}, 1, 'detected single partition from previous');
+ is($im->{shards}, 1, 'detected single shard from previous');
my $smsg = $im->remove($mime, 'test removal');
$im->done;
my @after = $git0->qx(@log, qw(--pretty=oneline));
base_url => sub { 'http://example.com/' },
cloneurl => sub {[]},
nntp_url => sub {[]},
- max_git_part => sub { undef },
+ max_git_epoch => sub { undef },
description => sub { '' }),
www => Plack::Util::inline_object(style => sub { '' }),
};
ok($im->add($mime), "message $i added");
}
$im->done;
-my @parts = grep(m!/\d+\z!, glob("$ibx->{mainrepo}/xap*/*"));
-is(scalar(@parts), $nproc, 'got expected parts');
+my @shards = grep(m!/\d+\z!, glob("$ibx->{mainrepo}/xap*/*"));
+is(scalar(@shards), $nproc, 'got expected shards');
my $orig = $ibx->over->query_xover(1, $ndoc);
my %nums = map {; "$_->{num}" => 1 } @$orig;
-# ensure we can go up or down in partitions, or stay the same:
+# ensure we can go up or down in shards, or stay the same:
for my $R (qw(2 4 1 3 3)) {
delete $ibx->{search}; # release old handles
is(system(@xcpdb, "-R$R", $ibx->{mainrepo}), 0, "xcpdb -R$R");
- my @new_parts = grep(m!/\d+\z!, glob("$ibx->{mainrepo}/xap*/*"));
- is(scalar(@new_parts), $R, 'repartitioned to two parts');
+ my @new_shards = grep(m!/\d+\z!, glob("$ibx->{mainrepo}/xap*/*"));
+ is(scalar(@new_shards), $R, "resharded to $R shards");
my $msgs = $ibx->search->query('s:this');
- is(scalar(@$msgs), $ndoc, 'got expected docs after repartitioning');
+ is(scalar(@$msgs), $ndoc, 'got expected docs after resharding');
my %by_mid = map {; "$_->{mid}" => $_ } @$msgs;
ok($by_mid{"m$_\@example.com"}, "$_ exists") for (1..$ndoc);
# ensure docids in Xapian match NNTP article numbers
my $tot = 0;
my %tmp = %nums;
- foreach my $d (@new_parts) {
+ foreach my $d (@new_shards) {
my $xdb = Search::Xapian::Database->new($d);
$tot += $xdb->get_doccount;
my $it = $xdb->postlist_begin('');