X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FSearchIdxSkeleton.pm;h=2be6496003e1aaa330b62f7930f82c3e4cb5833c;hb=7503aeb540af5afd5cb1b554b3c29f35f5fc918d;hp=40b28c519bfe8b2c92ea0db99b61b9f98cc49e01;hpb=ae68bf5da734189549bbac3a525845a58e45d77f;p=public-inbox.git diff --git a/lib/PublicInbox/SearchIdxSkeleton.pm b/lib/PublicInbox/SearchIdxSkeleton.pm index 40b28c51..2be64960 100644 --- a/lib/PublicInbox/SearchIdxSkeleton.pm +++ b/lib/PublicInbox/SearchIdxSkeleton.pm @@ -12,99 +12,144 @@ sub new { # create the DB: $self->_xdb_acquire; $self->_xdb_release; + $self->spawn_worker($v2writable) if $v2writable->{parallel}; + $self +} +sub spawn_worker { + my ($self, $v2writable) = @_; my ($r, $w); pipe($r, $w) or die "pipe failed: $!\n"; - binmode $r, ':raw'; - binmode $w, ':raw'; + my ($barrier_wait, $barrier_note); + pipe($barrier_wait, $barrier_note) or die "pipe failed: $!\n"; + binmode $_, ':raw' foreach ($r, $w, $barrier_wait, $barrier_note); my $pid = fork; defined $pid or die "fork failed: $!\n"; if ($pid == 0) { $v2writable->atfork_child; $v2writable = undef; close $w; - eval { skeleton_worker_loop($self, $r) }; + close $barrier_wait; + eval { skeleton_worker_loop($self, $r, $barrier_note) }; die "skeleton worker died: $@\n" if $@; exit; } $self->{w} = $w; $self->{pid} = $pid; close $r; + close $barrier_note; + $self->{barrier_wait} = $barrier_wait; $w->autoflush(1); # lock on only exists in parent, not in worker - my $l = $self->{lock_path} = $self->xdir . '/pi-v2-skeleton.lock'; - open my $fh, '>>', $l or die "failed to create $l: $!\n"; - $self; + $self->{lock_path} = $self->xdir . '/pi-v2-skeleton.lock'; } sub skeleton_worker_loop { - my ($self, $r) = @_; + my ($self, $r, $barrier_note) = @_; + $barrier_note->autoflush(1); $0 = 'pi-v2-skeleton'; - my $xdb = $self->_xdb_acquire; - $xdb->begin_transaction; - my $txn = 1; + $self->begin_txn_lazy; + my $barrier = undef; while (my $line = $r->getline) { if ($line eq "commit\n") { - $xdb->commit_transaction if $txn; - $txn = undef; + $self->commit_txn_lazy; } elsif ($line eq "close\n") { $self->_xdb_release; - $xdb = $txn = undef; + } elsif ($line =~ /\Abarrier_init (\d+)\n\z/) { + my $n = $1 - 1; + die "barrier in-progress\n" if defined $barrier; + $barrier = { map { $_ => 1 } (0..$n) }; + } elsif ($line =~ /\Abarrier (\d+)\n\z/) { + my $part = $1; + die "no barrier in-progress\n" unless defined $barrier; + delete $barrier->{$1} or die "unknown barrier: $part\n"; + if ((scalar keys %$barrier) == 0) { + $barrier = undef; + $self->commit_txn_lazy; + print $barrier_note "barrier_done\n" or die + "print failed to barrier note: $!"; + } + } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.*)\n\z/s) { + my ($oid, $mid) = ($1, $2); + $self->begin_txn_lazy; + $self->remove_by_oid($oid, $mid); } else { my $len = int($line); my $n = read($r, my $msg, $len) or die "read: $!\n"; $n == $len or die "short read: $n != $len\n"; $msg = thaw($msg); # should raise on error defined $msg or die "failed to thaw buffer\n"; - $xdb ||= $self->_xdb_acquire; - if (!$txn) { - $xdb->begin_transaction; - $txn = 1; - } + $self->begin_txn_lazy; eval { index_skeleton_real($self, $msg) }; warn "failed to index message <$msg->[-1]>: $@\n" if $@; } } - die "xdb not released\n" if $xdb; - die "in transaction\n" if $txn; + $self->worker_done; } # called by a partition worker sub index_skeleton { my ($self, $values) = @_; - my $w = $self->{w}; - my $err; - my $str = freeze($values); - $str = length($str) . "\n" . $str; + if (my $w = $self->{w}) { + my $err; + my $str = freeze($values); + $str = length($str) . "\n" . $str; - # multiple processes write to the same pipe, so use flock - # We can't avoid this lock for <=PIPE_BUF writes, either, - # because those atomic writes can break up >PIPE_BUF ones - $self->_lock_acquire; - print $w $str or $err = $!; - $self->_lock_release; + # multiple processes write to the same pipe, so use flock + # We can't avoid this lock for <=PIPE_BUF writes, either, + # because those atomic writes can break up >PIPE_BUF ones + $self->lock_acquire; + print $w $str or $err = $!; + $self->lock_release; - die "print failed: $err\n" if $err; + die "print failed: $err\n" if $err; + } else { + $self->begin_txn_lazy; + index_skeleton_real($self, $values); + } +} + +sub remote_remove { + my ($self, $oid, $mid) = @_; + my $err; + $self->lock_acquire; + eval { $self->SUPER::remote_remove($oid, $mid) }; + $err = $@; + $self->lock_release; + die $err if $err; } -# values: [ TS, NUM, BYTES, LINES, MID, XPATH, doc_data ] sub index_skeleton_real ($$) { my ($self, $values) = @_; - my $doc_data = pop @$values; - my $xpath = pop @$values; - my $mids = pop @$values; - my $ts = $values->[PublicInbox::Search::TS]; + my ($ts, $num, $mids, $xpath, $doc_data) = @$values; my $smsg = PublicInbox::SearchMsg->new(undef); + $smsg->load_from_data($doc_data); my $doc = $smsg->{doc}; - PublicInbox::SearchIdx::add_values($doc, $values); $doc->set_data($doc_data); - $smsg->{ts} = $ts; - $smsg->load_from_data($doc_data); - my $num = $values->[PublicInbox::Search::NUM]; + PublicInbox::SearchIdx::add_values($doc, $ts, $smsg->ds, $num); my @refs = ($smsg->references =~ /<([^>]+)>/g); + $self->delete_article($num) if defined $num; # for reindexing $self->link_and_save($doc, $mids, \@refs, $num, $xpath); } +# write to the subprocess +sub barrier_init { + my ($self, $nparts) = @_; + my $w = $self->{w} or return; + my $err; + $self->lock_acquire; + print $w "barrier_init $nparts\n" or $err = "failed to write: $!\n"; + $self->lock_release; + die $err if $err; +} + +sub barrier_wait { + my ($self) = @_; + my $bw = $self->{barrier_wait} or return; + my $l = $bw->getline; + $l eq "barrier_done\n" or die "bad response from barrier_wait: $l\n"; +} + 1;