X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FV2Writable.pm;h=cb74ab1a256f2bf007bf81d1cb5819748bb2db20;hb=9ecbfc09928dada28094fd3fc79e91a5472b27ea;hp=41bfb8d19f2cd505ad0096415c85fb58c1b7e541;hpb=a81ad9c4b1b5d8c2ae8444b6dcb8710bd361f628;p=public-inbox.git diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 41bfb8d1..cb74ab1a 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -6,7 +6,8 @@ package PublicInbox::V2Writable; use strict; use warnings; use Fcntl qw(:flock :DEFAULT); -use PublicInbox::SearchIdx; +use PublicInbox::SearchIdxPart; +use PublicInbox::SearchIdxThread; use PublicInbox::MIME; use PublicInbox::Git; use PublicInbox::Import; @@ -32,7 +33,8 @@ sub new { im => undef, # PublicInbox::Import xap_rw => undef, # PublicInbox::V2SearchIdx xap_ro => undef, - + partitions => 4, + transact_bytes => 0, # limit each repo to 1GB or so rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR), }; @@ -55,29 +57,39 @@ sub add { my $cmt = $im->add($mime, $check_cb) or return; $cmt = $im->get_mark($cmt); my $oid = $im->{last_object_id}; - my $size = $im->{last_object_size}; - - my $idx = $self->search_idx; - $idx->index_both($mime, $size, $oid); - $idx->{xdb}->set_metadata('last_commit', $cmt); - my $n = $self->{transact_bytes} += $size; - if ($n > PublicInbox::SearchIdx::BATCH_BYTES) { + my ($len, $msgref) = @{$im->{last_object}}; + + my $nparts = $self->{partitions}; + my $part = hex(substr($oid, 0, 8)) % $nparts; + my $idx = $self->idx_part($part); + my $all = $self->{all}; + my $num = $all->index_mm($mime); + $idx->index_raw($len, $msgref, $num, $oid); + my $n = $self->{transact_bytes} += $len; + if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) { $self->checkpoint; } $mime; } -sub search_idx { - my ($self) = @_; - $self->{idx} ||= eval { - my $idx = PublicInbox::SearchIdx->new($self->{-inbox}, 1); - my $mm = $idx->_msgmap_init; - $idx->_xdb_acquire->begin_transaction; - $self->{transact_bytes} = 0; - $mm->{dbh}->begin_work; - $idx - }; +sub idx_part { + my ($self, $part) = @_; + my $idx = $self->{idx_parts}; + return $idx->[$part] if $idx; # fast path + + # first time initialization: + my $all = $self->{all} = + PublicInbox::SearchIdxThread->new($self->{-inbox}); + + # need to create all parts before initializing msgmap FD + my $max = $self->{partitions} - 1; + $idx = $self->{idx_parts} = []; + for my $i (0..$max) { + push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $all); + } + $all->_msgmap_init->{dbh}->begin_work; + $idx->[$part]; } sub remove { @@ -99,23 +111,37 @@ sub done { my ($self) = @_; my $im = $self->{im}; $im->done if $im; # PublicInbox::Import::done - $self->searchidx_checkpoint; + $self->searchidx_checkpoint(0); } sub checkpoint { my ($self) = @_; my $im = $self->{im}; $im->checkpoint if $im; # PublicInbox::Import::checkpoint - $self->searchidx_checkpoint; + $self->searchidx_checkpoint(1); } sub searchidx_checkpoint { - my ($self) = @_; - my $idx = delete $self->{idx} or return; + my ($self, $more) = @_; + + # order matters, we can only close {all} after all partitions + # are done because the partitions also write to {all} + + my $parts = $self->{idx_parts}; + foreach my $idx (@$parts) { + $idx->remote_commit; + $idx->remote_close unless $more; + } - $idx->{mm}->{dbh}->commit; - $idx->{xdb}->commit_transaction; - $idx->_xdb_release; + if (my $all = $self->{all}) { + $all->{mm}->{dbh}->commit; + if ($more) { + $all->{mm}->{dbh}->begin_work; + } + $all->remote_commit; + $all->remote_close unless $more; + } + $self->{transact_bytes} = 0; } sub git_init { @@ -158,7 +184,7 @@ sub importer { } else { $self->{im} = undef; $im->done; - $self->searchidx_checkpoint; + $self->searchidx_checkpoint(1); $im = undef; my $git_dir = $self->git_init(++$self->{max_git}); my $git = PublicInbox::Git->new($git_dir);