X-Git-Url: http://www.git.stargrave.org/?p=public-inbox.git;a=blobdiff_plain;f=lib%2FPublicInbox%2FV2Writable.pm;h=6b1effe58e133cebf3a74f107698b40952818fad;hp=7bc24592bbecac65ff19dc4f9af8b31f569257ef;hb=99def67b86c4d270e8cfda4d1fad418291b6f3a4;hpb=06a2418fd053c9a5b80217e74d1b47b8e1ca85e1 diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 7bc24592..6b1effe5 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -1185,22 +1185,24 @@ sub index_xap_only { # git->cat_async callback my ($bref, $oid, $type, $size, $smsg) = @_; my $self = $smsg->{v2w}; my $idx = idx_shard($self, $smsg->{num} % $self->{shards}); - $idx->begin_txn_lazy; - $idx->add_message(PublicInbox::Eml->new($bref), $smsg); + $smsg->{raw_bytes} = $size; + $idx->index_raw($bref, undef, $smsg); $self->{transact_bytes} += $size; } -sub index_seq_shard ($$$) { - my ($self, $sync, $off) = @_; +sub index_xap_step ($$$;$) { + my ($self, $sync, $beg, $step) = @_; my $ibx = $self->{ibx}; - my $max = $ibx->mm->max or return; my $all = $ibx->git; my $over = $ibx->over; my $batch_bytes = $PublicInbox::SearchIdx::BATCH_BYTES; + $step //= $self->{shards}; + my $end = $sync->{art_end}; if (my $pr = $sync->{-opt}->{-progress}) { - $pr->("Xapian indexlevel=$ibx->{indexlevel} % $off\n"); + $pr->("Xapian indexlevel=$ibx->{indexlevel} ". + "$beg..$end (% $step)\n"); } - for (my $num = $off; $num <= $max; $num += $self->{shards}) { + for (my $num = $beg; $num <= $end; $num += $step) { my $smsg = $over->get_art($num) or next; $smsg->{v2w} = $self; $all->cat_async($smsg->{blob}, \&index_xap_only, $smsg); @@ -1244,10 +1246,37 @@ sub index_epoch ($$$) { update_last_commit($self, $git, $i, $stk->{latest_cmt}); } +sub xapian_only { + my ($self, $opt, $sync) = @_; + my $seq = $opt->{sequentialshard}; + local $self->{parallel} = 0 if $seq; + $self->idx_init($opt); # acquire lock + if (my $art_end = $self->{ibx}->mm->max) { + $sync //= { + need_checkpoint => \(my $bool = 0), + -opt => $opt, + v2w => $self, + nr => \(my $nr = 0), + -regen_fmt => "%u/?\n", + }; + $sync->{art_end} = $art_end; + if ($seq || !$self->{parallel}) { + my $shard_end = $self->{shards} - 1; + index_xap_step($self, $sync, $_) for (0..$shard_end); + } else { # parallel (maybe) + index_xap_step($self, $sync, 0, 1); + } + } + $self->{ibx}->git->cat_async_wait; + $self->done; +} + # public, called by public-inbox-index sub index_sync { my ($self, $opt) = @_; - $opt ||= {}; + $opt //= $_[1] //= {}; + goto \&xapian_only if $opt->{xapianonly}; + my $pr = $opt->{-progress}; my $epoch_max; my $latest = git_dir_latest($self, \$epoch_max); @@ -1292,13 +1321,9 @@ sub index_sync { } if ($seq) { # deal with Xapian shards sequentially - my $end = $self->{shards} - 1; $self->{ibx}->{indexlevel} = $idxlevel; delete $sync->{mm_tmp}; - $self->idx_init($opt); # re-acquire lock - index_seq_shard($self, $sync, $_) for (0..$end); - $self->{ibx}->git->cat_async_wait; - $self->done; + xapian_only($self, $opt, $sync); } # reindex does not pick up new changes, so we rerun w/o it: