1 # Copyright (C) 2018 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3 package PublicInbox::OverIdxFork;
6 use base qw(PublicInbox::OverIdx PublicInbox::Lock);
7 use Storable qw(freeze thaw);
# Runs the parent class's create() and then, only for parallel operation,
# calls spawn_worker with the same $v2writable object.
#   $self       - this overview index object
#   $v2writable - object whose {parallel} field selects the forked mode
# NOTE(review): the enclosing sub line is not visible in this excerpt; the
# SUPER::create call suggests this is the create() override -- confirm.
11 my ($self, $v2writable) = @_;
12 $self->SUPER::create();
# spawn_worker is skipped entirely when $v2writable->{parallel} is false
13 $self->spawn_worker($v2writable) if $v2writable->{parallel};
# Creates a command pipe and a barrier pipe, checks the result of fork, and
# runs over_worker_loop on the read end of the command pipe.
#   $self       - this overview index object
#   $v2writable - object that provides atfork_child
# NOTE(review): the sub line, the declarations of $r, $w and $pid, and the
# test separating parent from child are not visible in this excerpt --
# confirm the branch structure against the full file.
17 my ($self, $v2writable) = @_;
# $r is the handle over_worker_loop reads its commands from
19 pipe($r, $w) or die "pipe failed: $!\n";
20 my ($barrier_wait, $barrier_note);
# second pipe: over_worker_loop is given $barrier_note, while
# $barrier_wait is stored on $self further down
21 pipe($barrier_wait, $barrier_note) or die "pipe failed: $!\n";
# all four handles are switched to the :raw layer
22 binmode $_, ':raw' foreach ($r, $w, $barrier_wait, $barrier_note);
# $pid is undefined when fork itself failed
24 defined $pid or die "fork failed: $!\n";
# presumably child-only code from here through the eval/die pair --
# TODO confirm, the `$pid == 0' style test is not visible
26 $v2writable->atfork_child;
31 # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here
32 # speeds V2Writable batch imports across 8 cores by nearly 20%
# the return value of fcntl is not checked, so a refused resize is ignored
33 fcntl($r, 1031, 1048576) if $^O eq 'linux';
# any exception escaping the worker loop is re-raised with a prefix
35 eval { over_worker_loop($self, $r, $barrier_note) };
36 die "over worker died: $@\n" if $@;
# lock file path derived from the index filename; presumably consumed by
# the PublicInbox::Lock base class -- confirm
41 $self->{lock_path} = "$self->{filename}.pipe.lock";
# kept so a later call can read the "barrier_done" notification
44 $self->{barrier_wait} = $barrier_wait;
# Worker main loop: reads newline-terminated commands from $r and applies
# them to the overview index held by $self.
#   $self         - the overview index object
#   $r            - read handle carrying commands and frozen records
#   $barrier_note - write handle used to report barrier completion
# Commands matched by the visible branches: "commit", "close",
# "barrier_init N", "barrier N" and "D OID MID".  Another branch reads $len
# bytes, thaws them with Storable and passes the result to add_over.
# NOTE(review): several lines of this sub (the bodies of the commit/close
# branches and the assignments of $barrier, $n, $part and $len) are not
# visible in this excerpt.
48 sub over_worker_loop {
49 my ($self, $r, $barrier_note) = @_;
# flush $barrier_note after every print so notifications are not buffered
50 $barrier_note->autoflush(1);
# change the program name reported for this process
51 $0 = 'pi-v2-overview';
# the loop ends when getline returns a false value (e.g. undef at EOF)
54 while (my $line = $r->getline) {
55 if ($line eq "commit\n") {
57 } elsif ($line eq "close\n") {
59 } elsif ($line =~ /\Abarrier_init (\d+)\n\z/) {
# only one barrier may be pending at a time
61 die "barrier in-progress\n" if defined $barrier;
# one pending key per index 0..$n
# NOTE(review): $n is presumably derived from the captured count -- confirm
62 $barrier = { map { $_ => 1 } (0..$n) };
63 } elsif ($line =~ /\Abarrier (\d+)\n\z/) {
65 die "no barrier in-progress\n" unless defined $barrier;
# delete returns the stored 1 for a pending key and undef otherwise, so an
# unknown or repeated index is fatal
66 delete $barrier->{$1} or die "unknown barrier: $part\n";
# every expected index has reported once the hash is empty
67 if ((scalar keys %$barrier) == 0) {
70 print $barrier_note "barrier_done\n" or die
71 "print failed to barrier note: $!";
# "D" line: a hex object id of at least 40 characters, one space, then the
# rest of the line; /s lets (.*) span embedded newlines
73 } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.*)\n\z/s) {
74 my ($oid, $mid) = ($1, $2);
75 $self->remove_oid($oid, $mid);
# read exactly $len bytes of payload; a zero or failed read is fatal
# NOTE(review): $len is presumably parsed from a preceding length line
78 my $n = read($r, my $msg, $len) or die "read: $!\n";
79 $n == $len or die "short read: $n != $len\n";
80 $msg = thaw($msg); # should raise on error
81 defined $msg or die "failed to thaw buffer\n";
# an indexing failure is reported with warn and does not end the loop
82 eval { add_over($self, $msg) };
# NOTE(review): the values layout comment in this file lists doc_data as
# the last element, so $msg->[-1] may not be a Message-ID -- confirm
83 warn "failed to index message <$msg->[-1]>: $@\n" if $@;
# sanity checks: fatal if $self still has a dbh or a txn set
86 die "$$ $0 dbh not released\n" if $self->{dbh};
87 die "$$ $0 still in transaction\n" if $self->{txn};
90 # called by a partition worker
91 # values: [ DS, NUM, BYTES, LINES, TS, MIDS, XPATH, doc_data ]
# When $self->{w} is set, the record is serialized with Storable and
# written to that handle; SUPER::add_over is the in-process alternative.
#   $self   - this overview index object
#   $values - array reference laid out as described above
# Dies with "print failed: ..." when the write to the pipe fails.
# NOTE(review): the sub line, the declaration of $err, the locking calls
# and the branch holding the SUPER call are not visible in this excerpt.
93 my ($self, $values) = @_;
94 if (my $w = $self->{w}) {
# wire format: length of the frozen string, a newline, then the string
96 my $str = freeze($values);
97 $str = length($str) . "\n" . $str;
99 # multiple processes write to the same pipe, so use flock
100 # We can't avoid this lock for <=PIPE_BUF writes, either,
101 # because those atomic writes can break up >PIPE_BUF ones
# a failed print is remembered in $err instead of dying immediately;
# presumably so the lock can be released first -- confirm
103 print $w $str or $err = $!;
106 die "print failed: $err\n" if $err;
# presumably the path taken when no worker pipe exists -- TODO confirm
108 $self->SUPER::add_over($values);
# Requests removal of the entry identified by $oid and $mid.
#   $self - this overview index object
#   $oid  - object id, written verbatim into the "D" command line
#   $mid  - Message-ID, written verbatim after the object id
# When $self->{w} is set, a "D OID MID" line is printed to that handle;
# SUPER::remove_oid is the in-process alternative.
# NOTE(review): the sub line, the declaration and checking of $err, and
# the branch holding the SUPER call are not visible in this excerpt.
113 my ($self, $oid, $mid) = @_;
114 if (my $w = $self->{w}) {
# a failed print is recorded in $err rather than raised here
117 print $w "D $oid $mid\n" or $err = $!;
121 $self->SUPER::remove_oid($oid, $mid); # OverIdx
125 # write to the subprocess
# Sends "barrier_init N" over $self->{w}, where N is $nparts.
#   $self   - this overview index object
#   $nparts - number written after "barrier_init"
# NOTE(review): the sub line and the declaration and checking of $err are
# not visible in this excerpt.
127 my ($self, $nparts) = @_;
# nothing to do when no write handle is stored on $self
128 my $w = $self->{w} or return;
# a failed print is recorded in $err rather than raised here
131 print $w "barrier_init $nparts\n" or $err = $!;
# Reads one line from $self->{barrier_wait}, when that handle is set, and
# dies unless the line is exactly "barrier_done\n".
# NOTE(review): the sub line and any alternative branch are not visible
# in this excerpt.
138 if (my $bw = $self->{barrier_wait}) {
# getline returns undef at EOF; the comparison below then fails and dies
139 my $l = $bw->getline;
140 $l eq "barrier_done\n" or die "bad response from barrier_wait: $l\n";
# Sends a "commit" command line over $self->{w} when that handle is set.
# NOTE(review): the sub line, its name, and the declaration and checking
# of $err are not visible in this excerpt.
148 if (my $w = $self->{w}) {
# a failed print is recorded in $err rather than raised here
151 print $w "commit\n" or $err = $!;
159 # prevent connections when using forked subprocesses
# Returns without connecting while $self->{w} is set; otherwise defers to
# the parent class's connect and returns its result.
# NOTE(review): the sub line is not visible in this excerpt.
162 return if $self->{w};
163 $self->SUPER::connect;
# Shuts down the worker when a write handle is stored on $self: sends
# "close", closes the pipe, waits for the process and requires a zero exit
# status.  Each failing step is fatal.
# NOTE(review): the sub line, its name, and the branch that holds the
# transaction check are not visible in this excerpt.
# delete removes {w} from $self, so this branch is entered at most once
168 if (my $w = delete $self->{w}) {
169 my $pid = delete $self->{pid} or die "no process to wait on\n";
170 print $w "close\n" or die "failed to write to pid:$pid: $!\n";
# close is checked as well as print, so a failure reported there is fatal
171 close $w or die "failed to close pipe for pid:$pid: $!\n";
# block until the process has exited
172 waitpid($pid, 0) == $pid or die "remote process did not finish";
# $? holds the wait status of the reaped process; non-zero is fatal
173 $? == 0 or die ref($self)." pid:$pid exited with: $?";
# fatal when $self->{txn} is still set
175 die "transaction in progress $self\n" if $self->{txn};