]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/IPC.pm
lei: more consistent IPC exit and error handling
[public-inbox.git] / lib / PublicInbox / IPC.pm
index 728f726c4adb99a47891536ebe3d959c4d75f7fc..c8673e26832fd960601903aa532e8b5ac290cc46 100644 (file)
@@ -140,10 +140,9 @@ sub ipc_worker_reap { # dwaitpid callback
 }
 
 sub wq_wait_old {
-       my ($self, @args) = @_;
-       my $cb = ref($args[0]) eq 'CODE' ? shift(@args) : \&ipc_worker_reap;
+       my ($self, $cb, @args) = @_;
        my $pids = delete $self->{"-wq_old_pids.$$"} or return;
-       dwaitpid($_, $cb, [$self, @args]) for @$pids;
+       dwaitpid($_, $cb // \&ipc_worker_reap, [$self, @args]) for @$pids;
 }
 
 # for base class, override in sub classes
@@ -348,13 +347,12 @@ sub wq_exit { # wakes up wq_worker_decr_wait
 sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
        my ($self) = @_;
        return unless wq_workers($self);
-       my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2';
-       $self->wq_io_do('wq_exit', [ $s2, $s2, $s2 ]);
+       $self->wq_io_do('wq_exit');
        # caller must call wq_worker_decr_wait in main loop
 }
 
 sub wq_worker_decr_wait {
-       my ($self, $timeout) = @_;
+       my ($self, $timeout, $cb, @args) = @_;
        return if $self->{-wq_ppid} != $$; # can't reap siblings or parents
        my $s1 = $self->{-wq_s1} // croak 'BUG: no wq_s1';
        vec(my $rin = '', fileno($s1), 1) = 1;
@@ -363,17 +361,17 @@ sub wq_worker_decr_wait {
        recv($s1, my $pid, 64, 0) // croak "recv: $!";
        my $workers = $self->{-wq_workers} // croak 'BUG: no wq_workers';
        delete $workers->{$pid} // croak "BUG: PID:$pid invalid";
-       dwaitpid($pid, \&ipc_worker_reap, $self);
+       dwaitpid($pid, $cb // \&ipc_worker_reap, [ $self, @args ]);
 }
 
 # set or retrieve number of workers
 sub wq_workers {
-       my ($self, $nr) = @_;
+       my ($self, $nr, $cb, @args) = @_;
        my $cur = $self->{-wq_workers} or return;
        if (defined $nr) {
                while (scalar(keys(%$cur)) > $nr) {
                        $self->wq_worker_decr;
-                       $self->wq_worker_decr_wait;
+                       $self->wq_worker_decr_wait(undef, $cb, @args);
                }
                $self->wq_worker_incr while scalar(keys(%$cur)) < $nr;
        }
@@ -381,7 +379,7 @@ sub wq_workers {
 }
 
 sub wq_close {
-       my ($self, $nohang) = @_;
+       my ($self, $nohang, $cb, @args) = @_;
        delete @$self{qw(-wq_s1 -wq_s2)} or return;
        my $ppid = delete $self->{-wq_ppid} or return;
        my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
@@ -390,7 +388,9 @@ sub wq_close {
        if ($nohang) {
                push @{$self->{"-wq_old_pids.$$"}}, @pids;
        } else {
-               dwaitpid($_, \&ipc_worker_reap, $self) for @pids;
+               $cb //= \&ipc_worker_reap;
+               unshift @args, $self;
+               dwaitpid($_, $cb, \@args) for @pids;
        }
 }