summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
f7f0e7b)
This avoids awkwardly stuffing an arrayref into callbacks
which expect multiple arguments. IPC->awaitpid_init now
allows pre-registering callbacks before spawning workers.
use v5.10.1;
use parent qw(Exporter);
use Carp qw(croak);
use v5.10.1;
use parent qw(Exporter);
use Carp qw(croak);
-use PublicInbox::DS qw(dwaitpid);
+use PublicInbox::DS qw(awaitpid);
use PublicInbox::Spawn;
use PublicInbox::OnDestroy;
use PublicInbox::WQWorker;
use PublicInbox::Spawn;
use PublicInbox::OnDestroy;
use PublicInbox::WQWorker;
$self->{-ipc_req} = $w_req;
$self->{-ipc_res} = $r_res;
$self->{-ipc_ppid} = $$;
$self->{-ipc_req} = $w_req;
$self->{-ipc_res} = $r_res;
$self->{-ipc_ppid} = $$;
+ awaitpid($pid, \&ipc_worker_reap, $self);
$self->{-ipc_pid} = $pid;
}
$self->{-ipc_pid} = $pid;
}
-sub ipc_worker_reap { # dwaitpid callback
- my ($args, $pid) = @_;
- my ($self, @uargs) = @$args;
+sub ipc_worker_reap { # awaitpid callback
+ my ($pid, $self) = @_;
delete $self->{-wq_workers}->{$pid};
delete $self->{-wq_workers}->{$pid};
- return $self->{-reap_do}->($args, $pid) if $self->{-reap_do};
+ if (my $cb_args = $self->{-reap_do}) {
+ return $cb_args->[0]->($pid, $self, @$cb_args[1..$#$cb_args]);
+ }
return if !$?;
my $s = $? & 127;
# TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13
}
return if !$?;
my $s = $? & 127;
# TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13
}
-sub wq_wait_async {
- my ($self, $cb, @uargs) = @_;
- local $PublicInbox::DS::in_loop = 1;
- $self->{-reap_do} = $cb;
- my @pids = keys %{$self->{-wq_workers}};
- dwaitpid($_, \&ipc_worker_reap, [ $self, @uargs ]) for @pids;
+# register wait workers
+sub awaitpid_init {
+ my ($self, @cb_args) = @_;
+ $self->{-reap_do} = \@cb_args;
}
# for base class, override in sub classes
}
# for base class, override in sub classes
}
die 'no PID with IPC pipes' unless $pid;
$w_req = $r_res = undef;
}
die 'no PID with IPC pipes' unless $pid;
$w_req = $r_res = undef;
-
- return if $$ != $ppid;
- dwaitpid($pid, \&ipc_worker_reap, [$self]);
+ awaitpid($pid) if $$ == $ppid; # for non-event loop
}
# use this if we have multiple readers reading curl or "pigz -dc"
}
# use this if we have multiple readers reading curl or "pigz -dc"
undef $end; # trigger exit
} else {
$self->{-wq_workers}->{$pid} = $bcast1;
undef $end; # trigger exit
} else {
$self->{-wq_workers}->{$pid} = $bcast1;
+ awaitpid($pid, \&ipc_worker_reap, $self);
}
delete @$self{qw(-wq_s1 -wq_s2)} or return;
return if $self->{-reap_do};
}
delete @$self{qw(-wq_s1 -wq_s2)} or return;
return if $self->{-reap_do};
- my @pids = keys %{$self->{-wq_workers}};
- dwaitpid($_, \&ipc_worker_reap, [ $self ]) for @pids;
+ awaitpid($_) for keys %{$self->{-wq_workers}};
use Fcntl qw(SEEK_SET);
use PublicInbox::Config;
use PublicInbox::Syscall qw(EPOLLIN);
use Fcntl qw(SEEK_SET);
use PublicInbox::Config;
use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::DS qw(dwaitpid);
use PublicInbox::Spawn qw(spawn popen_rd);
use PublicInbox::Lock;
use PublicInbox::Eml;
use PublicInbox::Spawn qw(spawn popen_rd);
use PublicInbox::Lock;
use PublicInbox::Eml;
my $end = $lei->pkt_op_pair;
my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
$flds->{lei} = $lei;
my $end = $lei->pkt_op_pair;
my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
$flds->{lei} = $lei;
+ $wq->awaitpid_init($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
$wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
delete $lei->{pkt_op_p};
my $op_c = delete $lei->{pkt_op_c};
@$end = ();
$lei->event_step_init;
$wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
delete $lei->{pkt_op_p};
my $op_c = delete $lei->{pkt_op_c};
@$end = ();
$lei->event_step_init;
- $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
# preserve $? for ->fail or ->x_it code
}
# preserve $? for ->fail or ->x_it code
}
-sub wq_done_wait { # dwaitpid callback
- my ($arg, $pid) = @_;
- my ($wq, $lei) = @$arg;
+sub wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init)
+ my ($pid, $wq, $lei) = @_;
local $current_lei = $lei;
my $err_type = $lei->{-err_type};
$? and $lei->child_error($?,
local $current_lei = $lei;
my $err_type = $lei->{-err_type};
$? and $lei->child_error($?,
sub process_inputs { # via wq_do
my ($self) = @_;
sub process_inputs { # via wq_do
my ($self) = @_;
- local $PublicInbox::DS::in_loop = 0; # force synchronous dwaitpid
+ local $PublicInbox::DS::in_loop = 0; # force synchronous awaitpid
$self->SUPER::process_inputs;
my $lei = $self->{lei};
delete $lei->{1};
$self->SUPER::process_inputs;
my $lei = $self->{lei};
delete $lei->{1};
$mbl->{fh} =
PublicInbox::MboxReader::zsfxcat($in, $zsfx, $lei);
}
$mbl->{fh} =
PublicInbox::MboxReader::zsfxcat($in, $zsfx, $lei);
}
- local $PublicInbox::DS::in_loop = 0 if $zsfx; # dwaitpid
+ local $PublicInbox::DS::in_loop = 0 if $zsfx; # awaitpid
$self->input_fh($ifmt, $mbl->{fh}, $input, @args);
} elsif (-d _ && (-d "$input/cur" || -d "$input/new")) {
return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
$self->input_fh($ifmt, $mbl->{fh}, $input, @args);
} elsif (-d _ && (-d "$input/cur" || -d "$input/new")) {
return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
$_[0]->{lei}->{opt}->{'keep-going'});
}
$_[0]->{lei}->{opt}->{'keep-going'});
}
-sub _wq_done_wait { # dwaitpid callback (via wq_eof)
- my ($arg, $pid) = @_;
- my ($mrr, $lei) = @$arg;
+sub _wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init)
+ my ($pid, $mrr, $lei) = @_;
if ($?) {
$lei->child_error($?);
} elsif (!$lei->{child_error}) {
if ($?) {
$lei->child_error($?);
} elsif (!$lei->{child_error}) {
my ($k) = ($sw =~ /\A([\w-]+)/);
$opt->{$k} = $lei->{opt}->{$k};
}
my ($k) = ($sw =~ /\A([\w-]+)/);
$opt->{$k} = $lei->{opt}->{$k};
}
- # force synchronous dwaitpid for v2:
+ # force synchronous awaitpid for v2:
local $PublicInbox::DS::in_loop = 0;
my $cfg = PublicInbox::Config->new(undef, $lei->{2});
my $env = PublicInbox::Admin::index_prepare($opt, $cfg);
local $PublicInbox::DS::in_loop = 0;
my $cfg = PublicInbox::Config->new(undef, $lei->{2});
my $env = PublicInbox::Admin::index_prepare($opt, $cfg);
$self->SUPER::recv_and_run(@args);
}
$self->SUPER::recv_and_run(@args);
}
-sub _sto_atexit { # dwaitpid callback
- my ($args, $pid) = @_;
- my $self = $args->[0];
+sub _sto_atexit { # awaitpid cb (via awaitpid_init)
+ my ($pid, $sto) = @_;
warn "lei/store PID:$pid died \$?=$?\n" if $?;
}
warn "lei/store PID:$pid died \$?=$?\n" if $?;
}
# Mail we import into lei are private, so headers filtered out
# by -mda for public mail are not appropriate
local @PublicInbox::MDA::BAD_HEADERS = ();
# Mail we import into lei are private, so headers filtered out
# by -mda for public mail are not appropriate
local @PublicInbox::MDA::BAD_HEADERS = ();
+ $self->awaitpid_init(\&_sto_atexit); # outlives $lei
$self->wq_workers_start("lei/store $dir", 1, $lei->oldset, {
lei => $lei,
-err_wr => $w,
to_close => [ $r ],
});
$self->wq_workers_start("lei/store $dir", 1, $lei->oldset, {
lei => $lei,
-err_wr => $w,
to_close => [ $r ],
});
- $self->wq_wait_async(\&_sto_atexit); # outlives $lei
require PublicInbox::LeiStoreErr;
PublicInbox::LeiStoreErr->new($r, $lei);
}
require PublicInbox::LeiStoreErr;
PublicInbox::LeiStoreErr->new($r, $lei);
}
$dedupe->pause_dedupe if $dedupe;
}
$dedupe->pause_dedupe if $dedupe;
}
-sub v2w_done_wait { # dwaitpid callback
- my ($arg, $pid) = @_;
- my ($v2w, $lei) = @$arg;
+sub v2w_done_wait { # awaitpid cb (via awaitpid_init)
+ my ($pid, $v2w, $lei) = @_;
$lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?;
}
$lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?;
}
PublicInbox::InboxWritable->new($ibx, @creat);
$ibx->init_inbox if @creat;
my $v2w = $ibx->importer;
PublicInbox::InboxWritable->new($ibx, @creat);
$ibx->init_inbox if @creat;
my $v2w = $ibx->importer;
+ $v2w->awaitpid_init(\&v2w_done_wait, $lei);
$v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
$v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
- $v2w->wq_wait_async(\&v2w_done_wait, $lei);
$lei->{v2w} = $v2w;
return if !$lei->{opt}->{shared};
my $d = "$lei->{ale}->{git}->{git_dir}/objects";
$lei->{v2w} = $v2w;
return if !$lei->{opt}->{shared};
my $d = "$lei->{ale}->{git}->{git_dir}/objects";
map { $match_cb->($_) } PublicInbox::LeiSavedSearch::list($lei);
}
map { $match_cb->($_) } PublicInbox::LeiSavedSearch::list($lei);
}
-sub _wq_done_wait { # dwaitpid callback
- my ($arg, $pid) = @_;
- my ($wq, $lei) = @$arg;
+sub _wq_done_wait { # awaitpid cb (via awaitpid_init)
+ my ($pid, $wq, $lei) = @_;
$lei->child_error($?, 'auth failure') if $?
}
$lei->child_error($?, 'auth failure') if $?
}
sub git { $_[0]->{git} // die 'BUG: git uninitialized' }
sub git { $_[0]->{git} // die 'BUG: git uninitialized' }
-sub xsearch_done_wait { # dwaitpid callback
- my ($arg, $pid) = @_;
- my ($wq, $lei) = @$arg;
+sub xsearch_done_wait { # awaitpid cb (via awaitpid_init)
+ my ($pid, $wq, $lei) = @_;
return if !$?;
my $s = $? & 127;
return $lei->child_error($?) if $s == 13 || $s == 15;
return if !$?;
my $s = $? & 127;
return $lei->child_error($?) if $s == 13 || $s == 15;
fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
$l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ];
}
fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
$l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ];
}
+ $l2m->awaitpid_init(\&xsearch_done_wait, $lei);
$l2m->wq_workers_start('lei2mail', undef,
$lei->oldset, { lei => $lei });
$l2m->wq_workers_start('lei2mail', undef,
$lei->oldset, { lei => $lei });
- $l2m->wq_wait_async(\&xsearch_done_wait, $lei);
pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
delete $l2m->{au_peers};
}
pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
delete $l2m->{au_peers};
}
+ $self->awaitpid_init(\&xsearch_done_wait, $lei);
$self->wq_workers_start('lei_xsearch', undef,
$lei->oldset, { lei => $lei });
$self->wq_workers_start('lei_xsearch', undef,
$lei->oldset, { lei => $lei });
- $self->wq_wait_async(\&xsearch_done_wait, $lei);
my $op_c = delete $lei->{pkt_op_c};
delete $lei->{pkt_op_p};
@$end = ();
my $op_c = delete $lei->{pkt_op_c};
delete $lei->{pkt_op_p};
@$end = ();
-$mrr->can('_wq_done_wait')->([$mrr, $lei], $$);
+$mrr->can('_wq_done_wait')->($$, $mrr, $lei);
exit(($lei->{child_error} // 0) >> 8);
exit(($lei->{child_error} // 0) >> 8);