Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/OverIdxFork.pm
replace Xapian skeleton with SQLite overview DB
[public-inbox.git] / lib / PublicInbox / OverIdxFork.pm
similarity index 56%
rename from lib/PublicInbox/SearchIdxSkeleton.pm
rename to lib/PublicInbox/OverIdxFork.pm
index 2be6496003e1aaa330b62f7930f82c3e4cb5833c..f4f7cddd3dad99bf569b34b657acc8831b357aef 100644 (file)
@@ -1,19 +1,16 @@
 # Copyright (C) 2018 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-package PublicInbox::SearchIdxSkeleton;
+package PublicInbox::OverIdxFork;
 use strict;
 use warnings;
-use base qw(PublicInbox::SearchIdx);
+use base qw(PublicInbox::OverIdx PublicInbox::Lock);
 use Storable qw(freeze thaw);
+use IO::Handle;
 
-sub new {
-       my ($class, $v2writable) = @_;
-       my $self = $class->SUPER::new($v2writable->{-inbox}, 1, 'skel');
-       # create the DB:
-       $self->_xdb_acquire;
-       $self->_xdb_release;
+sub create {
+       my ($self, $v2writable) = @_;
+       $self->SUPER::create();
        $self->spawn_worker($v2writable) if $v2writable->{parallel};
-       $self
 }
 
 sub spawn_worker {
@@ -30,33 +27,35 @@ sub spawn_worker {
                $v2writable = undef;
                close $w;
                close $barrier_wait;
-               eval { skeleton_worker_loop($self, $r, $barrier_note) };
-               die "skeleton worker died: $@\n" if $@;
+
+               # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here
+               # speeds V2Writable batch imports across 8 cores by nearly 20%
+               fcntl($r, 1031, 1048576) if $^O eq 'linux';
+
+               eval { over_worker_loop($self, $r, $barrier_note) };
+               die "over worker died: $@\n" if $@;
                exit;
        }
        $self->{w} = $w;
        $self->{pid} = $pid;
+       $self->{lock_path} = "$self->{filename}.pipe.lock";
        close $r;
        close $barrier_note;
        $self->{barrier_wait} = $barrier_wait;
-
        $w->autoflush(1);
-
-       # lock on only exists in parent, not in worker
-       $self->{lock_path} = $self->xdir . '/pi-v2-skeleton.lock';
 }
 
-sub skeleton_worker_loop {
+sub over_worker_loop {
        my ($self, $r, $barrier_note) = @_;
        $barrier_note->autoflush(1);
-       $0 = 'pi-v2-skeleton';
-       $self->begin_txn_lazy;
+       $0 = 'pi-v2-overview';
+       $self->begin_lazy;
        my $barrier = undef;
        while (my $line = $r->getline) {
                if ($line eq "commit\n") {
-                       $self->commit_txn_lazy;
+                       $self->commit_lazy;
                } elsif ($line eq "close\n") {
-                       $self->_xdb_release;
+                       $self->disconnect;
                } elsif ($line =~ /\Abarrier_init (\d+)\n\z/) {
                        my $n = $1 - 1;
                        die "barrier in-progress\n" if defined $barrier;
@@ -67,30 +66,30 @@ sub skeleton_worker_loop {
                        delete $barrier->{$1} or die "unknown barrier: $part\n";
                        if ((scalar keys %$barrier) == 0) {
                                $barrier = undef;
-                               $self->commit_txn_lazy;
+                               $self->commit_lazy;
                                print $barrier_note "barrier_done\n" or die
                                        "print failed to barrier note: $!";
                        }
                } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.*)\n\z/s) {
                        my ($oid, $mid) = ($1, $2);
-                       $self->begin_txn_lazy;
-                       $self->remove_by_oid($oid, $mid);
+                       $self->remove_oid($oid, $mid);
                } else {
                        my $len = int($line);
                        my $n = read($r, my $msg, $len) or die "read: $!\n";
                        $n == $len or die "short read: $n != $len\n";
                        $msg = thaw($msg); # should raise on error
                        defined $msg or die "failed to thaw buffer\n";
-                       $self->begin_txn_lazy;
-                       eval { index_skeleton_real($self, $msg) };
+                       eval { add_over($self, $msg) };
                        warn "failed to index message <$msg->[-1]>: $@\n" if $@;
                }
        }
-       $self->worker_done;
+       die "$$ $0 dbh not released\n" if $self->{dbh};
+       die "$$ $0 still in transaction\n" if $self->{txn};
 }
 
 # called by a partition worker
-sub index_skeleton {
+# values: [ DS, NUM, BYTES, LINES, TS, MIDS, XPATH, doc_data ]
+sub add_over {
        my ($self, $values) = @_;
        if (my $w = $self->{w}) {
                my $err;
@@ -106,32 +105,21 @@ sub index_skeleton {
 
                die "print failed: $err\n" if $err;
        } else {
-               $self->begin_txn_lazy;
-               index_skeleton_real($self, $values);
+               $self->SUPER::add_over($values);
        }
 }
 
-sub remote_remove {
+sub remove_oid {
        my ($self, $oid, $mid) = @_;
-       my $err;
-       $self->lock_acquire;
-       eval { $self->SUPER::remote_remove($oid, $mid) };
-       $err = $@;
-       $self->lock_release;
-       die $err if $err;
-}
-
-sub index_skeleton_real ($$) {
-       my ($self, $values) = @_;
-       my ($ts, $num, $mids, $xpath, $doc_data) = @$values;
-       my $smsg = PublicInbox::SearchMsg->new(undef);
-       $smsg->load_from_data($doc_data);
-       my $doc = $smsg->{doc};
-       $doc->set_data($doc_data);
-       PublicInbox::SearchIdx::add_values($doc, $ts, $smsg->ds, $num);
-       my @refs = ($smsg->references =~ /<([^>]+)>/g);
-       $self->delete_article($num) if defined $num; # for reindexing
-       $self->link_and_save($doc, $mids, \@refs, $num, $xpath);
+       if (my $w = $self->{w}) {
+               my $err;
+               $self->lock_acquire;
+               print $w "D $oid $mid\n" or $err = $!;
+               $self->lock_release;
+               die $err if $err;
+       } else {
+               $self->SUPER::remove_oid($oid, $mid); # OverIdx
+       }
 }
 
 # write to the subprocess
@@ -140,7 +128,7 @@ sub barrier_init {
        my $w = $self->{w} or return;
        my $err;
        $self->lock_acquire;
-       print $w "barrier_init $nparts\n" or $err = "failed to write: $!\n";
+       print $w "barrier_init $nparts\n" or $err = $!;
        $self->lock_release;
        die $err if $err;
 }
@@ -152,4 +140,44 @@ sub barrier_wait {
        $l eq "barrier_done\n" or die "bad response from barrier_wait: $l\n";
 }
 
+sub remote_commit {
+       my ($self) = @_;
+       if (my $w = $self->{w}) {
+               my $err;
+               $self->lock_acquire;
+               print $w "commit\n" or $err = $!;
+               $self->lock_release;
+               die $err if $err;
+       } else {
+               $self->commit_lazy;
+       }
+}
+
+# prevent connections when using forked subprocesses
+sub connect {
+       my ($self) = @_;
+       return if $self->{w};
+       $self->SUPER::connect;
+}
+
+sub remote_close {
+       my ($self) = @_;
+       if (my $w = delete $self->{w}) {
+               my $pid = delete $self->{pid} or die "no process to wait on\n";
+               print $w "close\n" or die "failed to write to pid:$pid: $!\n";
+               close $w or die "failed to close pipe for pid:$pid: $!\n";
+               waitpid($pid, 0) == $pid or die "remote process did not finish";
+               $? == 0 or die ref($self)." pid:$pid exited with: $?";
+       } else {
+               die "transaction in progress $self\n" if $self->{txn};
+               $self->disconnect;
+       }
+}
+
+sub commit_fsync {
+       my ($self) = @_;
+       return if $self->{w}; # don't bother; main parent can also call this
+       $self->SUPER::commit_fsync;
+}
+
 1;