]> Sergey Matveev's repositories - public-inbox.git/commitdiff
cmd_ipc: support + test EINTR + EAGAIN, no FDs
authorEric Wong <e@80x24.org>
Thu, 14 Jan 2021 07:06:14 +0000 (19:06 -1200)
committerEric Wong <e@80x24.org>
Thu, 14 Jan 2021 23:14:08 +0000 (23:14 +0000)
We'll ensure our {send,recv}_cmd4 implementations are
consistent w.r.t. non-blocking and interrupted sockets.

We'll also support receiving messages without FDs associated
so we don't have to send dummy FDs to keep receivers from
reporting EOF.

lib/PublicInbox/CmdIPC4.pm
lib/PublicInbox/Spawn.pm
t/cmd_ipc.t

index c4fcb0d679d9256319ab6f5b45e86d72772256e4..c244f6a1a7c7c2d177498c2f275cda55e9cb7842 100644 (file)
@@ -18,17 +18,17 @@ no warnings 'once';
        my $mh = Socket::MsgHdr->new(buf => $_[2]);
        $mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS,
                        pack('i' x scalar(@$fds), @$fds));
-       Socket::MsgHdr::sendmsg($sock, $mh, $flags) or die "sendmsg: $!";
+       Socket::MsgHdr::sendmsg($sock, $mh, $flags);
 };
 
 *recv_cmd4 = sub ($$$) {
        my ($s, undef, $len) = @_; # $_[1] = destination buffer
        my $mh = Socket::MsgHdr->new(buflen => $len, controllen => 256);
-       my $r = Socket::MsgHdr::recvmsg($s, $mh, 0) // die "recvmsg: $!";
+       my $r = Socket::MsgHdr::recvmsg($s, $mh, 0) // return ($_[1] = undef);
        $_[1] = $mh->buf;
        return () if $r == 0;
        my (undef, undef, $data) = $mh->cmsghdr;
-       unpack('i' x (length($data) / 4), $data);
+       defined($data) ? unpack('i' x (length($data) / 4), $data) : ();
 };
 
 } } # /eval /BEGIN
index ef822e1bfb0fd45b7a2b2bf56c6057963ecd687f..e5c0b1e94d03079455ce6ecea8fd977c468c4998 100644 (file)
@@ -216,12 +216,13 @@ union my_cmsg {
        char pad[sizeof(struct cmsghdr) + 16 + SEND_FD_SPACE];
 };
 
-int send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags)
+SV *send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags)
 {
        struct msghdr msg = { 0 };
        union my_cmsg cmsg = { 0 };
        STRLEN dlen = 0;
        struct iovec iov;
+       ssize_t sent;
        AV *fds = (AV *)SvRV(svfds);
        I32 i, nfds = av_len(fds) + 1;
        int *fdp;
@@ -252,7 +253,8 @@ int send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags)
                        *fdp++ = SvIV(*fd);
                }
        }
-       return sendmsg(PerlIO_fileno(s), &msg, flags) >= 0;
+       sent = sendmsg(PerlIO_fileno(s), &msg, flags);
+       return sent >= 0 ? newSViv(sent) : &PL_sv_undef;
 }
 
 void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
@@ -260,7 +262,7 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
        union my_cmsg cmsg = { 0 };
        struct msghdr msg = { 0 };
        struct iovec iov;
-       size_t i;
+       ssize_t i;
        Inline_Stack_Vars;
        Inline_Stack_Reset;
 
@@ -275,8 +277,9 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
 
        i = recvmsg(PerlIO_fileno(s), &msg, 0);
        if (i < 0)
-               croak("recvmsg: %s", strerror(errno));
-       SvCUR_set(buf, i);
+               Inline_Stack_Push(&PL_sv_undef);
+       else
+               SvCUR_set(buf, i);
        if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
                        cmsg.hdr.cmsg_type == SCM_RIGHTS) {
                size_t len = cmsg.hdr.cmsg_len;
index 0a0a4e002a183776cf96ffb4dfd8c245c5c2828e..96510175263b4680cacdd2960a92723a64cd49b0 100644 (file)
@@ -10,6 +10,7 @@ pipe(my ($r, $w)) or BAIL_OUT;
 my ($send, $recv);
 require_ok 'PublicInbox::Spawn';
 my $SOCK_SEQPACKET = eval { Socket::SOCK_SEQPACKET() } // undef;
+use Time::HiRes qw(alarm);
 
 my $do_test = sub { SKIP: {
        my ($type, $flag, $desc) = @_;
@@ -45,11 +46,42 @@ my $do_test = sub { SKIP: {
                is($buf, (',' x 1023) . '-', 'silently truncated buf');
                $opens->();
                $r1 = $w1 = $s1a = undef;
+
+               $s2->blocking(0);
+               @fds = $recv->($s2, $buf, length($src) + 1);
+               ok($!{EAGAIN}, "EAGAIN set by ($desc)");
+               is_deeply(\@fds, [ undef ], "EAGAIN $desc");
+               $s2->blocking(1);
+
+               my $alrm = 0;
+               local $SIG{ALRM} = sub { $alrm++ };
+               alarm(0.001);
+               @fds = $recv->($s2, $buf, length($src) + 1);
+               ok($!{EINTR}, "EINTR set by ($desc)");
+               is_deeply(\@fds, [ undef ], "EINTR $desc");
+               is($alrm, 1, 'SIGALRM hit');
+
                close $s1;
                @fds = $recv->($s2, $buf, length($src) + 1);
                is_deeply(\@fds, [], "no FDs on EOF $desc");
                is($buf, '', "buffer cleared on EOF ($desc)");
 
+               socketpair($s1, $s2, AF_UNIX, $type, 0) or BAIL_OUT $!;
+               $s1->blocking(0);
+               my $nsent = 0;
+               while (defined(my $n = $send->($s1, $sfds, $src, $flag))) {
+                       $nsent += $n;
+                       fail "sent 0 bytes" if $n == 0;
+               }
+               ok($!{EAGAIN}, "hit EAGAIN on send $desc");
+               ok($nsent > 0, 'sent some bytes');
+
+               socketpair($s1, $s2, AF_UNIX, $type, 0) or BAIL_OUT $!;
+               is($send->($s1, [], $src, $flag), length($src), 'sent w/o FDs');
+               $buf = 'nope';
+               @fds = $recv->($s2, $buf, length($src));
+               is(scalar(@fds), 0, 'no FDs received');
+               is($buf, $src, 'recv w/o FDs');
        }
 } };