-# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# base class for remote IPC calls and workqueues, requires Storable or Sereal
use v5.10.1;
use parent qw(Exporter);
use Carp qw(croak);
-use PublicInbox::DS qw(dwaitpid);
+use PublicInbox::DS qw(awaitpid);
use PublicInbox::Spawn;
use PublicInbox::OnDestroy;
use PublicInbox::WQWorker;
}
my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
-my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do {
+our $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do {
require PublicInbox::CmdIPC4;
$recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4');
PublicInbox::CmdIPC4->can('send_cmd4');
+} // do {
+ require PublicInbox::Syscall;
+ $recv_cmd //= PublicInbox::Syscall->can('recv_cmd4');
+ PublicInbox::Syscall->can('send_cmd4');
};
sub _get_rec ($) {
my $pid = fork // die "fork: $!";
if ($pid == 0) {
srand($seed);
+ eval { Net::SSLeay::randomize() };
eval { PublicInbox::DS->Reset };
delete @$self{qw(-wq_s1 -wq_s2 -wq_workers -wq_ppid)};
$w_req = $r_res = undef;
$self->{-ipc_req} = $w_req;
$self->{-ipc_res} = $r_res;
$self->{-ipc_ppid} = $$;
+ awaitpid($pid, \&ipc_worker_reap, $self);
$self->{-ipc_pid} = $pid;
}
-sub ipc_worker_reap { # dwaitpid callback
- my ($args, $pid) = @_;
+sub ipc_worker_reap { # awaitpid callback
+ my ($pid, $self) = @_;
+ delete $self->{-wq_workers}->{$pid};
+ if (my $cb_args = $self->{-reap_do}) {
+ return $cb_args->[0]->($pid, $self, @$cb_args[1..$#$cb_args]);
+ }
return if !$?;
- # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
my $s = $? & 127;
- warn "PID:$pid died with \$?=$?\n" if $s != 15 && $s != 13;
+ # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
+ warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13
}
-sub wq_wait_old {
- my ($self, $cb, @args) = @_;
- my $pids = delete $self->{"-wq_old_pids.$$"} or return;
- dwaitpid($_, $cb // \&ipc_worker_reap, [$self, @args]) for @$pids;
+# register wait workers
+sub awaitpid_init {
+ my ($self, @cb_args) = @_;
+ $self->{-reap_do} = \@cb_args;
}
# for base class, override in sub classes
# idempotent, can be called regardless of whether worker is active or not
sub ipc_worker_stop {
- my ($self, $args) = @_;
+ my ($self) = @_;
my ($pid, $ppid) = delete(@$self{qw(-ipc_pid -ipc_ppid)});
my ($w_req, $r_res) = delete(@$self{qw(-ipc_req -ipc_res)});
if (!$w_req && !$r_res) {
}
die 'no PID with IPC pipes' unless $pid;
$w_req = $r_res = undef;
-
- return if $$ != $ppid;
- dwaitpid($pid, \&ipc_worker_reap, [$self, $args]);
+ awaitpid($pid) if $$ == $ppid; # for non-event loop
}
# use this if we have multiple readers reading curl or "pigz -dc"
undef $buf;
my $sub = shift @$args;
eval { $self->$sub(@$args) };
- warn "$$ $0 wq_worker: $@" if $@;
+ warn "$$ $0 wq_worker: $sub: $@" if $@;
delete @$self{0..($nfd-1)};
$n;
}
my $wqw = PublicInbox::WQWorker->new($self, $self->{-wq_s2});
PublicInbox::WQWorker->new($self, $bcast2) if $bcast2;
PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} });
- PublicInbox::DS->EventLoop;
+ PublicInbox::DS::event_loop();
PublicInbox::DS->Reset;
}
}
}
+sub prepare_nonblock {
+ ($_[0]->{-wq_s1} // die 'BUG: no {-wq_s1}')->blocking(0);
+ $_[0]->{-reap_do} or die 'BUG: {-reap_do} needed for nonblock';
+ require PublicInbox::WQBlocked;
+}
+
+sub wq_nonblock_do { # always async
+ my ($self, $sub, @args) = @_;
+ my $buf = ipc_freeze([$sub, @args]);
+ if ($self->{wqb}) { # saturated once, assume saturated forever
+ $self->{wqb}->flush_send($buf);
+ } else {
+ $send_cmd->($self->{-wq_s1}, [], $buf, MSG_EOR) //
+ ($!{EAGAIN} ? PublicInbox::WQBlocked->new($self, $buf)
+ : croak("sendmsg: $!"));
+ }
+}
+
sub _wq_worker_start ($$$$) {
my ($self, $oldset, $fields, $one) = @_;
my ($bcast1, $bcast2);
my $pid = fork // die "fork: $!";
if ($pid == 0) {
srand($seed);
+ eval { Net::SSLeay::randomize() };
undef $bcast1;
eval { PublicInbox::DS->Reset };
delete @$self{qw(-wq_s1 -wq_ppid)};
$self->{-wq_worker_nr} =
keys %{delete($self->{-wq_workers}) // {}};
- $SIG{$_} = 'IGNORE' for (qw(PIPE));
$SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD));
local $0 = $one ? $self->{-wq_ident} :
"$self->{-wq_ident} $self->{-wq_worker_nr}";
undef $end; # trigger exit
} else {
$self->{-wq_workers}->{$pid} = $bcast1;
+ awaitpid($pid, \&ipc_worker_reap, $self);
}
}
}
sub wq_close {
- my ($self, $nohang, $cb, @args) = @_;
- delete @$self{qw(-wq_s1 -wq_s2)} or return;
- my $ppid = delete $self->{-wq_ppid} or return;
- my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
- return if $ppid != $$; # can't reap siblings or parents
- my @pids = map { $_ + 0 } keys %$workers;
- if ($nohang) {
- push @{$self->{"-wq_old_pids.$$"}}, @pids;
- } else {
- $cb //= \&ipc_worker_reap;
- unshift @args, $self;
- dwaitpid($_, $cb, \@args) for @pids;
+ my ($self) = @_;
+ if (my $wqb = delete $self->{wqb}) {
+ $self->{-reap_do} or die 'BUG: {-reap_do} unset';
+ $wqb->enq_close;
}
-}
-
-sub wq_kill_old {
- my ($self, $sig) = @_;
- my $pids = $self->{"-wq_old_pids.$$"} or return;
- kill($sig // 'TERM', @$pids);
+ delete @$self{qw(-wq_s1 -wq_s2)} or return;
+ return if $self->{-reap_do};
+ awaitpid($_) for keys %{$self->{-wq_workers}};
}
sub wq_kill {
my ($self, $sig) = @_;
- my $workers = $self->{-wq_workers} or return;
- kill($sig // 'TERM', keys %$workers);
+ kill($sig // 'TERM', keys %{$self->{-wq_workers}});
}
sub DESTROY {
my ($self) = @_;
my $ppid = $self->{-wq_ppid};
wq_kill($self) if $ppid && $ppid == $$;
- my $err = $?;
wq_close($self);
- wq_wait_old($self);
ipc_worker_stop($self);
- $? = $err if $err;
}
sub detect_nproc () {