]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/IPC.pm
ipc: avoid excessive evals
[public-inbox.git] / lib / PublicInbox / IPC.pm
1 # Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # base class for remote IPC calls, requires Storable
5 # TODO: this ought to be usable in SearchIdxShard
6 package PublicInbox::IPC;
7 use strict;
8 use v5.10.1;
9 use Carp qw(confess croak);
10 use PublicInbox::Sigfd;
11 use POSIX ();
12 use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF();
13 my ($enc, $dec);
14 # ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+
15 # and eliminate method call overhead
16 BEGIN {
17         eval {
18                 require Sereal::Encoder;
19                 require Sereal::Decoder;
20                 Sereal::Encoder->import('sereal_encode_with_object');
21                 Sereal::Decoder->import('sereal_decode_with_object');
22                 ($enc, $dec) = (Sereal::Encoder->new, Sereal::Decoder->new);
23         };
24 };
25
26 if ($enc && $dec) { # should be custom ops
27         *freeze = sub ($) { sereal_encode_with_object $enc, $_[0] };
28         *thaw = sub ($) { sereal_decode_with_object $dec, $_[0], my $ret };
29 } else {
30         eval { # some distros have Storable as a separate package from Perl
31                 require Storable;
32                 Storable->import(qw(freeze thaw));
33                 $enc = 1;
34         } // warn("Storable (part of Perl) missing: $@\n");
35 }
36
37 sub _get_rec ($) {
38         my ($r) = @_;
39         defined(my $len = <$r>) or return;
40         chop($len) eq "\n" or croak "no LF byte in $len";
41         defined(my $n = read($r, my $buf, $len)) or croak "read error: $!";
42         $n == $len or croak "short read: $n != $len";
43         thaw($buf);
44 }
45
46 sub _pack_rec ($) {
47         my ($ref) = @_;
48         my $buf = freeze($ref);
49         length($buf) . "\n" . $buf;
50 }
51
52 sub _send_rec ($$) {
53         my ($w, $ref) = @_;
54         print $w _pack_rec($ref) or croak "print: $!";
55 }
56
57 sub ipc_return ($$$) {
58         my ($w, $ret, $exc) = @_;
59         _send_rec($w, $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : $ret);
60 }
61
62 sub ipc_worker_loop ($$$) {
63         my ($self, $r_req, $w_res) = @_;
64         my ($rec, $wantarray, $sub, @args);
65         local $/ = "\n";
66         while ($rec = _get_rec($r_req)) {
67                 ($wantarray, $sub, @args) = @$rec;
68                 # no waiting if client doesn't care,
69                 # this is the overwhelmingly likely case
70                 if (!defined($wantarray)) {
71                         eval { $self->$sub(@args) };
72                         warn "$$ die: $@ (from nowait $sub)\n" if $@;
73                 } elsif ($wantarray) {
74                         my @ret = eval { $self->$sub(@args) };
75                         ipc_return($w_res, \@ret, $@);
76                 } else { # '' => wantscalar
77                         my $ret = eval { $self->$sub(@args) };
78                         ipc_return($w_res, \$ret, $@);
79                 }
80         }
81 }
82
83 # starts a worker if Sereal or Storable is installed
84 sub ipc_worker_spawn {
85         my ($self, $ident, $oldset) = @_;
86         return unless $enc; # no Sereal or Storable
87         return if ($self->{-ipc_ppid} // -1) == $$; # idempotent
88         delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
89         pipe(my ($r_req, $w_req)) or die "pipe: $!";
90         pipe(my ($r_res, $w_res)) or die "pipe: $!";
91         my $sigset = $oldset // PublicInbox::DS::block_signals();
92         my $parent = $$;
93         $self->ipc_atfork_parent;
94         defined(my $pid = fork) or die "fork: $!";
95         if ($pid == 0) {
96                 eval { PublicInbox::DS->Reset };
97                 $self->{-ipc_parent_pid} = $parent;
98                 $w_req = $r_res = undef;
99                 $w_res->autoflush(1);
100                 $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
101                 local $0 = $ident;
102                 PublicInbox::DS::sig_setmask($oldset);
103                 my $on_destroy = $self->ipc_atfork_child;
104                 eval { ipc_worker_loop($self, $r_req, $w_res) };
105                 die "worker $ident PID:$$ died: $@\n" if $@;
106                 exit;
107         }
108         PublicInbox::DS::sig_setmask($sigset) unless $oldset;
109         $r_req = $w_res = undef;
110         $w_req->autoflush(1);
111         $self->{-ipc_req} = $w_req;
112         $self->{-ipc_res} = $r_res;
113         $self->{-ipc_ppid} = $$;
114         $self->{-ipc_pid} = $pid;
115 }
116
117 sub ipc_worker_reap { # dwaitpid callback
118         my ($self, $pid) = @_;
119         warn "PID:$pid died with \$?=$?\n" if $?;
120 }
121
122 # for base class, override in sub classes
123 sub ipc_atfork_parent {}
124 sub ipc_atfork_child {}
125
126 # should only be called inside the worker process
127 sub ipc_worker_exit {
128         my (undef, $code) = @_;
129         exit($code);
130 }
131
132 # idempotent, can be called regardless of whether worker is active or not
133 sub ipc_worker_stop {
134         my ($self) = @_;
135         my ($pid, $ppid) = delete(@$self{qw(-ipc_pid -ipc_ppid)});
136         my ($w_req, $r_res) = delete(@$self{qw(-ipc_req -ipc_res)});
137         if (!$w_req && !$r_res) {
138                 die "unexpected PID:$pid without IPC pipes" if $pid;
139                 return; # idempotent
140         }
141         die 'no PID with IPC pipes' unless $pid;
142         _send_rec($w_req, [ undef, 'ipc_worker_exit', 0 ]);
143         $w_req = $r_res = undef;
144
145         # allow any sibling to send ipc_worker_exit, but siblings can't wait
146         return if $$ != $ppid;
147         PublicInbox::DS::dwaitpid($pid, \&ipc_worker_reap, $self);
148 }
149
150 # use this if we have multiple readers reading curl or "pigz -dc"
151 # and writing to the same store
152 sub ipc_lock_init {
153         my ($self, $f) = @_;
154         require PublicInbox::Lock;
155         $self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock'
156 }
157
158 sub ipc_async_wait ($$) {
159         my ($self, $max) = @_; # max == -1 to wait for all
160         my $aif = $self->{-async_inflight} or return;
161         my $r_res = $self->{-ipc_res} or die 'BUG: no ipc_res';
162         while (my ($sub, $bytes, $cb, $cb_arg) = splice(@$aif, 0, 4)) {
163                 my $ret = _get_rec($r_res) //
164                         die "no response on $sub (req.size=$bytes)";
165                 $self->{-async_inflight_bytes} -= $bytes;
166
167                 eval { $cb->($cb_arg, $ret) };
168                 warn "E: $sub callback error: $@\n" if $@;
169                 return if --$max == 0;
170         }
171 }
172
173 # call $self->$sub(@args), on a worker if ipc_worker_spawn was used
174 sub ipc_do {
175         my ($self, $sub, @args) = @_;
176         if (my $w_req = $self->{-ipc_req}) { # run in worker
177                 my $ipc_lock = $self->{-ipc_lock};
178                 my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef;
179                 if (defined(wantarray)) {
180                         my $r_res = $self->{-ipc_res} or die 'BUG: no ipc_res';
181                         ipc_async_wait($self, -1);
182                         _send_rec($w_req, [ wantarray, $sub, @args ]);
183                         my $ret = _get_rec($r_res) // die "no response on $sub";
184                         die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
185                         wantarray ? @$ret : $$ret;
186                 } else { # likely, fire-and-forget into pipe
187                         _send_rec($w_req, [ undef , $sub, @args ]);
188                 }
189         } else { # run locally
190                 $self->$sub(@args);
191         }
192 }
193
194 sub ipc_async {
195         my ($self, $sub, $sub_args, $cb, $cb_arg) = @_;
196         if (my $w_req = $self->{-ipc_req}) { # run in worker
197                 my $rec = _pack_rec([ 1, $sub, @$sub_args ]);
198                 my $cur_bytes = \($self->{-async_inflight_bytes} //= 0);
199                 while (($$cur_bytes + length($rec)) > PIPE_BUF) {
200                         ipc_async_wait($self, 1);
201                 }
202                 my $ipc_lock = $self->{-ipc_lock};
203                 my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef;
204                 print $w_req $rec or croak "print: $!";
205                 $$cur_bytes += length($rec);
206                 push @{$self->{-async_inflight}},
207                                 $sub, length($rec), $cb, $cb_arg;
208         } else {
209                 my $ret = [ eval { $self->$sub(@$sub_args) } ];
210                 if (my $exc = $@) {
211                         $ret = ( bless(\$exc, 'PublicInbox::IPC::Die') );
212                 }
213                 eval { $cb->($cb_arg, $ret) };
214                 warn "E: $sub callback error: $@\n" if $@;
215         }
216 }
217
218 # needed when there's multiple IPC workers and the parent forking
219 # causes newer siblings to inherit older siblings sockets
220 sub ipc_sibling_atfork_child {
221         my ($self) = @_;
222         my ($pid, undef) = delete(@$self{qw(-ipc_pid -ipc_ppid)});
223         delete(@$self{qw(-ipc_req -ipc_res)});
224         $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
225 }
226
227 1;