# starts a worker if Sereal or Storable is installed
sub ipc_worker_spawn {
- my ($self, $ident, $oldset, $fields) = @_;
+ my ($self, $ident, $oldset, $fields, @cb_args) = @_;
return if ($self->{-ipc_ppid} // -1) == $$; # idempotent
delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
pipe(my ($r_req, $w_req)) or die "pipe: $!";
$self->{-ipc_req} = $w_req;
$self->{-ipc_res} = $r_res;
$self->{-ipc_ppid} = $$;
- awaitpid($pid, \&ipc_worker_reap, $self);
+ awaitpid($pid, \&ipc_worker_reap, $self, @cb_args);
$self->{-ipc_pid} = $pid;
}
sub ipc_worker_reap { # awaitpid callback
- my ($pid, $self) = @_;
+ my ($pid, $self, $cb, @args) = @_;
delete $self->{-wq_workers}->{$pid};
- if (my $cb_args = $self->{-reap_do}) {
- return $cb_args->[0]->($pid, $self, @$cb_args[1..$#$cb_args]);
- }
+ return $cb->($pid, $self, @args) if $cb;
return if !$?;
my $s = $? & 127;
# TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13
}
-# register wait workers
-sub awaitpid_init {
- my ($self, @cb_args) = @_;
- $self->{-reap_do} = \@cb_args;
-}
-
# for base class, override in sub classes
sub ipc_atfork_prepare {}
sub prepare_nonblock {
($_[0]->{-wq_s1} // die 'BUG: no {-wq_s1}')->blocking(0);
- $_[0]->{-reap_do} or die 'BUG: {-reap_do} needed for nonblock';
require PublicInbox::WQBlocked;
}
}
}
-sub _wq_worker_start ($$$$) {
- my ($self, $oldset, $fields, $one) = @_;
+sub _wq_worker_start {
+ my ($self, $oldset, $fields, $one, @cb_args) = @_;
my ($bcast1, $bcast2);
$one or socketpair($bcast1, $bcast2, AF_UNIX, $SEQPACKET, 0) or
die "socketpair: $!";
undef $end; # trigger exit
} else {
$self->{-wq_workers}->{$pid} = $bcast1;
- awaitpid($pid, \&ipc_worker_reap, $self);
+ awaitpid($pid, \&ipc_worker_reap, $self, @cb_args);
}
}
# starts workqueue workers if Sereal or Storable is installed
sub wq_workers_start {
- my ($self, $ident, $nr_workers, $oldset, $fields) = @_;
+ my ($self, $ident, $nr_workers, $oldset, $fields, @cb_args) = @_;
($send_cmd && $recv_cmd && defined($SEQPACKET)) or return;
return if $self->{-wq_s1}; # idempotent
$self->{-wq_s1} = $self->{-wq_s2} = undef;
$self->{-wq_ident} = $ident;
my $one = $nr_workers == 1;
$self->{-wq_nr_workers} = $nr_workers;
- _wq_worker_start($self, $sigset, $fields, $one) for (1..$nr_workers);
+ for (1..$nr_workers) {
+ _wq_worker_start($self, $sigset, $fields, $one, @cb_args);
+ }
PublicInbox::DS::sig_setmask($sigset) unless $oldset;
$self->{-wq_ppid} = $$;
}
sub wq_close {
my ($self) = @_;
if (my $wqb = delete $self->{wqb}) {
- $self->{-reap_do} or die 'BUG: {-reap_do} unset';
$wqb->enq_close;
}
delete @$self{qw(-wq_s1 -wq_s2)} or return;
- return if $self->{-reap_do};
+ return if ($self->{-wq_ppid} // -1) != $$;
awaitpid($_) for keys %{$self->{-wq_workers}};
}
my $end = $lei->pkt_op_pair;
my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
$flds->{lei} = $lei;
- $wq->awaitpid_init($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
- $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
+ $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds,
+ $wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
delete $lei->{pkt_op_p};
my $op_c = delete $lei->{pkt_op_c};
@$end = ();
# preserve $? for ->fail or ->x_it code
}
-sub wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init)
+sub wq_done_wait { # awaitpid cb (via wq_eof)
my ($pid, $wq, $lei) = @_;
local $current_lei = $lei;
my $err_type = $lei->{-err_type};
$_[0]->{lei}->{opt}->{'keep-going'});
}
-sub _wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init)
+sub _wq_done_wait { # awaitpid cb (via wq_eof)
my ($pid, $mrr, $lei) = @_;
if ($?) {
$lei->child_error($?);
$self->SUPER::recv_and_run(@args);
}
-sub _sto_atexit { # awaitpid cb (via awaitpid_init)
- my ($pid, $sto) = @_;
+sub _sto_atexit { # awaitpid cb
+ my ($pid) = @_;
warn "lei/store PID:$pid died \$?=$?\n" if $?;
}
# Mail we import into lei are private, so headers filtered out
# by -mda for public mail are not appropriate
local @PublicInbox::MDA::BAD_HEADERS = ();
- $self->awaitpid_init(\&_sto_atexit); # outlives $lei
$self->wq_workers_start("lei/store $dir", 1, $lei->oldset, {
lei => $lei,
-err_wr => $w,
to_close => [ $r ],
- });
+ }, \&_sto_atexit);
require PublicInbox::LeiStoreErr;
PublicInbox::LeiStoreErr->new($r, $lei);
}
$dedupe->pause_dedupe if $dedupe;
}
-sub v2w_done_wait { # awaitpid cb (via awaitpid_init)
+sub v2w_done_wait { # awaitpid cb
my ($pid, $v2w, $lei) = @_;
$lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?;
}
PublicInbox::InboxWritable->new($ibx, @creat);
$ibx->init_inbox if @creat;
my $v2w = $ibx->importer;
- $v2w->awaitpid_init(\&v2w_done_wait, $lei);
- $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
+ $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei},
+ \&v2w_done_wait, $lei);
$lei->{v2w} = $v2w;
return if !$lei->{opt}->{shared};
my $d = "$lei->{ale}->{git}->{git_dir}/objects";
map { $match_cb->($_) } PublicInbox::LeiSavedSearch::list($lei);
}
-sub _wq_done_wait { # awaitpid cb (via awaitpid_init)
+sub _wq_done_wait { # awaitpid cb
my ($pid, $wq, $lei) = @_;
$lei->child_error($?, 'auth failure') if $?
}
sub git { $_[0]->{git} // die 'BUG: git uninitialized' }
-sub xsearch_done_wait { # awaitpid cb (via awaitpid_init)
+sub xsearch_done_wait { # awaitpid cb
my ($pid, $wq, $lei) = @_;
return if !$?;
my $s = $? & 127;
fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
$l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ];
}
- $l2m->awaitpid_init(\&xsearch_done_wait, $lei);
$l2m->wq_workers_start('lei2mail', undef,
- $lei->oldset, { lei => $lei });
+ $lei->oldset, { lei => $lei },
+ \&xsearch_done_wait, $lei);
pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
delete $l2m->{au_peers};
}
- $self->awaitpid_init(\&xsearch_done_wait, $lei);
$self->wq_workers_start('lei_xsearch', undef,
- $lei->oldset, { lei => $lei });
+ $lei->oldset, { lei => $lei },
+ \&xsearch_done_wait, $lei);
my $op_c = delete $lei->{pkt_op_c};
delete $lei->{pkt_op_p};
@$end = ();