use strict;
use warnings;
use Fcntl qw(:flock :DEFAULT);
-use PublicInbox::SearchIdx;
+use PublicInbox::SearchIdxPart;
+use PublicInbox::SearchIdxThread;
use PublicInbox::MIME;
use PublicInbox::Git;
use PublicInbox::Import;
im => undef, # PublicInbox::Import
xap_rw => undef, # PublicInbox::V2SearchIdx
xap_ro => undef,
-
+ partitions => 4,
+ transact_bytes => 0,
# limit each repo to 1GB or so
rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
};
my $cmt = $im->add($mime, $check_cb) or return;
$cmt = $im->get_mark($cmt);
my $oid = $im->{last_object_id};
- my $size = $im->{last_object_size};
-
- my $idx = $self->search_idx;
- $idx->index_both($mime, $size, $oid);
- $idx->{xdb}->set_metadata('last_commit', $cmt);
- my $n = $self->{transact_bytes} += $size;
- if ($n > PublicInbox::SearchIdx::BATCH_BYTES) {
+ my ($len, $msgref) = @{$im->{last_object}};
+
+ my $nparts = $self->{partitions};
+ my $part = hex(substr($oid, 0, 8)) % $nparts;
+ my $idx = $self->idx_part($part);
+ my $all = $self->{all};
+ my $num = $all->index_mm($mime);
+ $idx->index_raw($len, $msgref, $num, $oid);
+ my $n = $self->{transact_bytes} += $len;
+ if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
$self->checkpoint;
}
$mime;
}
-sub search_idx {
- my ($self) = @_;
- $self->{idx} ||= eval {
- my $idx = PublicInbox::SearchIdx->new($self->{-inbox}, 1);
- my $mm = $idx->_msgmap_init;
- $idx->_xdb_acquire->begin_transaction;
- $self->{transact_bytes} = 0;
- $mm->{dbh}->begin_work;
- $idx
- };
+sub idx_part {
+ my ($self, $part) = @_;
+ my $idx = $self->{idx_parts};
+ return $idx->[$part] if $idx; # fast path
+
+ # first time initialization:
+ my $all = $self->{all} =
+ PublicInbox::SearchIdxThread->new($self->{-inbox});
+
+ # need to create all parts before initializing msgmap FD
+ my $max = $self->{partitions} - 1;
+ $idx = $self->{idx_parts} = [];
+ for my $i (0..$max) {
+ push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $all);
+ }
+ $all->_msgmap_init->{dbh}->begin_work;
+ $idx->[$part];
}
sub remove {
my ($self) = @_;
my $im = $self->{im};
$im->done if $im; # PublicInbox::Import::done
- $self->searchidx_checkpoint;
+ $self->searchidx_checkpoint(0);
}
sub checkpoint {
my ($self) = @_;
my $im = $self->{im};
$im->checkpoint if $im; # PublicInbox::Import::checkpoint
- $self->searchidx_checkpoint;
+ $self->searchidx_checkpoint(1);
}
sub searchidx_checkpoint {
- my ($self) = @_;
- my $idx = delete $self->{idx} or return;
+ my ($self, $more) = @_;
+
+ # order matters, we can only close {all} after all partitions
+ # are done because the partitions also write to {all}
+
+ my $parts = $self->{idx_parts};
+ foreach my $idx (@$parts) {
+ $idx->remote_commit;
+ $idx->remote_close unless $more;
+ }
- $idx->{mm}->{dbh}->commit;
- $idx->{xdb}->commit_transaction;
- $idx->_xdb_release;
+ if (my $all = $self->{all}) {
+ $all->{mm}->{dbh}->commit;
+ if ($more) {
+ $all->{mm}->{dbh}->begin_work;
+ }
+ $all->remote_commit;
+ $all->remote_close unless $more;
+ }
+ $self->{transact_bytes} = 0;
}
sub git_init {
} else {
$self->{im} = undef;
$im->done;
- $self->searchidx_checkpoint;
+ $self->searchidx_checkpoint(1);
$im = undef;
my $git_dir = $self->git_init(++$self->{max_git});
my $git = PublicInbox::Git->new($git_dir);