X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FV2Writable.pm;h=c8334645d3cbcff5bf0b290bda75db7bd8113022;hb=d73d783ab2cf14ba28ca63723223d8c85a68cdd5;hp=72198a2983282865fdcf94ed7c784be038bc94d6;hpb=6a7e3c6f870d0555184b68940eb373fa102d4102;p=public-inbox.git diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 72198a29..c8334645 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -8,7 +8,6 @@ use strict; use v5.10.1; use parent qw(PublicInbox::Lock); use PublicInbox::SearchIdxShard; -use PublicInbox::IdxStack; use PublicInbox::Eml; use PublicInbox::Git; use PublicInbox::Import; @@ -115,13 +114,13 @@ sub new { total_bytes => 0, current_info => '', xpfx => $xpfx, - over => PublicInbox::OverIdx->new("$xpfx/over.sqlite3"), + oidx => PublicInbox::OverIdx->new("$xpfx/over.sqlite3"), lock_path => "$dir/inbox.lock", # limit each git repo (epoch) to 1GB or so rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR), last_commit => [], # git epoch -> commit }; - $self->{over}->{-no_fsync} = 1 if $v2ibx->{-no_fsync}; + $self->{oidx}->{-no_fsync} = 1 if $v2ibx->{-no_fsync}; $self->{shards} = count_shards($self) || nproc_shards($creat); bless $self, $class; } @@ -155,7 +154,7 @@ sub add { sub do_idx ($$$$) { my ($self, $msgref, $mime, $smsg) = @_; $smsg->{bytes} = $smsg->{raw_bytes} + crlf_adjust($$msgref); - $self->{over}->add_overview($mime, $smsg); + $self->{oidx}->add_overview($mime, $smsg); my $idx = idx_shard($self, $smsg->{num} % $self->{shards}); $idx->index_raw($msgref, $mime, $smsg); my $n = $self->{transact_bytes} += $smsg->{raw_bytes}; @@ -220,7 +219,7 @@ sub v2_num_for { if ($altid && grep(/:file=msgmap\.sqlite3\z/, @$altid)) { my $num = $self->{mm}->num_for($mid); - if (defined $num && !$self->{over}->get_art($num)) { + if (defined $num && !$self->{oidx}->get_art($num)) { return ($num, $mid); } } @@ -275,7 +274,7 @@ sub idx_shard { sub _idx_init { # with_umask callback my ($self, $opt) = @_; $self->lock_acquire unless $opt && $opt->{-skip_lock}; - $self->{over}->create; + $self->{oidx}->create; # xcpdb can change shard count while -watch is idle my $nshards = count_shards($self); @@ -382,7 +381,7 @@ sub rewrite_internal ($$;$$$) { } else { $im = $self->importer; } - my $over = $self->{over}; + my $oidx = $self->{oidx}; my $chashes = content_hashes($old_eml); my $removed = []; my $mids = mids($old_eml); @@ -396,7 +395,7 @@ sub rewrite_internal ($$;$$$) { foreach my $mid (@$mids) { my %gone; # num => [ smsg, $mime, raw ] my ($id, $prev); - while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) { + while (my $smsg = $oidx->next_by_mid($mid, \$id, \$prev)) { my $msg = get_blob($self, $smsg); if (!defined($msg)) { warn "broken smsg for $mid\n"; @@ -624,19 +623,19 @@ sub checkpoint ($;$) { $dbh->commit; # SQLite overview is third - $self->{over}->commit_lazy; + $self->{oidx}->commit_lazy; # Now deal with Xapian if ($wait) { my $barrier = $self->barrier_init(scalar @$shards); # each shard needs to issue a barrier command - $_->remote_barrier for @$shards; + $_->shard_barrier for @$shards; # wait for each Xapian shard $self->barrier_wait($barrier); } else { - $_->remote_commit for @$shards; + $_->shard_commit for @$shards; } # last_commit is special, don't commit these until @@ -656,6 +655,9 @@ sub checkpoint ($;$) { # public sub barrier { checkpoint($_[0], 1) }; +# true if locked and active +sub active { !!$_[0]->{im} } + # public sub done { my ($self) = @_; @@ -676,12 +678,12 @@ sub done { my $shards = delete $self->{idx_shards}; if ($shards) { for (@$shards) { - eval { $_->remote_close }; + eval { $_->shard_close }; $err .= "shard close: $@\n" if $@; } } - eval { $self->{over}->disconnect }; - $err .= "over disconnect: $@\n" if $@; + eval { $self->{oidx}->dbh_close }; + $err .= "over close: $@\n" if $@; delete $self->{bnote}; my $nbytes = $self->{total_bytes}; $self->{total_bytes} = 0; @@ -842,10 +844,10 @@ sub get_blob ($$) { sub content_exists ($$$) { my ($self, $mime, $mid) = @_; - my $over = $self->{over}; + my $oidx = $self->{oidx}; my $chashes = content_hashes($mime); my ($id, $prev); - while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) { + while (my $smsg = $oidx->next_by_mid($mid, \$id, \$prev)) { my $msg = get_blob($self, $smsg); if (!defined($msg)) { warn "broken smsg for $mid\n"; @@ -915,9 +917,9 @@ sub index_oid { # cat_async callback } } if (!defined($num)) { # reuse if reindexing (or duplicates) - my $over = $self->{over}; + my $oidx = $self->{oidx}; for my $mid (@$mids) { - ($num, $mid0) = $over->num_mid0_for_oid($oid, $mid); + ($num, $mid0) = $oidx->num_mid0_for_oid($oid, $mid); last if defined $num; } } @@ -996,7 +998,7 @@ sub log_range ($$$$$) { my $opt = $sync->{-opt}; my $pr = $opt->{-progress} if (($opt->{verbose} || 0) > 1); my $cur = $sync->{ranges}->[$i] or do { - $pr->("$i.git indexing all of $tip") if $pr; + $pr->("$i.git indexing all of $tip\n") if $pr; return $tip; # all of it }; @@ -1105,10 +1107,10 @@ sub sync_prepare ($$$) { sub unindex_oid_remote ($$$) { my ($self, $oid, $mid) = @_; - my @removed = $self->{over}->remove_oid($oid, $mid); + my @removed = $self->{oidx}->remove_oid($oid, $mid); for my $num (@removed) { my $idx = idx_shard($self, $num % $self->{shards}); - $idx->remote_remove($oid, $num); + $idx->shard_remove($oid, $num); } } @@ -1119,11 +1121,11 @@ sub unindex_oid ($$;$) { # git->cat_async callback my $mm = $self->{mm}; my $mids = mids(PublicInbox::Eml->new($bref)); undef $$bref; - my $over = $self->{over}; + my $oidx = $self->{oidx}; foreach my $mid (@$mids) { my %gone; my ($id, $prev); - while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) { + while (my $smsg = $oidx->next_by_mid($mid, \$id, \$prev)) { $gone{$smsg->{num}} = 1 if $oid eq $smsg->{blob}; } my $n = scalar(keys(%gone)) or next; @@ -1265,8 +1267,11 @@ sub xapian_only { $sync->{art_end} = $art_end; if ($seq || !$self->{parallel}) { my $shard_end = $self->{shards} - 1; - for (0..$shard_end) { - index_xap_step($self, $sync, $art_beg + $_) + for my $i (0..$shard_end) { + index_xap_step($self, $sync, $art_beg + $i); + if ($i != $shard_end) { + reindex_checkpoint($self, $sync); + } } } else { # parallel (maybe) index_xap_step($self, $sync, $art_beg, 1); @@ -1294,7 +1299,7 @@ sub index_sync { $self->idx_init($opt); # acquire lock fill_alternates($self, $epoch_max); - $self->{over}->rethread_prepare($opt); + $self->{oidx}->rethread_prepare($opt); my $sync = { need_checkpoint => \(my $bool = 0), unindex_range => {}, # EPOCH => oid_old..oid_new @@ -1324,7 +1329,7 @@ sub index_sync { } # work forwards through history index_epoch($self, $sync, $_) for (0..$epoch_max); - $self->{over}->rethread_done($opt); + $self->{oidx}->rethread_done($opt); $self->done; if (my $nr = $sync->{nr}) { @@ -1338,6 +1343,18 @@ sub index_sync { xapian_only($self, $opt, $sync, $art_beg); } + # --reindex on the command-line + if ($opt->{reindex} && !ref($opt->{reindex}) && $idxlevel ne 'basic') { + $self->lock_acquire; + my $s0 = PublicInbox::SearchIdx->new($self->{ibx}, 0, 0); + if (my $xdb = $s0->idx_acquire) { + my $n = $xdb->get_metadata('has_threadid'); + $xdb->set_metadata('has_threadid', '1') if $n ne '1'; + } + $s0->idx_release; + $self->lock_release; + } + # reindex does not pick up new changes, so we rerun w/o it: if ($opt->{reindex}) { my %again = %$opt;