X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FV2Writable.pm;h=8e3122abba155e6cd20b7bcd084b4b5665b0053d;hb=35ff6bb106909b1c1232666a9792156dfa398ea8;hp=269b028f5373a9fb1e99ed7c2b8166c438c6d111;hpb=126e7c54da66ff58e20e76879decc91a075eeba1;p=public-inbox.git diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 269b028f..8e3122ab 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -7,13 +7,15 @@ use strict; use warnings; use base qw(PublicInbox::Lock); use PublicInbox::SearchIdxPart; -use PublicInbox::SearchIdxSkeleton; use PublicInbox::MIME; use PublicInbox::Git; use PublicInbox::Import; use PublicInbox::MID qw(mids); use PublicInbox::ContentId qw(content_id content_digest); use PublicInbox::Inbox; +use PublicInbox::OverIdxFork; +use PublicInbox::Msgmap; +use IO::Handle; # an estimate of the post-packed size to the raw uncompressed size my $PACKING_FACTOR = 0.4; @@ -50,12 +52,14 @@ sub new { } $nparts = nproc() if ($nparts == 0); + $v2ibx = PublicInbox::InboxWritable->new($v2ibx); my $self = { -inbox => $v2ibx, im => undef, # PublicInbox::Import partitions => $nparts, parallel => 1, transact_bytes => 0, + over => PublicInbox::OverIdxFork->new("$xpfx/over.sqlite3"), lock_path => "$dir/inbox.lock", # limit each repo to 1GB or so rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR), @@ -116,7 +120,7 @@ sub num_for { my $mids = mids($mime->header_obj); if (@$mids) { my $mid = $mids->[0]; - my $num = $self->{skel}->{mm}->mid_insert($mid); + my $num = $self->{mm}->mid_insert($mid); if (defined $num) { # common case $$mid0 = $mid; return $num; @@ -137,9 +141,9 @@ sub num_for { warn "<$mid> reused for mismatched content\n"; # try the rest of the mids - foreach my $i (1..$#$mids) { + for(my $i = $#$mids; $i >= 1; $i--) { my $m = $mids->[$i]; - $num = $self->{skel}->{mm}->mid_insert($m); + $num = $self->{mm}->mid_insert($m); if (defined $num) { warn "alternative <$m> for <$mid> found\n"; $$mid0 = $m; @@ -157,20 +161,20 @@ sub num_for_harder { my $hdr = $mime->header_obj; my $dig = content_digest($mime); $$mid0 = PublicInbox::Import::digest2mid($dig); - my $num = $self->{skel}->{mm}->mid_insert($$mid0); + my $num = $self->{mm}->mid_insert($$mid0); unless (defined $num) { # it's hard to spoof the last Received: header my @recvd = $hdr->header_raw('Received'); $dig->add("Received: $_") foreach (@recvd); $$mid0 = PublicInbox::Import::digest2mid($dig); - $num = $self->{skel}->{mm}->mid_insert($$mid0); + $num = $self->{mm}->mid_insert($$mid0); # fall back to a random Message-ID and give up determinism: until (defined($num)) { $dig->add(rand); $$mid0 = PublicInbox::Import::digest2mid($dig); warn "using random Message-ID <$$mid0> as fallback\n"; - $num = $self->{skel}->{mm}->mid_insert($$mid0); + $num = $self->{mm}->mid_insert($$mid0); } } PublicInbox::Import::append_mid($hdr, $$mid0); @@ -193,20 +197,27 @@ sub idx_init { # frequently activated. delete $ibx->{$_} foreach (qw(git mm search)); - $self->lock_acquire; + my $over = $self->{over}; + $ibx->umask_prepare; + $ibx->with_umask(sub { + $self->lock_acquire; + $over->create($self); - # first time initialization, first we create the skeleton pipe: - my $skel = $self->{skel} = PublicInbox::SearchIdxSkeleton->new($self); + # need to create all parts before initializing msgmap FD + my $max = $self->{partitions} - 1; - # need to create all parts before initializing msgmap FD - my $max = $self->{partitions} - 1; - my $idx = $self->{idx_parts} = []; - for my $i (0..$max) { - push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $skel); - } + # idx_parts must be visible to all forked processes + my $idx = $self->{idx_parts} = []; + for my $i (0..$max) { + push @$idx, PublicInbox::SearchIdxPart->new($self, $i); + } - # Now that all subprocesses are up, we can open the FD for SQLite: - $skel->_msgmap_init->{dbh}->begin_work; + # Now that all subprocesses are up, we can open the FDs + # for SQLite: + my $mm = $self->{mm} = PublicInbox::Msgmap->new_file( + "$self->{-inbox}->{mainrepo}/msgmap.sqlite3", 1); + $mm->{dbh}->begin_work; + }); } sub purge_oids { @@ -228,9 +239,8 @@ sub remove_internal { my $ibx = $self->{-inbox}; my $srch = $ibx->search; my $cid = content_id($mime); - my $skel = $self->{skel}; my $parts = $self->{idx_parts}; - my $mm = $skel->{mm}; + my $mm = $self->{mm}; my $removed; my $mids = mids($mime->header_obj); @@ -265,9 +275,10 @@ sub remove_internal { $orig = undef; $removed->num; # memoize this for callers - foreach my $idx (@$parts, $skel) { + foreach my $idx (@$parts) { $idx->remote_remove($oid, $mid); } + $self->{over}->remove_oid($oid, $mid); } 1; # continue }); @@ -314,18 +325,20 @@ sub barrier { if (my $im = $self->{im}) { $im->barrier; } - my $skel = $self->{skel}; my $parts = $self->{idx_parts}; - if ($parts && $skel) { - my $dbh = $skel->{mm}->{dbh}; - $dbh->commit; # SQLite data is second in importance + if ($parts) { + my $dbh = $self->{mm}->{dbh}; + $dbh->commit; # SQLite msgmap data is second in importance + + my $over = $self->{over}; - # Now deal with Xapian - $skel->barrier_init(scalar(@$parts)); - # each partition needs to issue a barrier command to skel: + # Now deal with Xapian and overview DB + $over->barrier_init(scalar(@$parts)); + + # each partition needs to issue a barrier command to over $_->remote_barrier foreach @$parts; - $skel->barrier_wait; # wait for each Xapian partition + $over->barrier_wait; # wait for each Xapian partition $dbh->begin_work; } @@ -335,26 +348,30 @@ sub barrier { sub searchidx_checkpoint { my ($self, $more) = @_; - # order matters, we can only close {skel} after all partitions - # are done because the partitions also write to {skel} + # order matters, we can only close {over} after all partitions + # are done because the partitions also write to {over} if (my $parts = $self->{idx_parts}) { foreach my $idx (@$parts) { - $idx->remote_commit; # propagates commit to skel + $idx->remote_commit; # propagates commit to over $idx->remote_close unless $more; } delete $self->{idx_parts} unless $more; } - if (my $skel = $self->{skel}) { - my $dbh = $skel->{mm}->{dbh}; + if (my $mm = $self->{mm}) { + my $dbh = $mm->{dbh}; $dbh->commit; if ($more) { $dbh->begin_work; } else { - $skel->remote_close; - delete $self->{skel}; + delete $self->{mm}; } } + my $over = $self->{over}; + $over->remote_commit; + if (!$more) { + $over->remote_close; + } $self->{transact_bytes} = 0; } @@ -514,6 +531,7 @@ sub atfork_child { if (my $im = $self->{im}) { $im->atfork_child; } + die "unexpected mm" if $self->{mm}; } sub mark_deleted { @@ -551,7 +569,7 @@ sub reindex_oid { if (!defined($mid0) && $regen && !$del) { $num = $$regen--; die "BUG: ran out of article numbers\n" if $num <= 0; - my $mm = $self->{skel}->{mm}; + my $mm = $self->{mm}; foreach my $mid (reverse @$mids) { if ($mm->mid_set($num, $mid) == 1) { $mid0 = $mid; @@ -612,7 +630,7 @@ sub reindex { my $head = $ibx->{ref_head} || 'refs/heads/master'; $self->idx_init; # acquire lock my $x40 = qr/[a-f0-9]{40}/; - my $mm_tmp = $self->{skel}->{mm}->tmp_clone; + my $mm_tmp = $self->{mm}->tmp_clone; if (!$regen) { my (undef, $max) = $mm_tmp->minmax; unless (defined $max) { @@ -630,15 +648,19 @@ sub reindex { -d $git->{git_dir} or next; # missing parts are fine chomp($tip = $git->qx('rev-parse', $head)) unless $tip; my $h = $cur == $max_git ? $tip : $head; - my @count = ('rev-list', '--count', $h, '--', 'm'); - $regen_max += $git->qx(@count); + + # can't use 'rev-list --count' if we use --diff-filter + my $fh = $git->popen(qw(log --pretty=tformat:%h + --no-notes --no-color --no-renames + --diff-filter=AM), $h, '--', 'm'); + ++$regen_max while <$fh>; } die "No messages found in $pfx/*.git, bug?\n" unless $regen_max; $regen = \$regen_max; } my $D = {}; my @cmd = qw(log --raw -r --pretty=tformat:%h - --no-notes --no-color --no-abbrev); + --no-notes --no-color --no-abbrev --no-renames); # if we are regenerating, we must not use a newer tip commit than what # the regeneration counter used: @@ -659,7 +681,7 @@ sub reindex { } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\tm$/o) { $self->reindex_oid($mm_tmp, $D, $git, $1, $regen); - } elsif (m!\A:\d{6} 100644 $x40 ($x40) [AM]\t_/D$!o) { + } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\td$/o) { $self->mark_deleted($D, $git, $1); } }