X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=t%2Fipc.t;h=ce89f94bddc0d3daaa541f3b54aaae8545dd2cb0;hb=4eee5af6011cc8cdefb66c9729952c7eff5c0b0b;hp=f09f76ef740434b8b02c35a5640c890a91e5a127;hpb=862d18680dccc30ef6cc8044da925ec5085911b2;p=public-inbox.git diff --git a/t/ipc.t b/t/ipc.t index f09f76ef..ce89f94b 100644 --- a/t/ipc.t +++ b/t/ipc.t @@ -6,10 +6,14 @@ use v5.10.1; use Test::More; use PublicInbox::TestCommon; use Fcntl qw(SEEK_SET); +use Digest::SHA qw(sha1_hex); +require_mods(qw(Storable||Sereal)); require_ok 'PublicInbox::IPC'; +my ($tmpdir, $for_destroy) = tmpdir(); state $once = eval <<''; package PublicInbox::IPC; use strict; +use Digest::SHA qw(sha1_hex); sub test_array { qw(test array) } sub test_scalar { 'scalar' } sub test_scalarref { \'scalarref' } @@ -23,13 +27,23 @@ sub test_write_each_fd { $self->{$fd}->flush; } } +sub test_sha { + my ($self, $buf) = @_; + print { $self->{1} } sha1_hex($buf), "\n"; + $self->{1}->flush; +} +sub test_append_pid { + my ($self, $file) = @_; + open my $fh, '>>', $file or die "open: $!"; + $fh->autoflush(1); + print $fh "$$\n" or die "print: $!"; +} 1; my $ipc = bless {}, 'PublicInbox::IPC'; my @t = qw(array scalar scalarref undef); my $test = sub { my $x = shift; - my @res; for my $type (@t) { my $m = "test_$type"; my @ret = $ipc->ipc_do($m); @@ -38,34 +52,10 @@ my $test = sub { $ipc->ipc_do($m); - $ipc->ipc_async($m, [], sub { push @res, \@_ }, \$m); - my $ret = $ipc->ipc_do($m); my $exp = $ipc->$m; is_deeply($ret, $exp, "!wantarray $m $x"); - - is_deeply(\@res, [ [ \$m, \@exp ] ], "async $m $x"); - @res = (); } - $ipc->ipc_async_wait(-1); - is_deeply(\@res, [], 'no leftover results'); - $ipc->ipc_async('test_die', ['die test'], - sub { push @res, \@_ }, 'die arg'); - $ipc->ipc_async_wait(1); - is(scalar(@res), 1, 'only one result'); - is(scalar(@{$res[0]}), 2, 'result has 2-element array'); - is($res[0]->[0], 'die arg', 'got async die arg '.$x); - is(ref($res[0]->[1]), 'PublicInbox::IPC::Die', - "exception type $x"); - { - my $nr = PublicInbox::IPC::PIPE_BUF(); - my $count = 0; - my $cb = sub { ++$count }; - $ipc->ipc_async('test_undef', [], $cb) for (1..$nr); - $ipc->ipc_async_wait(-1); - is($count, $nr, "$x async runs w/o deadlock"); - } - my $ret = eval { $ipc->test_die('phail') }; my $exp = $@; $ret = eval { $ipc->ipc_do('test_die', 'phail') }; @@ -94,25 +84,20 @@ my $test = sub { }; $test->('local'); -SKIP: { - require_mods(qw(Storable||Sereal), 16); +{ my $pid = $ipc->ipc_worker_spawn('test worker'); ok($pid > 0 && kill(0, $pid), 'worker spawned and running'); defined($pid) or BAIL_OUT 'no spawn, no test'; is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned'); $test->('worker'); - { - my ($tmp, $for_destroy) = tmpdir(); - $ipc->ipc_lock_init("$tmp/lock"); - is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned'); - } + $ipc->ipc_lock_init("$tmpdir/lock"); + is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned'); $ipc->ipc_worker_stop; ok(!kill(0, $pid) && $!{ESRCH}, 'worker stopped'); } $ipc->ipc_worker_stop; # idempotent # work queues -$ipc->{wq_open_modes} = [qw( >&= >&= >&= )]; pipe(my ($ra, $wa)) or BAIL_OUT $!; pipe(my ($rb, $wb)) or BAIL_OUT $!; pipe(my ($rc, $wc)) or BAIL_OUT $!; @@ -120,8 +105,12 @@ open my $warn, '+>', undef or BAIL_OUT; $warn->autoflush(0); local $SIG{__WARN__} = sub { print $warn "PID:$$ ", @_ }; my @ppids; +open my $agpl, '<', 'COPYING' or BAIL_OUT "AGPL-3 missing: $!"; +my $big = do { local $/; <$agpl> } // BAIL_OUT "read: $!"; +close $agpl or BAIL_OUT "close: $!"; + for my $t ('local', 'worker', 'worker again') { - $ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, 'hello world'); + $ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world'); my $i = 0; for my $fh ($ra, $rb, $rc) { my $buf = readline($fh); @@ -129,20 +118,34 @@ for my $t ('local', 'worker', 'worker again') { like($buf, qr/\Ai=$i \d+ hello world\z/, "got expected ($t)"); $i++; } - $ipc->wq_do('test_die', $wa, $wb, $wc); + $ipc->wq_io_do('test_die', [ $wa, $wb, $wc ]); + $ipc->wq_io_do('test_sha', [ $wa, $wb ], 'hello world'); + is(readline($rb), sha1_hex('hello world')."\n", "SHA small ($t)"); + { + my $bigger = $big x 10; # to hit EMSGSIZE + $ipc->wq_io_do('test_sha', [ $wa, $wb ], $bigger); + my $exp = sha1_hex($bigger)."\n"; + is(readline($rb), $exp, "SHA big for EMSGSIZE ($t)"); + + # to hit the WQWorker recv_and_run length + substr($bigger, my $MY_MAX_ARG_STRLEN = 4096 * 33, -1) = ''; + $ipc->wq_io_do('test_sha', [ $wa, $wb ], $bigger); + $exp = sha1_hex($bigger)."\n"; + is(readline($rb), $exp, "SHA WQWorker limit ($t)"); + } my $ppid = $ipc->wq_workers_start('wq', 1); push(@ppids, $ppid); } -# wq_do works across fork (siblings can feed) +# wq_io_do works across fork (siblings can feed) SKIP: { - skip 'Socket::MsgHdr, IO::FDPass, Inline::C missing', 7 if !$ppids[0]; + skip 'Socket::MsgHdr or Inline::C missing', 3 if !$ppids[0]; is_deeply(\@ppids, [$$, undef, undef], 'parent pid returned in wq_workers_start'); my $pid = fork // BAIL_OUT $!; if ($pid == 0) { use POSIX qw(_exit); - $ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, $$); + $ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], $$); _exit(0); } else { my $i = 0; @@ -158,14 +161,32 @@ SKIP: { is(waitpid($pid, 0), $pid, 'waitpid complete'); is($?, 0, 'child wq producer exited'); } + my @ary = $ipc->wq_do('test_array'); + is_deeply(\@ary, [ qw(test array) ], 'wq_do wantarray'); + is(my $s = $ipc->wq_do('test_scalar'), 'scalar', 'defined wantarray'); + my $exp = bless ['blessed'], 'PublicInbox::WTF'; + my $ret = eval { $ipc->wq_do('test_die', $exp) }; + is_deeply($@, $exp, 'die with blessed ref'); } $ipc->wq_close; -seek($warn, 0, SEEK_SET) or BAIL_OUT; -my @warn = <$warn>; -is(scalar(@warn), 3, 'warned 3 times'); -like($warn[0], qr/ wq_do: /, '1st warned from wq_do'); -like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker'); -is($warn[2], $warn[1], 'worker did not die'); +SKIP: { + skip 'Socket::MsgHdr or Inline::C missing', 11 if !$ppids[0]; + seek($warn, 0, SEEK_SET) or BAIL_OUT; + my @warn = <$warn>; + is(scalar(@warn), 3, 'warned 3 times'); + like($warn[0], qr/ wq_io_do: /, '1st warned from wq_do'); + like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker'); + is($warn[2], $warn[1], 'worker did not die'); + + $SIG{__WARN__} = 'DEFAULT'; + is($ipc->wq_workers_start('wq', 2), $$, 'workers started again'); + $ipc->wq_broadcast('test_append_pid', "$tmpdir/append_pid"); + $ipc->wq_close; + open my $fh, '<', "$tmpdir/append_pid" or BAIL_OUT "open: $!"; + chomp(my @pids = <$fh>); + my %pids = map { $_ => 1 } grep(/\A[0-9]+\z/, @pids); + is(scalar keys %pids, 2, 'broadcast hit both PIDs'); +} done_testing;