my $pid = fork // die "fork: $!";
if ($pid == 0) {
srand($seed);
+ eval { Net::SSLeay::randomize() };
eval { PublicInbox::DS->Reset };
delete @$self{qw(-wq_s1 -wq_s2 -wq_workers -wq_ppid)};
$w_req = $r_res = undef;
$fields //= {};
local @$self{keys %$fields} = values(%$fields);
my $on_destroy = $self->ipc_atfork_child;
- local %SIG = %SIG;
+ local @SIG{keys %SIG} = values %SIG;
PublicInbox::DS::sig_setmask($sigset);
ipc_worker_loop($self, $r_req, $w_res);
};
sub ipc_worker_reap { # dwaitpid callback
my ($args, $pid) = @_;
+ my ($self, @uargs) = @$args;
+ delete $self->{-wq_workers}->{$pid};
+ return $self->{-reap_do}->($args, $pid) if $self->{-reap_do};
return if !$?;
- # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
my $s = $? & 127;
- warn "PID:$pid died with \$?=$?\n" if $s != 15 && $s != 13;
+ # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
+ warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13
}
-sub wq_wait_old {
- my ($self, $cb, @args) = @_;
- my $pids = delete $self->{"-wq_old_pids.$$"} or return;
- dwaitpid($_, $cb // \&ipc_worker_reap, [$self, @args]) for @$pids;
+sub wq_wait_async {
+ my ($self, $cb, @uargs) = @_;
+ local $PublicInbox::DS::in_loop = 1;
+ $self->{-reap_async} = 1;
+ $self->{-reap_do} = $cb;
+ my @pids = keys %{$self->{-wq_workers}};
+ dwaitpid($_, \&ipc_worker_reap, [ $self, @uargs ]) for @pids;
}
# for base class, override in sub classes
undef $buf;
my $sub = shift @$args;
eval { $self->$sub(@$args) };
- warn "$$ $0 wq_worker: $@" if $@;
+ warn "$$ $0 wq_worker: $sub: $@" if $@;
delete @$self{0..($nfd-1)};
$n;
}
my $wqw = PublicInbox::WQWorker->new($self, $self->{-wq_s2});
PublicInbox::WQWorker->new($self, $bcast2) if $bcast2;
PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} });
- PublicInbox::DS->EventLoop;
+ PublicInbox::DS::event_loop();
PublicInbox::DS->Reset;
}
my $pid = fork // die "fork: $!";
if ($pid == 0) {
srand($seed);
+ eval { Net::SSLeay::randomize() };
undef $bcast1;
eval { PublicInbox::DS->Reset };
delete @$self{qw(-wq_s1 -wq_ppid)};
$self->{-wq_worker_nr} =
keys %{delete($self->{-wq_workers}) // {}};
- $SIG{$_} = 'IGNORE' for (qw(PIPE));
$SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD));
- local $0 = "$self->{-wq_ident} $self->{-wq_worker_nr}";
+ local $0 = $one ? $self->{-wq_ident} :
+ "$self->{-wq_ident} $self->{-wq_worker_nr}";
# ensure we properly exit even if warn() dies:
my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
eval {
$fields //= {};
local @$self{keys %$fields} = values(%$fields);
my $on_destroy = $self->ipc_atfork_child;
- local %SIG = %SIG;
+ local @SIG{keys %SIG} = values %SIG;
PublicInbox::DS::sig_setmask($oldset);
wq_worker_loop($self, $bcast2);
};
}
sub wq_close {
- my ($self, $nohang, $cb, @args) = @_;
+ my ($self) = @_;
delete @$self{qw(-wq_s1 -wq_s2)} or return;
- my $ppid = delete $self->{-wq_ppid} or return;
- my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
- return if $ppid != $$; # can't reap siblings or parents
- my @pids = map { $_ + 0 } keys %$workers;
- if ($nohang) {
- push @{$self->{"-wq_old_pids.$$"}}, @pids;
- } else {
- $cb //= \&ipc_worker_reap;
- unshift @args, $self;
- dwaitpid($_, $cb, \@args) for @pids;
- }
-}
-
-sub wq_kill_old {
- my ($self, $sig) = @_;
- my $pids = $self->{"-wq_old_pids.$$"} or return;
- kill($sig // 'TERM', @$pids);
+ return if $self->{-reap_async};
+ my @pids = keys %{$self->{-wq_workers}};
+ dwaitpid($_, \&ipc_worker_reap, [ $self ]) for @pids;
}
sub wq_kill {
my ($self, $sig) = @_;
- my $workers = $self->{-wq_workers} or return;
- kill($sig // 'TERM', keys %$workers);
+ kill($sig // 'TERM', keys %{$self->{-wq_workers}});
}
sub DESTROY {
my ($self) = @_;
my $ppid = $self->{-wq_ppid};
wq_kill($self) if $ppid && $ppid == $$;
- my $err = $?;
wq_close($self);
- wq_wait_old($self);
ipc_worker_stop($self);
- $? = $err if $err;
}
sub detect_nproc () {