X-Git-Url: http://www.git.stargrave.org/?p=public-inbox.git;a=blobdiff_plain;f=lib%2FPublicInbox%2FV2Writable.pm;h=8361d09fab54d2c598d12aa5eec203e4ef0cb42f;hp=877a45912fcea3fab814fd9814b3996526ceb0d9;hb=06cd576bb9b73eb177728fead06de8c3fac8d7a3;hpb=59b5b84b8843d54024b5bb182f02b9b487a1ed29 diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 877a4591..8361d09f 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -13,7 +13,7 @@ use PublicInbox::Import; use PublicInbox::MID qw(mids); use PublicInbox::ContentId qw(content_id content_digest); use PublicInbox::Inbox; -use PublicInbox::OverIdxFork; +use PublicInbox::OverIdx; use PublicInbox::Msgmap; use PublicInbox::Spawn; use IO::Handle; @@ -67,7 +67,7 @@ sub new { parallel => 1, transact_bytes => 0, xpfx => $xpfx, - over => PublicInbox::OverIdxFork->new("$xpfx/over.sqlite3"), + over => PublicInbox::OverIdx->new("$xpfx/over.sqlite3", 1), lock_path => "$dir/inbox.lock", # limit each git repo (epoch) to 1GB or so rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR), @@ -111,11 +111,12 @@ sub add { my $im = $self->importer; my $cmt = $im->add($mime); $cmt = $im->get_mark($cmt); - my ($oid, $len, $msgref) = @{$im->{last_object}}; + $self->{last_commit}->[$self->{epoch_max}] = $cmt; + my ($oid, $len, $msgref) = @{$im->{last_object}}; + $self->{over}->add_overview($mime, $len, $num, $oid, $mid0); my $nparts = $self->{partitions}; my $part = $num % $nparts; - $self->{last_commit}->[$self->{epoch_max}] = $cmt; my $idx = $self->idx_part($part); $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime); my $n = $self->{transact_bytes} += $len; @@ -208,11 +209,17 @@ sub idx_init { # frequently activated. delete $ibx->{$_} foreach (qw(git mm search)); + if ($self->{parallel}) { + pipe(my ($r, $w)) or die "pipe failed: $!"; + $self->{bnote} = [ $r, $w ]; + $w->autoflush(1); + } + my $over = $self->{over}; $ibx->umask_prepare; $ibx->with_umask(sub { $self->lock_acquire; - $over->create($self); + $over->create; # -compact can change partition count while -watch is idle my $nparts = count_partitions($self); @@ -256,7 +263,7 @@ sub remove_internal { $self->idx_init; my $im = $self->importer unless $purge; my $ibx = $self->{-inbox}; - my $srch = $ibx->search; + my $over = $self->{over}; my $cid = content_id($mime); my $parts = $self->{idx_parts}; my $mm = $self->{mm}; @@ -272,7 +279,7 @@ sub remove_internal { foreach my $mid (@$mids) { my %gone; my ($id, $prev); - while (my $smsg = $srch->next_by_mid($mid, \$id, \$prev)) { + while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) { my $msg = $ibx->msg_by_smsg($smsg); if (!defined($msg)) { warn "broken smsg for $mid\n"; @@ -304,9 +311,7 @@ sub remove_internal { ($mark, undef) = $im->remove($orig, $cmt_msg); } $orig = undef; - foreach my $idx (@$parts) { - $idx->remote_remove($oid, $mid); - } + $self->unindex_oid_remote($oid, $mid); } $self->barrier; } @@ -371,8 +376,8 @@ sub done { } my $over = $self->{over}; - $over->remote_commit; - $over->remote_close; + $over->commit_lazy; + $over->disconnect; if ($mm) { $mm->{dbh}->begin_work; @@ -381,6 +386,7 @@ sub done { delete $self->{mm}; } + delete $self->{bnote}; $self->{transact_bytes} = 0; $self->lock_release if $parts; } @@ -389,7 +395,25 @@ sub checkpoint { my ($self) = @_; my $im = $self->{im}; $im->checkpoint if $im; # PublicInbox::Import::checkpoint - $self->barrier(1); + $self->barrier; +} + +sub barrier_init { + my ($self, $n) = @_; + $self->{bnote} or return; + --$n; + my $barrier = { map { $_ => 1 } (0..$n) }; +} + +sub barrier_wait { + my ($self, $barrier) = @_; + my $bnote = $self->{bnote} or return; + my $r = $bnote->[0]; + while (scalar keys %$barrier) { + defined(my $l = $r->getline) or die "EOF on barrier_wait: $!"; + $l =~ /\Abarrier (\d+)/ or die "bad line on barrier_wait: $l"; + delete $barrier->{$1} or die "bad part[$1] on barrier wait"; + } } # issue a write barrier to ensure all data is visible to other processes @@ -403,17 +427,19 @@ sub barrier { my $parts = $self->{idx_parts}; if ($parts) { my $dbh = $self->{mm}->{dbh}; - $dbh->commit; # SQLite msgmap data is second in importance - my $over = $self->{over}; + # SQLite msgmap data is second in importance + $dbh->commit; - # Now deal with Xapian and overview DB - $over->barrier_init(scalar(@$parts)); + # SQLite overview is third + $self->{over}->commit_lazy; - # each partition needs to issue a barrier command to over - $_->remote_barrier foreach @$parts; + # Now deal with Xapian + my $barrier = $self->barrier_init(scalar @$parts); - $over->barrier_wait; # wait for each Xapian partition + # each partition needs to issue a barrier command + $_->remote_barrier for @$parts; + $self->barrier_wait($barrier); # wait for each Xapian partition # last_commit is special, don't commit these until # remote partitions are done: @@ -486,7 +512,7 @@ sub importer { } else { $self->{im} = undef; $im->done; - $self->barrier(1); + $self->barrier; $im = undef; my $git_dir = $self->git_init(++$self->{epoch_max}); my $git = PublicInbox::Git->new($git_dir); @@ -546,12 +572,11 @@ sub diff ($$$) { sub lookup_content { my ($self, $mime, $mid) = @_; my $ibx = $self->{-inbox}; - - my $srch = $ibx->search->reopen; + my $over = $self->{over}; my $cid = content_id($mime); my $found; my ($id, $prev); - while (my $smsg = $srch->next_by_mid($mid, \$id, \$prev)) { + while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) { my $msg = $ibx->msg_by_smsg($smsg); if (!defined($msg)) { warn "broken smsg for $mid\n"; @@ -581,6 +606,8 @@ sub atfork_child { $im->atfork_child; } die "unexpected mm" if $self->{mm}; + close $self->{bnote}->[0] or die "close bnote[0]: $!\n"; + $self->{bnote}->[1]; } sub mark_deleted { @@ -654,6 +681,7 @@ sub reindex_oid { $mm_tmp->mid_delete($mid0) or die "failed to delete <$mid0> for article #$num\n"; + $self->{over}->add_overview($mime, $len, $num, $oid, $mid0); my $nparts = $self->{partitions}; my $part = $num % $nparts; my $idx = $self->idx_part($part); @@ -759,17 +787,23 @@ $range \$regen_max; } +sub unindex_oid_remote { + my ($self, $oid, $mid) = @_; + $_->remote_remove($oid, $mid) foreach @{$self->{idx_parts}}; + $self->{over}->remove_oid($oid, $mid); +} + sub unindex_oid { my ($self, $git, $oid) = @_; my $msgref = $git->cat_file($oid); my $mime = PublicInbox::MIME->new($msgref); my $mids = mids($mime->header_obj); $mime = $msgref = undef; - my $srch = $self->{-inbox}->search; + my $over = $self->{over}; foreach my $mid (@$mids) { my %gone; my ($id, $prev); - while (my $smsg = $srch->next_by_mid($mid, \$id, \$prev)) { + while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) { $gone{$smsg->num} = 1 if $oid eq $smsg->{blob}; 1; # continue } @@ -780,8 +814,7 @@ sub unindex_oid { join(',',sort keys %gone), "\n"; } $self->{unindexed}->{$_}++ foreach keys %gone; - $_->remote_remove($oid, $mid) foreach @{$self->{idx_parts}}; - $self->{over}->remove_oid($oid, $mid); + $self->unindex_oid_remote($oid, $mid); $self->barrier; } }