]> Sergey Matveev's repositories - public-inbox.git/commitdiff
ipc: wq_do: support synchronous waits and responses
authorEric Wong <e@80x24.org>
Sun, 19 Sep 2021 12:50:20 +0000 (12:50 +0000)
committerEric Wong <e@80x24.org>
Sun, 19 Sep 2021 19:52:43 +0000 (19:52 +0000)
This brings the wq_* SOCK_SEQPACKET API functionality
on par with the ipc_do (pipe-based) API.

lib/PublicInbox/IPC.pm
t/ipc.t

index 9efe551bd2398574f4d82bb4d44e81466f2bd06f..d5e37719bf48f3cd36cb84eedc5a256a8066f678 100644 (file)
@@ -182,6 +182,13 @@ sub ipc_lock_init {
        $self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock'
 }
 
+sub _wait_return ($$) {
+       my ($r_res, $sub) = @_;
+       my $ret = _get_rec($r_res) // die "no response on $sub";
+       die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
+       wantarray ? @$ret : $$ret;
+}
+
 # call $self->$sub(@args), on a worker if ipc_worker_spawn was used
 sub ipc_do {
        my ($self, $sub, @args) = @_;
@@ -191,9 +198,7 @@ sub ipc_do {
                if (defined(wantarray)) {
                        my $r_res = $self->{-ipc_res} or die 'no ipc_res';
                        _send_rec($w_req, [ wantarray, $sub, @args ]);
-                       my $ret = _get_rec($r_res) // die "no response on $sub";
-                       die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
-                       wantarray ? @$ret : $$ret;
+                       _wait_return($r_res, $sub);
                } else { # likely, fire-and-forget into pipe
                        _send_rec($w_req, [ undef , $sub, @args ]);
                }
@@ -298,7 +303,7 @@ sub wq_io_do { # always async
                        $!{ETOOMANYREFS} and
                                croak "sendmsg: $! (check RLIMIT_NOFILE)";
                        $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
-                       croak("sendmsg: $!");
+                               croak("sendmsg: $!");
                }
        } else {
                @$self{0..$#$ios} = @$ios;
@@ -308,6 +313,29 @@ sub wq_io_do { # always async
        }
 }
 
+sub wq_sync_run {
+       my ($self, $wantarray, $sub, @args) = @_;
+       if ($wantarray) {
+               my @ret = eval { $self->$sub(@args) };
+               ipc_return($self->{0}, \@ret, $@);
+       } else { # '' => wantscalar
+               my $ret = eval { $self->$sub(@args) };
+               ipc_return($self->{0}, \$ret, $@);
+       }
+}
+
+sub wq_do {
+       my ($self, $sub, @args) = @_;
+       if (defined(wantarray)) {
+               pipe(my ($r, $w)) or die "pipe: $!";
+               wq_io_do($self, 'wq_sync_run', [ $w ], wantarray, $sub, @args);
+               undef $w;
+               _wait_return($r, $sub);
+       } else {
+               wq_io_do($self, $sub, [], @args);
+       }
+}
+
 sub _wq_worker_start ($$$) {
        my ($self, $oldset, $fields) = @_;
        my ($bcast1, $bcast2);
diff --git a/t/ipc.t b/t/ipc.t
index 7983fdc02ef1bcffcd479e860b2c92dd0c7c8dba..202b1cc658df533b1a52b44676a19389808081a5 100644 (file)
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -161,6 +161,12 @@ SKIP: {
                is(waitpid($pid, 0), $pid, 'waitpid complete');
                is($?, 0, 'child wq producer exited');
        }
+       my @ary = $ipc->wq_do('test_array');
+       is_deeply(\@ary, [ qw(test array) ], 'wq_do wantarray');
+       is(my $s = $ipc->wq_do('test_scalar'), 'scalar', 'defined wantarray');
+       my $exp = bless ['blessed'], 'PublicInbox::WTF';
+       my $ret = eval { $ipc->wq_do('test_die', $exp) };
+       is_deeply($@, $exp, 'die with blessed ref');
 }
 
 $ipc->wq_close;