Instead of relying on the git object_id hash to partition,
round-robin to these partitions based on the NNTP article
number. This reduces contention on the partition pipes when
two (or more) sequential messages would otherwise hash to
the same partition.
# an estimate of the post-packed size to the raw uncompressed size
my $PACKING_FACTOR = 0.4;
# an estimate of the post-packed size to the raw uncompressed size
my $PACKING_FACTOR = 0.4;
+# assume 2 cores if GNU nproc(1) is not available
+my $NPROC = int($ENV{NPROC} || `nproc 2>/dev/null` || 2);
+
sub new {
my ($class, $v2ibx, $creat) = @_;
my $dir = $v2ibx->{mainrepo} or die "no mainrepo in inbox\n";
sub new {
my ($class, $v2ibx, $creat) = @_;
my $dir = $v2ibx->{mainrepo} or die "no mainrepo in inbox\n";
im => undef, # PublicInbox::Import
xap_rw => undef, # PublicInbox::V2SearchIdx
xap_ro => undef,
im => undef, # PublicInbox::Import
xap_rw => undef, # PublicInbox::V2SearchIdx
xap_ro => undef,
transact_bytes => 0,
# limit each repo to 1GB or so
rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
transact_bytes => 0,
# limit each repo to 1GB or so
rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
my $oid = $im->{last_object_id};
my ($len, $msgref) = @{$im->{last_object}};
my $oid = $im->{last_object_id};
my ($len, $msgref) = @{$im->{last_object}};
+ $self->idx_init;
+ my $num = $self->{all}->index_mm($mime);
my $nparts = $self->{partitions};
my $nparts = $self->{partitions};
- my $part = hex(substr($oid, 0, 8)) % $nparts;
+ my $part = $num % $nparts;
my $idx = $self->idx_part($part);
my $idx = $self->idx_part($part);
- my $all = $self->{all};
- my $num = $all->index_mm($mime);
$idx->index_raw($len, $msgref, $num, $oid);
my $n = $self->{transact_bytes} += $len;
if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
$idx->index_raw($len, $msgref, $num, $oid);
my $n = $self->{transact_bytes} += $len;
if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
sub idx_part {
my ($self, $part) = @_;
sub idx_part {
my ($self, $part) = @_;
- my $idx = $self->{idx_parts};
- return $idx->[$part] if $idx; # fast path
+ $self->{idx_parts}->[$part];
+}
+sub idx_init {
+ my ($self) = @_;
+ return if $self->{idx_parts};
# first time initialization:
# first time initialization:
- my $all = $self->{all} =
+ my $all = $self->{all} =
PublicInbox::SearchIdxThread->new($self->{-inbox});
# need to create all parts before initializing msgmap FD
my $max = $self->{partitions} - 1;
PublicInbox::SearchIdxThread->new($self->{-inbox});
# need to create all parts before initializing msgmap FD
my $max = $self->{partitions} - 1;
- $idx = $self->{idx_parts} = [];
+ my $idx = $self->{idx_parts} = [];
for my $i (0..$max) {
push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $all);
}
$all->_msgmap_init->{dbh}->begin_work;
for my $i (0..$max) {
push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $all);
}
$all->_msgmap_init->{dbh}->begin_work;
# order matters, we can only close {all} after all partitions
# are done because the partitions also write to {all}
# order matters, we can only close {all} after all partitions
# are done because the partitions also write to {all}
- my $parts = $self->{idx_parts};
- foreach my $idx (@$parts) {
- $idx->remote_commit;
- $idx->remote_close unless $more;
+ if (my $parts = $self->{idx_parts}) {
+ foreach my $idx (@$parts) {
+ $idx->remote_commit;
+ $idx->remote_close unless $more;
+ }
+ delete $self->{idx_parts} unless $more;
}
if (my $all = $self->{all}) {
}
if (my $all = $self->{all}) {
}
$all->remote_commit;
$all->remote_close unless $more;
}
$all->remote_commit;
$all->remote_close unless $more;
+ delete $self->{all} unless $more;
}
$self->{transact_bytes} = 0;
}
}
$self->{transact_bytes} = 0;
}