Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/V2Writable.pm
v2: parallelize Xapian indexing
[public-inbox.git] / lib / PublicInbox / V2Writable.pm
index 41bfb8d19f2cd505ad0096415c85fb58c1b7e541..cb74ab1a256f2bf007bf81d1cb5819748bb2db20 100644 (file)
@@ -6,7 +6,8 @@ package PublicInbox::V2Writable;
 use strict;
 use warnings;
 use Fcntl qw(:flock :DEFAULT);
-use PublicInbox::SearchIdx;
+use PublicInbox::SearchIdxPart;
+use PublicInbox::SearchIdxThread;
 use PublicInbox::MIME;
 use PublicInbox::Git;
 use PublicInbox::Import;
@@ -32,7 +33,8 @@ sub new {
                im => undef, #  PublicInbox::Import
                xap_rw => undef, # PublicInbox::V2SearchIdx
                xap_ro => undef,
-
+               partitions => 4,
+               transact_bytes => 0,
                # limit each repo to 1GB or so
                rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
        };
@@ -55,29 +57,39 @@ sub add {
        my $cmt = $im->add($mime, $check_cb) or return;
        $cmt = $im->get_mark($cmt);
        my $oid = $im->{last_object_id};
-       my $size = $im->{last_object_size};
-
-       my $idx = $self->search_idx;
-       $idx->index_both($mime, $size, $oid);
-       $idx->{xdb}->set_metadata('last_commit', $cmt);
-       my $n = $self->{transact_bytes} += $size;
-       if ($n > PublicInbox::SearchIdx::BATCH_BYTES) {
+       my ($len, $msgref) = @{$im->{last_object}};
+
+       my $nparts = $self->{partitions};
+       my $part = hex(substr($oid, 0, 8)) % $nparts;
+       my $idx = $self->idx_part($part);
+       my $all = $self->{all};
+       my $num = $all->index_mm($mime);
+       $idx->index_raw($len, $msgref, $num, $oid);
+       my $n = $self->{transact_bytes} += $len;
+       if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
                $self->checkpoint;
        }
 
        $mime;
 }
 
-sub search_idx {
-       my ($self) = @_;
-       $self->{idx} ||= eval {
-               my $idx = PublicInbox::SearchIdx->new($self->{-inbox}, 1);
-               my $mm = $idx->_msgmap_init;
-               $idx->_xdb_acquire->begin_transaction;
-               $self->{transact_bytes} = 0;
-               $mm->{dbh}->begin_work;
-               $idx
-       };
+sub idx_part {
+       my ($self, $part) = @_;
+       my $idx = $self->{idx_parts};
+       return $idx->[$part] if $idx; # fast path
+
+       # first time initialization:
+       my $all = $self->{all} = 
+               PublicInbox::SearchIdxThread->new($self->{-inbox});
+
+       # need to create all parts before initializing msgmap FD
+       my $max = $self->{partitions} - 1;
+       $idx = $self->{idx_parts} = [];
+       for my $i (0..$max) {
+               push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $all);
+       }
+       $all->_msgmap_init->{dbh}->begin_work;
+       $idx->[$part];
 }
 
 sub remove {
@@ -99,23 +111,37 @@ sub done {
        my ($self) = @_;
        my $im = $self->{im};
        $im->done if $im; # PublicInbox::Import::done
-       $self->searchidx_checkpoint;
+       $self->searchidx_checkpoint(0);
 }
 
 sub checkpoint {
        my ($self) = @_;
        my $im = $self->{im};
        $im->checkpoint if $im; # PublicInbox::Import::checkpoint
-       $self->searchidx_checkpoint;
+       $self->searchidx_checkpoint(1);
 }
 
 sub searchidx_checkpoint {
-       my ($self) = @_;
-       my $idx = delete $self->{idx} or return;
+       my ($self, $more) = @_;
+
+       # order matters, we can only close {all} after all partitions
+       # are done because the partitions also write to {all}
+
+       my $parts = $self->{idx_parts};
+       foreach my $idx (@$parts) {
+               $idx->remote_commit;
+               $idx->remote_close unless $more;
+       }
 
-       $idx->{mm}->{dbh}->commit;
-       $idx->{xdb}->commit_transaction;
-       $idx->_xdb_release;
+       if (my $all = $self->{all}) {
+               $all->{mm}->{dbh}->commit;
+               if ($more) {
+                       $all->{mm}->{dbh}->begin_work;
+               }
+               $all->remote_commit;
+               $all->remote_close unless $more;
+       }
+       $self->{transact_bytes} = 0;
 }
 
 sub git_init {
@@ -158,7 +184,7 @@ sub importer {
                } else {
                        $self->{im} = undef;
                        $im->done;
-                       $self->searchidx_checkpoint;
+                       $self->searchidx_checkpoint(1);
                        $im = undef;
                        my $git_dir = $self->git_init(++$self->{max_git});
                        my $git = PublicInbox::Git->new($git_dir);