X-Git-Url: http://www.git.stargrave.org/?p=public-inbox.git;a=blobdiff_plain;f=lib%2FPublicInbox%2FV2Writable.pm;h=479e2b5daafbd25c0868a0a762fea6edee65a794;hp=b516278ed1e56eef418234f3ab0858c72d155b19;hb=87425172fb480214c3f72e0174f4f8f15f48d92d;hpb=d2f3227c622174b96db8c2dc69dbc1fa3df50fcd diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index b516278e..479e2b5d 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -7,13 +7,15 @@ use strict; use warnings; use base qw(PublicInbox::Lock); use PublicInbox::SearchIdxPart; -use PublicInbox::SearchIdxSkeleton; use PublicInbox::MIME; use PublicInbox::Git; use PublicInbox::Import; use PublicInbox::MID qw(mids); use PublicInbox::ContentId qw(content_id content_digest); use PublicInbox::Inbox; +use PublicInbox::OverIdxFork; +use PublicInbox::Msgmap; +use IO::Handle; # an estimate of the post-packed size to the raw uncompressed size my $PACKING_FACTOR = 0.4; @@ -50,12 +52,14 @@ sub new { } $nparts = nproc() if ($nparts == 0); + $v2ibx = PublicInbox::InboxWritable->new($v2ibx); my $self = { -inbox => $v2ibx, im => undef, # PublicInbox::Import partitions => $nparts, parallel => 1, transact_bytes => 0, + over => PublicInbox::OverIdxFork->new("$xpfx/over.sqlite3"), lock_path => "$dir/inbox.lock", # limit each repo to 1GB or so rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR), @@ -67,7 +71,9 @@ sub init_inbox { my ($self, $parallel) = @_; $self->{parallel} = $parallel; $self->idx_init; - $self->git_init(0); + my $max_git = -1; + git_dir_latest($self, \$max_git); + $self->git_init($max_git >= 0 ? $max_git : 0); $self->done; } @@ -114,7 +120,7 @@ sub num_for { my $mids = mids($mime->header_obj); if (@$mids) { my $mid = $mids->[0]; - my $num = $self->{skel}->{mm}->mid_insert($mid); + my $num = $self->{mm}->mid_insert($mid); if (defined $num) { # common case $$mid0 = $mid; return $num; @@ -135,9 +141,9 @@ sub num_for { warn "<$mid> reused for mismatched content\n"; # try the rest of the mids - foreach my $i (1..$#$mids) { + for(my $i = $#$mids; $i >= 1; $i--) { my $m = $mids->[$i]; - $num = $self->{skel}->{mm}->mid_insert($m); + $num = $self->{mm}->mid_insert($m); if (defined $num) { warn "alternative <$m> for <$mid> found\n"; $$mid0 = $m; @@ -155,20 +161,20 @@ sub num_for_harder { my $hdr = $mime->header_obj; my $dig = content_digest($mime); $$mid0 = PublicInbox::Import::digest2mid($dig); - my $num = $self->{skel}->{mm}->mid_insert($$mid0); + my $num = $self->{mm}->mid_insert($$mid0); unless (defined $num) { # it's hard to spoof the last Received: header my @recvd = $hdr->header_raw('Received'); $dig->add("Received: $_") foreach (@recvd); $$mid0 = PublicInbox::Import::digest2mid($dig); - $num = $self->{skel}->{mm}->mid_insert($$mid0); + $num = $self->{mm}->mid_insert($$mid0); # fall back to a random Message-ID and give up determinism: until (defined($num)) { $dig->add(rand); $$mid0 = PublicInbox::Import::digest2mid($dig); warn "using random Message-ID <$$mid0> as fallback\n"; - $num = $self->{skel}->{mm}->mid_insert($$mid0); + $num = $self->{mm}->mid_insert($$mid0); } } PublicInbox::Import::append_mid($hdr, $$mid0); @@ -191,20 +197,27 @@ sub idx_init { # frequently activated. delete $ibx->{$_} foreach (qw(git mm search)); - $self->lock_acquire; + my $over = $self->{over}; + $ibx->umask_prepare; + $ibx->with_umask(sub { + $self->lock_acquire; + $over->create($self); - # first time initialization, first we create the skeleton pipe: - my $skel = $self->{skel} = PublicInbox::SearchIdxSkeleton->new($self); + # need to create all parts before initializing msgmap FD + my $max = $self->{partitions} - 1; - # need to create all parts before initializing msgmap FD - my $max = $self->{partitions} - 1; - my $idx = $self->{idx_parts} = []; - for my $i (0..$max) { - push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $skel); - } + # idx_parts must be visible to all forked processes + my $idx = $self->{idx_parts} = []; + for my $i (0..$max) { + push @$idx, PublicInbox::SearchIdxPart->new($self, $i); + } - # Now that all subprocesses are up, we can open the FD for SQLite: - $skel->_msgmap_init->{dbh}->begin_work; + # Now that all subprocesses are up, we can open the FDs + # for SQLite: + my $mm = $self->{mm} = PublicInbox::Msgmap->new_file( + "$self->{-inbox}->{mainrepo}/msgmap.sqlite3", 1); + $mm->{dbh}->begin_work; + }); } sub purge_oids { @@ -226,9 +239,8 @@ sub remove_internal { my $ibx = $self->{-inbox}; my $srch = $ibx->search; my $cid = content_id($mime); - my $skel = $self->{skel}; my $parts = $self->{idx_parts}; - my $mm = $skel->{mm}; + my $mm = $self->{mm}; my $removed; my $mids = mids($mime->header_obj); @@ -263,9 +275,10 @@ sub remove_internal { $orig = undef; $removed->num; # memoize this for callers - foreach my $idx (@$parts, $skel) { + foreach my $idx (@$parts) { $idx->remote_remove($oid, $mid); } + $self->{over}->remove_oid($oid, $mid); } 1; # continue }); @@ -290,68 +303,60 @@ sub purge { sub done { my ($self) = @_; - my $locked = defined $self->{idx_parts}; my $im = delete $self->{im}; $im->done if $im; # PublicInbox::Import::done - $self->searchidx_checkpoint(0); - $self->lock_release if $locked; + + if (my $mm = delete $self->{mm}) { + $mm->{dbh}->commit; + } + + # order matters, we can only close {over} after all partitions + # are done because the partitions also write to {over} + my $parts = delete $self->{idx_parts}; + if ($parts) { + $_->remote_commit for @$parts; + $_->remote_close for @$parts; + } + + my $over = $self->{over}; + $over->remote_commit; + $over->remote_close; + $self->{transact_bytes} = 0; + $self->lock_release if $parts; } sub checkpoint { my ($self) = @_; my $im = $self->{im}; $im->checkpoint if $im; # PublicInbox::Import::checkpoint - $self->searchidx_checkpoint(1); + $self->barrier(1); } # issue a write barrier to ensure all data is visible to other processes # and read-only ops. Order of data importance is: git > SQLite > Xapian sub barrier { - my ($self) = @_; + my ($self, $fsync) = @_; if (my $im = $self->{im}) { $im->barrier; } - my $skel = $self->{skel}; my $parts = $self->{idx_parts}; - if ($parts && $skel) { - my $dbh = $skel->{mm}->{dbh}; - $dbh->commit; # SQLite data is second in importance + if ($parts) { + my $dbh = $self->{mm}->{dbh}; + $dbh->commit; # SQLite msgmap data is second in importance - # Now deal with Xapian - $skel->barrier_init(scalar(@$parts)); - # each partition needs to issue a barrier command to skel: - $_->remote_barrier foreach @$parts; + my $over = $self->{over}; - $skel->barrier_wait; # wait for each Xapian partition + # Now deal with Xapian and overview DB + $over->barrier_init(scalar(@$parts)); - $dbh->begin_work; - } - $self->{transact_bytes} = 0; -} + # each partition needs to issue a barrier command to over + $_->remote_barrier foreach @$parts; -sub searchidx_checkpoint { - my ($self, $more) = @_; + $over->barrier_wait; # wait for each Xapian partition + $over->commit_fsync if $fsync; - # order matters, we can only close {skel} after all partitions - # are done because the partitions also write to {skel} - if (my $parts = $self->{idx_parts}) { - foreach my $idx (@$parts) { - $idx->remote_commit; # propagates commit to skel - $idx->remote_close unless $more; - } - delete $self->{idx_parts} unless $more; - } - - if (my $skel = $self->{skel}) { - my $dbh = $skel->{mm}->{dbh}; - $dbh->commit; - if ($more) { - $dbh->begin_work; - } else { - $skel->remote_close; - delete $self->{skel}; - } + $dbh->begin_work; } $self->{transact_bytes} = 0; } @@ -360,7 +365,6 @@ sub git_init { my ($self, $new) = @_; my $pfx = "$self->{-inbox}->{mainrepo}/git"; my $git_dir = "$pfx/$new.git"; - die "$git_dir exists\n" if -e $git_dir; my @cmd = (qw(git init --bare -q), $git_dir); PublicInbox::Import::run_die(\@cmd); @@ -417,7 +421,7 @@ sub importer { } else { $self->{im} = undef; $im->done; - $self->searchidx_checkpoint(1); + $self->barrier(1); $im = undef; my $git_dir = $self->git_init(++$self->{max_git}); my $git = PublicInbox::Git->new($git_dir); @@ -513,6 +517,7 @@ sub atfork_child { if (my $im = $self->{im}) { $im->atfork_child; } + die "unexpected mm" if $self->{mm}; } sub mark_deleted { @@ -550,8 +555,8 @@ sub reindex_oid { if (!defined($mid0) && $regen && !$del) { $num = $$regen--; die "BUG: ran out of article numbers\n" if $num <= 0; - my $mm = $self->{skel}->{mm}; - foreach my $mid (@$mids) { + my $mm = $self->{mm}; + foreach my $mid (reverse @$mids) { if ($mm->mid_set($num, $mid) == 1) { $mid0 = $mid; last; @@ -559,7 +564,11 @@ sub reindex_oid { } if (!defined($mid0)) { my $id = '<' . join('> <', @$mids) . '>'; - warn "Message-Id $id unusable for $num\n"; + warn "Message-ID $id unusable for $num\n"; + foreach my $mid (@$mids) { + defined(my $n = $mm->num_for($mid)) or next; + warn "#$n previously mapped for <$mid>\n"; + } } } @@ -607,7 +616,7 @@ sub reindex { my $head = $ibx->{ref_head} || 'refs/heads/master'; $self->idx_init; # acquire lock my $x40 = qr/[a-f0-9]{40}/; - my $mm_tmp = $self->{skel}->{mm}->tmp_clone; + my $mm_tmp = $self->{mm}->tmp_clone; if (!$regen) { my (undef, $max) = $mm_tmp->minmax; unless (defined $max) { @@ -622,17 +631,22 @@ sub reindex { for (my $cur = $max_git; $cur >= 0; $cur--) { die "already reindexing!\n" if $self->{reindex_pipe}; my $git = PublicInbox::Git->new("$pfx/$cur.git"); + -d $git->{git_dir} or next; # missing parts are fine chomp($tip = $git->qx('rev-parse', $head)) unless $tip; my $h = $cur == $max_git ? $tip : $head; - my @count = ('rev-list', '--count', $h, '--', 'm'); - $regen_max += $git->qx(@count); + + # can't use 'rev-list --count' if we use --diff-filter + my $fh = $git->popen(qw(log --pretty=tformat:%h + --no-notes --no-color --no-renames + --diff-filter=AM), $h, '--', 'm'); + ++$regen_max while <$fh>; } die "No messages found in $pfx/*.git, bug?\n" unless $regen_max; $regen = \$regen_max; } my $D = {}; my @cmd = qw(log --raw -r --pretty=tformat:%h - --no-notes --no-color --no-abbrev); + --no-notes --no-color --no-abbrev --no-renames); # if we are regenerating, we must not use a newer tip commit than what # the regeneration counter used: @@ -643,6 +657,7 @@ sub reindex { die "already reindexing!\n" if delete $self->{reindex_pipe}; my $cmt; my $git_dir = "$pfx/$cur.git"; + -d $git_dir or next; # missing parts are fine my $git = PublicInbox::Git->new($git_dir); my $h = $cur == $max_git ? $tip : $head; my $fh = $self->{reindex_pipe} = $git->popen(@cmd, $h); @@ -652,14 +667,23 @@ sub reindex { } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\tm$/o) { $self->reindex_oid($mm_tmp, $D, $git, $1, $regen); - } elsif (m!\A:\d{6} 100644 $x40 ($x40) [AM]\t_/D$!o) { + } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\td$/o) { $self->mark_deleted($D, $git, $1); } } delete $self->{reindex_pipe}; } + my $gaps; + if ($regen && $$regen != 0) { + warn "W: leftover article number ($$regen)\n"; + $gaps = 1; + } my ($min, $max) = $mm_tmp->minmax; - defined $max and die "leftover article numbers at $min..$max\n"; + if (defined $max) { + warn "W: leftover article numbers at $min..$max\n"; + $gaps = 1; + } + warn "W: were old git partitions deleted?\n" if $gaps; my @d = sort keys %$D; if (@d) { warn "BUG: ", scalar(@d)," unseen deleted messages marked\n";