]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/IPC.pm
d909dc1c66357eaa46116e6111ea83cbcb329e0a
[public-inbox.git] / lib / PublicInbox / IPC.pm
1 # Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # base class for remote IPC calls and workqueues, requires Storable or Sereal
5 # - ipc_do and ipc_worker_* is for a single worker/producer and uses pipes
6 # - wq_io_do and wq_worker* is for a single producer and multiple workers,
7 #   using SOCK_SEQPACKET for work distribution
8 # use ipc_do when you need work done on a certain process
9 # use wq_io_do when your work can be done on any idle worker
10 package PublicInbox::IPC;
11 use strict;
12 use v5.10.1;
13 use parent qw(Exporter);
14 use Carp qw(croak);
15 use PublicInbox::DS qw(dwaitpid);
16 use PublicInbox::Spawn;
17 use PublicInbox::OnDestroy;
18 use PublicInbox::WQWorker;
19 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
20 my $MY_MAX_ARG_STRLEN = 4096 * 33; # extra 4K for serialization
21 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
22 our @EXPORT_OK = qw(ipc_freeze ipc_thaw);
23 my $WQ_MAX_WORKERS = 4096;
24 my ($enc, $dec);
25 # ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+
26 # and eliminate method call overhead
27 BEGIN {
28         eval {
29                 require Sereal::Encoder;
30                 require Sereal::Decoder;
31                 Sereal::Encoder->import('sereal_encode_with_object');
32                 Sereal::Decoder->import('sereal_decode_with_object');
33                 ($enc, $dec) = (Sereal::Encoder->new, Sereal::Decoder->new);
34         };
35 };
36
37 if ($enc && $dec) { # should be custom ops
38         *ipc_freeze = sub ($) { sereal_encode_with_object $enc, $_[0] };
39         *ipc_thaw = sub ($) { sereal_decode_with_object $dec, $_[0], my $ret };
40 } else {
41         require Storable;
42         *ipc_freeze = \&Storable::freeze;
43         *ipc_thaw = \&Storable::thaw;
44 }
45
46 my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
47 my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do {
48         require PublicInbox::CmdIPC4;
49         $recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4');
50         PublicInbox::CmdIPC4->can('send_cmd4');
51 };
52
53 sub _get_rec ($) {
54         my ($r) = @_;
55         defined(my $len = <$r>) or return;
56         chop($len) eq "\n" or croak "no LF byte in $len";
57         defined(my $n = read($r, my $buf, $len)) or croak "read error: $!";
58         $n == $len or croak "short read: $n != $len";
59         ipc_thaw($buf);
60 }
61
62 sub _send_rec ($$) {
63         my ($w, $ref) = @_;
64         my $buf = ipc_freeze($ref);
65         print $w length($buf), "\n", $buf or croak "print: $!";
66 }
67
68 sub ipc_return ($$$) {
69         my ($w, $ret, $exc) = @_;
70         _send_rec($w, $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : $ret);
71 }
72
73 sub ipc_worker_loop ($$$) {
74         my ($self, $r_req, $w_res) = @_;
75         my ($rec, $wantarray, $sub, @args);
76         local $/ = "\n";
77         while ($rec = _get_rec($r_req)) {
78                 ($wantarray, $sub, @args) = @$rec;
79                 # no waiting if client doesn't care,
80                 # this is the overwhelmingly likely case
81                 if (!defined($wantarray)) {
82                         eval { $self->$sub(@args) };
83                         warn "$$ die: $@ (from nowait $sub)\n" if $@;
84                 } elsif ($wantarray) {
85                         my @ret = eval { $self->$sub(@args) };
86                         ipc_return($w_res, \@ret, $@);
87                 } else { # '' => wantscalar
88                         my $ret = eval { $self->$sub(@args) };
89                         ipc_return($w_res, \$ret, $@);
90                 }
91         }
92 }
93
94 # starts a worker if Sereal or Storable is installed
95 sub ipc_worker_spawn {
96         my ($self, $ident, $oldset, $fields) = @_;
97         return if ($self->{-ipc_ppid} // -1) == $$; # idempotent
98         delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
99         pipe(my ($r_req, $w_req)) or die "pipe: $!";
100         pipe(my ($r_res, $w_res)) or die "pipe: $!";
101         my $sigset = $oldset // PublicInbox::DS::block_signals();
102         $self->ipc_atfork_prepare;
103         my $seed = rand(0xffffffff);
104         my $pid = fork // die "fork: $!";
105         if ($pid == 0) {
106                 srand($seed);
107                 eval { PublicInbox::DS->Reset };
108                 delete @$self{qw(-wq_s1 -wq_s2 -wq_workers -wq_ppid)};
109                 $w_req = $r_res = undef;
110                 $w_res->autoflush(1);
111                 $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
112                 local $0 = $ident;
113                 # ensure we properly exit even if warn() dies:
114                 my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
115                 eval {
116                         $fields //= {};
117                         local @$self{keys %$fields} = values(%$fields);
118                         my $on_destroy = $self->ipc_atfork_child;
119                         local %SIG = %SIG;
120                         PublicInbox::DS::sig_setmask($sigset);
121                         ipc_worker_loop($self, $r_req, $w_res);
122                 };
123                 warn "worker $ident PID:$$ died: $@\n" if $@;
124                 undef $end; # trigger exit
125         }
126         PublicInbox::DS::sig_setmask($sigset) unless $oldset;
127         $r_req = $w_res = undef;
128         $w_req->autoflush(1);
129         $self->{-ipc_req} = $w_req;
130         $self->{-ipc_res} = $r_res;
131         $self->{-ipc_ppid} = $$;
132         $self->{-ipc_pid} = $pid;
133 }
134
135 sub ipc_worker_reap { # dwaitpid callback
136         my ($args, $pid) = @_;
137         return if !$?;
138         # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
139         my $s = $? & 127;
140         warn "PID:$pid died with \$?=$?\n" if $s != 15 && $s != 13;
141 }
142
143 sub wq_wait_old {
144         my ($self, $cb, @args) = @_;
145         my $pids = delete $self->{"-wq_old_pids.$$"} or return;
146         dwaitpid($_, $cb // \&ipc_worker_reap, [$self, @args]) for @$pids;
147 }
148
149 # for base class, override in sub classes
150 sub ipc_atfork_prepare {}
151
152 sub wq_atexit_child {}
153
154 sub ipc_atfork_child {
155         my ($self) = @_;
156         my $io = delete($self->{-ipc_atfork_child_close}) or return;
157         close($_) for @$io;
158         undef;
159 }
160
161 # idempotent, can be called regardless of whether worker is active or not
162 sub ipc_worker_stop {
163         my ($self, $args) = @_;
164         my ($pid, $ppid) = delete(@$self{qw(-ipc_pid -ipc_ppid)});
165         my ($w_req, $r_res) = delete(@$self{qw(-ipc_req -ipc_res)});
166         if (!$w_req && !$r_res) {
167                 die "unexpected PID:$pid without IPC pipes" if $pid;
168                 return; # idempotent
169         }
170         die 'no PID with IPC pipes' unless $pid;
171         $w_req = $r_res = undef;
172
173         return if $$ != $ppid;
174         dwaitpid($pid, \&ipc_worker_reap, [$self, $args]);
175 }
176
177 # use this if we have multiple readers reading curl or "pigz -dc"
178 # and writing to the same store
179 sub ipc_lock_init {
180         my ($self, $f) = @_;
181         $f // die 'BUG: no filename given';
182         require PublicInbox::Lock;
183         $self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock'
184 }
185
186 # call $self->$sub(@args), on a worker if ipc_worker_spawn was used
187 sub ipc_do {
188         my ($self, $sub, @args) = @_;
189         if (my $w_req = $self->{-ipc_req}) { # run in worker
190                 my $ipc_lock = $self->{-ipc_lock};
191                 my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef;
192                 if (defined(wantarray)) {
193                         my $r_res = $self->{-ipc_res} or die 'no ipc_res';
194                         _send_rec($w_req, [ wantarray, $sub, @args ]);
195                         my $ret = _get_rec($r_res) // die "no response on $sub";
196                         die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
197                         wantarray ? @$ret : $$ret;
198                 } else { # likely, fire-and-forget into pipe
199                         _send_rec($w_req, [ undef , $sub, @args ]);
200                 }
201         } else { # run locally
202                 $self->$sub(@args);
203         }
204 }
205
206 # needed when there's multiple IPC workers and the parent forking
207 # causes newer siblings to inherit older siblings sockets
208 sub ipc_sibling_atfork_child {
209         my ($self) = @_;
210         my ($pid, undef) = delete(@$self{qw(-ipc_pid -ipc_ppid)});
211         delete(@$self{qw(-ipc_req -ipc_res)});
212         $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
213 }
214
215 sub recv_and_run {
216         my ($self, $s2, $len, $full_stream) = @_;
217         my @fds = $recv_cmd->($s2, my $buf, $len // $MY_MAX_ARG_STRLEN);
218         return if scalar(@fds) && !defined($fds[0]);
219         my $n = length($buf) or return 0;
220         my $nfd = 0;
221         for my $fd (@fds) {
222                 if (open(my $cmdfh, '+<&=', $fd)) {
223                         $self->{$nfd++} = $cmdfh;
224                         $cmdfh->autoflush(1);
225                 } else {
226                         die "$$ open(+<&=$fd) (FD:$nfd): $!";
227                 }
228         }
229         while ($full_stream && $n < $len) {
230                 my $r = sysread($s2, $buf, $len - $n, $n) // croak "read: $!";
231                 croak "read EOF after $n/$len bytes" if $r == 0;
232                 $n = length($buf);
233         }
234         # Sereal dies on truncated data, Storable returns undef
235         my $args = ipc_thaw($buf) // die "thaw error on buffer of size: $n";
236         undef $buf;
237         my $sub = shift @$args;
238         eval { $self->$sub(@$args) };
239         warn "$$ $0 wq_worker: $@" if $@;
240         delete @$self{0..($nfd-1)};
241         $n;
242 }
243
244 sub wq_worker_loop ($) {
245         my ($self, $bcast_a) = @_;
246         my $wqw = PublicInbox::WQWorker->new($self);
247         PublicInbox::WQWorker->new($self, '-wq_bcast2');
248         PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} });
249         PublicInbox::DS->EventLoop;
250         PublicInbox::DS->Reset;
251 }
252
253 sub do_sock_stream { # via wq_io_do, for big requests
254         my ($self, $len) = @_;
255         recv_and_run($self, my $s2 = delete $self->{0}, $len, 1);
256 }
257
258 sub wq_broadcast {
259         my ($self, $sub, @args) = @_;
260         if (my $wkr = $self->{-wq_workers}) {
261                 for my $bcast1 (values %$wkr) {
262                         my $buf = ipc_freeze([$sub, @args]);
263                         send($bcast1, $buf, MSG_EOR) // croak "send: $!";
264                         # XXX shouldn't have to deal with EMSGSIZE here...
265                 }
266         } else {
267                 eval { $self->$sub(@args) };
268                 warn "wq_broadcast: $@" if $@;
269         }
270 }
271
272 sub stream_in_full ($$$) {
273         my ($s1, $fds, $buf) = @_;
274         socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
275                 croak "socketpair: $!";
276         my $n = $send_cmd->($s1, [ fileno($r) ],
277                         ipc_freeze(['do_sock_stream', length($buf)]),
278                         MSG_EOR) // croak "sendmsg: $!";
279         undef $r;
280         $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
281         while ($n < length($buf)) {
282                 my $x = syswrite($w, $buf, length($buf) - $n, $n) //
283                                 croak "syswrite: $!";
284                 croak "syswrite wrote 0 bytes" if $x == 0;
285                 $n += $x;
286         }
287 }
288
289 sub wq_io_do { # always async
290         my ($self, $sub, $ios, @args) = @_;
291         if (my $s1 = $self->{-wq_s1}) { # run in worker
292                 my $fds = [ map { fileno($_) } @$ios ];
293                 my $buf = ipc_freeze([$sub, @args]);
294                 if (length($buf) > $MY_MAX_ARG_STRLEN) {
295                         stream_in_full($s1, $fds, $buf);
296                 } else {
297                         my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR);
298                         return if defined($n); # likely
299                         $!{ETOOMANYREFS} and
300                                 croak "sendmsg: $! (check RLIMIT_NOFILE)";
301                         $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
302                         croak("sendmsg: $!");
303                 }
304         } else {
305                 @$self{0..$#$ios} = @$ios;
306                 eval { $self->$sub(@args) };
307                 warn "wq_io_do: $@" if $@;
308                 delete @$self{0..$#$ios}; # don't close
309         }
310 }
311
312 sub _wq_worker_start ($$$) {
313         my ($self, $oldset, $fields) = @_;
314         my ($bcast1, $bcast2);
315         socketpair($bcast1, $bcast2, AF_UNIX, $SEQPACKET, 0) or
316                                                 die "socketpair: $!";
317         my $seed = rand(0xffffffff);
318         my $pid = fork // die "fork: $!";
319         if ($pid == 0) {
320                 srand($seed);
321                 undef $bcast1;
322                 eval { PublicInbox::DS->Reset };
323                 delete @$self{qw(-wq_s1 -wq_ppid)};
324                 $self->{-wq_worker_nr} =
325                                 keys %{delete($self->{-wq_workers}) // {}};
326                 $SIG{$_} = 'IGNORE' for (qw(PIPE));
327                 $SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD));
328                 local $0 = "$self->{-wq_ident} $self->{-wq_worker_nr}";
329                 # ensure we properly exit even if warn() dies:
330                 my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
331                 eval {
332                         $fields //= {};
333                         local @$self{keys %$fields} = values(%$fields);
334                         my $on_destroy = $self->ipc_atfork_child;
335                         local %SIG = %SIG;
336                         PublicInbox::DS::sig_setmask($oldset);
337                         $self->{-wq_bcast2} = $bcast2;
338                         wq_worker_loop($self);
339                 };
340                 warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
341                 undef $end; # trigger exit
342         } else {
343                 $self->{-wq_workers}->{$pid} = $bcast1;
344         }
345 }
346
347 # starts workqueue workers if Sereal or Storable is installed
348 sub wq_workers_start {
349         my ($self, $ident, $nr_workers, $oldset, $fields) = @_;
350         ($send_cmd && $recv_cmd && defined($SEQPACKET)) or return;
351         return if $self->{-wq_s1}; # idempotent
352         $self->{-wq_s1} = $self->{-wq_s2} = undef;
353         socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, $SEQPACKET, 0) or
354                 die "socketpair: $!";
355         $self->ipc_atfork_prepare;
356         $nr_workers //= $self->{-wq_nr_workers};
357         $nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS;
358         my $sigset = $oldset // PublicInbox::DS::block_signals();
359         $self->{-wq_workers} = {};
360         $self->{-wq_ident} = $ident;
361         _wq_worker_start($self, $sigset, $fields) for (1..$nr_workers);
362         PublicInbox::DS::sig_setmask($sigset) unless $oldset;
363         $self->{-wq_ppid} = $$;
364 }
365
366 sub wq_worker_incr { # SIGTTIN handler
367         my ($self, $oldset, $fields) = @_;
368         $self->{-wq_s2} or return;
369         die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
370         return if wq_workers($self) >= $WQ_MAX_WORKERS;
371         $self->ipc_atfork_prepare;
372         my $sigset = $oldset // PublicInbox::DS::block_signals();
373         _wq_worker_start($self, $sigset, $fields);
374         PublicInbox::DS::sig_setmask($sigset) unless $oldset;
375 }
376
377 sub wq_exit { # wakes up wq_worker_decr_wait
378         send($_[0]->{-wq_s2}, $$, MSG_EOR) // die "$$ send: $!";
379         exit;
380 }
381
382 sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
383         my ($self) = @_;
384         return unless wq_workers($self);
385         die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
386         $self->wq_io_do('wq_exit');
387         # caller must call wq_worker_decr_wait in main loop
388 }
389
390 sub wq_worker_decr_wait {
391         my ($self, $timeout, $cb, @args) = @_;
392         return if $self->{-wq_ppid} != $$; # can't reap siblings or parents
393         die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
394         my $s1 = $self->{-wq_s1} // croak 'BUG: no wq_s1';
395         vec(my $rin = '', fileno($s1), 1) = 1;
396         select(my $rout = $rin, undef, undef, $timeout) or
397                 croak 'timed out waiting for wq_exit';
398         recv($s1, my $pid, 64, 0) // croak "recv: $!";
399         my $workers = $self->{-wq_workers} // croak 'BUG: no wq_workers';
400         delete $workers->{$pid} // croak "BUG: PID:$pid invalid";
401         dwaitpid($pid, $cb // \&ipc_worker_reap, [ $self, @args ]);
402 }
403
404 # set or retrieve number of workers
405 sub wq_workers {
406         my ($self, $nr, $cb, @args) = @_;
407         my $cur = $self->{-wq_workers} or return;
408         if (defined $nr) {
409                 while (scalar(keys(%$cur)) > $nr) {
410                         $self->wq_worker_decr;
411                         $self->wq_worker_decr_wait(undef, $cb, @args);
412                 }
413                 $self->wq_worker_incr while scalar(keys(%$cur)) < $nr;
414         }
415         scalar(keys(%$cur));
416 }
417
418 sub wq_close {
419         my ($self, $nohang, $cb, @args) = @_;
420         delete @$self{qw(-wq_s1 -wq_s2)} or return;
421         my $ppid = delete $self->{-wq_ppid} or return;
422         my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
423         return if $ppid != $$; # can't reap siblings or parents
424         my @pids = map { $_ + 0 } keys %$workers;
425         if ($nohang) {
426                 push @{$self->{"-wq_old_pids.$$"}}, @pids;
427         } else {
428                 $cb //= \&ipc_worker_reap;
429                 unshift @args, $self;
430                 dwaitpid($_, $cb, \@args) for @pids;
431         }
432 }
433
434 sub wq_kill_old {
435         my ($self, $sig) = @_;
436         my $pids = $self->{"-wq_old_pids.$$"} or return;
437         kill($sig // 'TERM', @$pids);
438 }
439
440 sub wq_kill {
441         my ($self, $sig) = @_;
442         my $workers = $self->{-wq_workers} or return;
443         kill($sig // 'TERM', keys %$workers);
444 }
445
446 sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS }
447
448 sub DESTROY {
449         my ($self) = @_;
450         my $ppid = $self->{-wq_ppid};
451         wq_kill($self) if $ppid && $ppid == $$;
452         my $err = $?;
453         wq_close($self);
454         wq_wait_old($self);
455         ipc_worker_stop($self);
456         $? = $err if $err;
457 }
458
459 sub detect_nproc () {
460         # _SC_NPROCESSORS_ONLN = 84 on both Linux glibc and musl
461         return POSIX::sysconf(84) if $^O eq 'linux';
462         return POSIX::sysconf(58) if $^O eq 'freebsd';
463         # TODO: more OSes
464
465         # getconf(1) is POSIX, but *NPROCESSORS* vars are not
466         for (qw(_NPROCESSORS_ONLN NPROCESSORS_ONLN)) {
467                 `getconf $_ 2>/dev/null` =~ /^(\d+)$/ and return $1;
468         }
469         for my $nproc (qw(nproc gnproc)) { # GNU coreutils nproc
470                 `$nproc 2>/dev/null` =~ /^(\d+)$/ and return $1;
471         }
472
473         # should we bother with `sysctl hw.ncpu`?  Those only give
474         # us total processor count, not online processor count.
475         undef
476 }
477
478 1;