X-Git-Url: http://www.git.stargrave.org/?p=public-inbox.git;a=blobdiff_plain;f=lib%2FPublicInbox%2FV2Writable.pm;h=8e3122abba155e6cd20b7bcd084b4b5665b0053d;hp=51723e55ffb44662087f349a41a69a30fb36c5c1;hb=35ff6bb106909b1c1232666a9792156dfa398ea8;hpb=7503aeb540af5afd5cb1b554b3c29f35f5fc918d diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 51723e55..8e3122ab 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -7,13 +7,15 @@ use strict; use warnings; use base qw(PublicInbox::Lock); use PublicInbox::SearchIdxPart; -use PublicInbox::SearchIdxSkeleton; use PublicInbox::MIME; use PublicInbox::Git; use PublicInbox::Import; use PublicInbox::MID qw(mids); use PublicInbox::ContentId qw(content_id content_digest); use PublicInbox::Inbox; +use PublicInbox::OverIdxFork; +use PublicInbox::Msgmap; +use IO::Handle; # an estimate of the post-packed size to the raw uncompressed size my $PACKING_FACTOR = 0.4; @@ -57,6 +59,7 @@ sub new { partitions => $nparts, parallel => 1, transact_bytes => 0, + over => PublicInbox::OverIdxFork->new("$xpfx/over.sqlite3"), lock_path => "$dir/inbox.lock", # limit each repo to 1GB or so rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR), @@ -117,7 +120,7 @@ sub num_for { my $mids = mids($mime->header_obj); if (@$mids) { my $mid = $mids->[0]; - my $num = $self->{skel}->{mm}->mid_insert($mid); + my $num = $self->{mm}->mid_insert($mid); if (defined $num) { # common case $$mid0 = $mid; return $num; @@ -140,7 +143,7 @@ sub num_for { # try the rest of the mids for(my $i = $#$mids; $i >= 1; $i--) { my $m = $mids->[$i]; - $num = $self->{skel}->{mm}->mid_insert($m); + $num = $self->{mm}->mid_insert($m); if (defined $num) { warn "alternative <$m> for <$mid> found\n"; $$mid0 = $m; @@ -158,20 +161,20 @@ sub num_for_harder { my $hdr = $mime->header_obj; my $dig = content_digest($mime); $$mid0 = PublicInbox::Import::digest2mid($dig); - my $num = $self->{skel}->{mm}->mid_insert($$mid0); + my $num = $self->{mm}->mid_insert($$mid0); unless (defined $num) { # it's hard to spoof the last Received: header my @recvd = $hdr->header_raw('Received'); $dig->add("Received: $_") foreach (@recvd); $$mid0 = PublicInbox::Import::digest2mid($dig); - $num = $self->{skel}->{mm}->mid_insert($$mid0); + $num = $self->{mm}->mid_insert($$mid0); # fall back to a random Message-ID and give up determinism: until (defined($num)) { $dig->add(rand); $$mid0 = PublicInbox::Import::digest2mid($dig); warn "using random Message-ID <$$mid0> as fallback\n"; - $num = $self->{skel}->{mm}->mid_insert($$mid0); + $num = $self->{mm}->mid_insert($$mid0); } } PublicInbox::Import::append_mid($hdr, $$mid0); @@ -194,13 +197,11 @@ sub idx_init { # frequently activated. delete $ibx->{$_} foreach (qw(git mm search)); + my $over = $self->{over}; $ibx->umask_prepare; $ibx->with_umask(sub { $self->lock_acquire; - - # first time initialization, first we create the skeleton pipe: - my $skel = PublicInbox::SearchIdxSkeleton->new($self); - $self->{skel} = $skel; + $over->create($self); # need to create all parts before initializing msgmap FD my $max = $self->{partitions} - 1; @@ -208,12 +209,14 @@ sub idx_init { # idx_parts must be visible to all forked processes my $idx = $self->{idx_parts} = []; for my $i (0..$max) { - push @$idx, - PublicInbox::SearchIdxPart->new($self, $i, $skel); + push @$idx, PublicInbox::SearchIdxPart->new($self, $i); } - # Now that all subprocesses are up, we can open the FD for SQLite: - $skel->_msgmap_init->{dbh}->begin_work; + # Now that all subprocesses are up, we can open the FDs + # for SQLite: + my $mm = $self->{mm} = PublicInbox::Msgmap->new_file( + "$self->{-inbox}->{mainrepo}/msgmap.sqlite3", 1); + $mm->{dbh}->begin_work; }); } @@ -236,9 +239,8 @@ sub remove_internal { my $ibx = $self->{-inbox}; my $srch = $ibx->search; my $cid = content_id($mime); - my $skel = $self->{skel}; my $parts = $self->{idx_parts}; - my $mm = $skel->{mm}; + my $mm = $self->{mm}; my $removed; my $mids = mids($mime->header_obj); @@ -273,9 +275,10 @@ sub remove_internal { $orig = undef; $removed->num; # memoize this for callers - foreach my $idx (@$parts, $skel) { + foreach my $idx (@$parts) { $idx->remote_remove($oid, $mid); } + $self->{over}->remove_oid($oid, $mid); } 1; # continue }); @@ -322,18 +325,20 @@ sub barrier { if (my $im = $self->{im}) { $im->barrier; } - my $skel = $self->{skel}; my $parts = $self->{idx_parts}; - if ($parts && $skel) { - my $dbh = $skel->{mm}->{dbh}; - $dbh->commit; # SQLite data is second in importance + if ($parts) { + my $dbh = $self->{mm}->{dbh}; + $dbh->commit; # SQLite msgmap data is second in importance + + my $over = $self->{over}; - # Now deal with Xapian - $skel->barrier_init(scalar(@$parts)); - # each partition needs to issue a barrier command to skel: + # Now deal with Xapian and overview DB + $over->barrier_init(scalar(@$parts)); + + # each partition needs to issue a barrier command to over $_->remote_barrier foreach @$parts; - $skel->barrier_wait; # wait for each Xapian partition + $over->barrier_wait; # wait for each Xapian partition $dbh->begin_work; } @@ -343,26 +348,30 @@ sub barrier { sub searchidx_checkpoint { my ($self, $more) = @_; - # order matters, we can only close {skel} after all partitions - # are done because the partitions also write to {skel} + # order matters, we can only close {over} after all partitions + # are done because the partitions also write to {over} if (my $parts = $self->{idx_parts}) { foreach my $idx (@$parts) { - $idx->remote_commit; # propagates commit to skel + $idx->remote_commit; # propagates commit to over $idx->remote_close unless $more; } delete $self->{idx_parts} unless $more; } - if (my $skel = $self->{skel}) { - my $dbh = $skel->{mm}->{dbh}; + if (my $mm = $self->{mm}) { + my $dbh = $mm->{dbh}; $dbh->commit; if ($more) { $dbh->begin_work; } else { - $skel->remote_close; - delete $self->{skel}; + delete $self->{mm}; } } + my $over = $self->{over}; + $over->remote_commit; + if (!$more) { + $over->remote_close; + } $self->{transact_bytes} = 0; } @@ -522,6 +531,7 @@ sub atfork_child { if (my $im = $self->{im}) { $im->atfork_child; } + die "unexpected mm" if $self->{mm}; } sub mark_deleted { @@ -559,7 +569,7 @@ sub reindex_oid { if (!defined($mid0) && $regen && !$del) { $num = $$regen--; die "BUG: ran out of article numbers\n" if $num <= 0; - my $mm = $self->{skel}->{mm}; + my $mm = $self->{mm}; foreach my $mid (reverse @$mids) { if ($mm->mid_set($num, $mid) == 1) { $mid0 = $mid; @@ -620,7 +630,7 @@ sub reindex { my $head = $ibx->{ref_head} || 'refs/heads/master'; $self->idx_init; # acquire lock my $x40 = qr/[a-f0-9]{40}/; - my $mm_tmp = $self->{skel}->{mm}->tmp_clone; + my $mm_tmp = $self->{mm}->tmp_clone; if (!$regen) { my (undef, $max) = $mm_tmp->minmax; unless (defined $max) {