]> Sergey Matveev's repositories - public-inbox.git/commitdiff
ipc: start supporting sending/receiving more than 3 FDs
authorEric Wong <e@80x24.org>
Sun, 10 Jan 2021 12:15:12 +0000 (12:15 +0000)
committerEric Wong <e@80x24.org>
Tue, 12 Jan 2021 03:51:42 +0000 (03:51 +0000)
Actually, sending 4 FDs will be useful for lei internal xsearch
work once we start accepting input from stdin.  It won't be used
with the lightweight lei(1) client, however.

For WWW (eventually), a single FD may be enough.

lib/PublicInbox/CmdIPC1.pm
lib/PublicInbox/CmdIPC4.pm
lib/PublicInbox/IPC.pm
lib/PublicInbox/LeiXSearch.pm
lib/PublicInbox/Spawn.pm
script/lei
t/cmd_ipc.t
t/ipc.t

index 0eed8bed05966a6cc6d7a5f1cde2b83eed590d0d..de6e54efdeb44c204c5169caf833cac38221a8b9 100644 (file)
@@ -10,17 +10,19 @@ BEGIN { eval {
 require IO::FDPass; # XS, available in all major distros
 no warnings 'once';
 
-*send_cmd1 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_;
-       for (1..3) {
-               IO::FDPass::send(fileno($_[0]), $_[$_]) or
+*send_cmd1 = sub ($$$$) { # (sock, fds, buf, flags) = @_;
+       my ($sock, $fds, undef, $flags) = @_;
+       for my $fd (@$fds) {
+               IO::FDPass::send(fileno($sock), $fd) or
                                        die "IO::FDPass::send: $!";
        }
-       send($_[0], $_[4], $_[5]) or die "send $!";
+       send($sock, $_[2], $flags) or die "send $!";
 };
 
-*recv_cmd1 = sub ($$$) {
-       my ($s, undef, $len) = @_;
-       my @fds = map { IO::FDPass::recv(fileno($s)) } (0..2);
+*recv_cmd1 = sub ($$$;$) {
+       my ($s, undef, $len, $nfds) = @_;
+       $nfds //= 3;
+       my @fds = map { IO::FDPass::recv(fileno($s)) } (1..$nfds);
        recv($s, $_[1], $len, 0) // die "recv: $!";
        length($_[1]) == 0 ? () : @fds;
 };
index 90fca62d0301648a17e664e0ba8213f0066b7230..c4fcb0d679d9256319ab6f5b45e86d72772256e4 100644 (file)
@@ -13,10 +13,12 @@ require Socket::MsgHdr; # XS
 no warnings 'once';
 
 # 3 FDs per-sendmsg(2) + buffer
-*send_cmd4 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_;
-       my $mh = Socket::MsgHdr->new(buf => $_[4]);
-       $mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS, pack('iii', @_[1,2,3]));
-       Socket::MsgHdr::sendmsg($_[0], $mh, $_[5]) or die "sendmsg: $!";
+*send_cmd4 = sub ($$$$) { # (sock, fds, buf, flags) = @_;
+       my ($sock, $fds, undef, $flags) = @_;
+       my $mh = Socket::MsgHdr->new(buf => $_[2]);
+       $mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS,
+                       pack('i' x scalar(@$fds), @$fds));
+       Socket::MsgHdr::sendmsg($sock, $mh, $flags) or die "sendmsg: $!";
 };
 
 *recv_cmd4 = sub ($$$) {
@@ -26,7 +28,7 @@ no warnings 'once';
        $_[1] = $mh->buf;
        return () if $r == 0;
        my (undef, undef, $data) = $mh->cmsghdr;
-       unpack('iii', $data);
+       unpack('i' x (length($data) / 4), $data);
 };
 
 } } # /eval /BEGIN
index be5b2f453f213a67351686315bf228d679d5dd2e..b0a0bfb5460f67962ea09ce5824c8916a34a3a06 100644 (file)
@@ -263,16 +263,15 @@ sub wq_worker_loop ($) {
 }
 
 sub wq_do { # always async
-       my ($self, $sub, $in, $out, $err, @args) = @_;
+       my ($self, $sub, $ios, @args) = @_;
        if (my $s1 = $self->{-wq_s1}) { # run in worker
-               $_ = fileno($_) for ($in, $out, $err);
-               $send_cmd->($s1, $in, $out, $err,
-                               freeze([$sub, @args]), MSG_EOR);
+               my $fds = [ map { fileno($_) } @$ios ];
+               $send_cmd->($s1, $fds, freeze([$sub, @args]), MSG_EOR);
        } else {
-               @$self{0, 1, 2} = ($in, $out, $err);
+               @$self{0..$#$ios} = @$ios;
                eval { $self->$sub(@args) };
                warn "wq_do: $@" if $@;
-               delete @$self{0, 1, 2};
+               delete @$self{0..$#$ios};
        }
 }
 
@@ -334,7 +333,7 @@ sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
        my ($self) = @_;
        my $workers = $self->{-wq_workers} or return;
        my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2';
-       $self->wq_do('wq_exit', $s2, $s2, $s2);
+       $self->wq_do('wq_exit', [ $s2, $s2, $s2 ]);
        $self->{-wq_exit_pending}++;
        # caller must call wq_worker_decr_wait in main loop
 }
index a3010efe6927f0bfe41de59301f652dbd5bdd410..c0df21a8155cb13193a0177b14a347a75e09b9e2 100644 (file)
@@ -150,14 +150,14 @@ sub do_query {
        $io[2]->autoflush(1);
        if ($lei->{opt}->{thread}) {
                for my $ibxish (@$srcs) {
-                       $self->wq_do('query_thread_mset', @io, $lei, $ibxish);
+                       $self->wq_do('query_thread_mset', \@io, $lei, $ibxish);
                }
        } else {
-               $self->wq_do('query_mset', @io, $lei, $srcs);
+               $self->wq_do('query_mset', \@io, $lei, $srcs);
        }
        # TODO
        for my $rmt (@{$self->{remotes} // []}) {
-               $self->wq_do('query_thread_mbox', @io, $lei, $rmt);
+               $self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
        }
 }
 
index 7d0d95977c813c503a0f5fe20ed9f46b819eac21..b35bf54c111265cc26e24fc1cd6c7d381816822b 100644 (file)
@@ -209,20 +209,22 @@ my $fdpass = <<'FDPASS';
 #include <sys/socket.h>
 
 #if defined(CMSG_SPACE) && defined(CMSG_LEN)
-struct my_3fds { int fds[3]; };
+#define SEND_FD_CAPA 3
+#define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
 union my_cmsg {
        struct cmsghdr hdr;
-       char pad[sizeof(struct cmsghdr)+ 8 + sizeof(struct my_3fds) + 8];
+       char pad[sizeof(struct cmsghdr) + 16 + SEND_FD_SPACE];
 };
 
-int send_cmd4(PerlIO *s, int in, int out, int err, SV *data, int flags)
+int send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags)
 {
        struct msghdr msg = { 0 };
-       struct iovec iov;
        union my_cmsg cmsg = { 0 };
-       int *fdp;
-       size_t i;
        STRLEN dlen = 0;
+       struct iovec iov;
+       AV *fds = (AV *)SvRV(svfds);
+       I32 i, nfds = av_len(fds) + 1;
+       int *fdp;
 
        if (SvOK(data)) {
                iov.iov_base = SvPV(data, dlen);
@@ -234,16 +236,22 @@ int send_cmd4(PerlIO *s, int in, int out, int err, SV *data, int flags)
        }
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
-       msg.msg_control = &cmsg.hdr;
-       msg.msg_controllen = CMSG_SPACE(sizeof(struct my_3fds));
-
-       cmsg.hdr.cmsg_level = SOL_SOCKET;
-       cmsg.hdr.cmsg_type = SCM_RIGHTS;
-       cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(struct my_3fds));
-       fdp = (int *)CMSG_DATA(&cmsg.hdr);
-       *fdp++ = in;
-       *fdp++ = out;
-       *fdp++ = err;
+       if (nfds) {
+               if (nfds > SEND_FD_CAPA) {
+                       fprintf(stderr, "FIXME: bump SEND_FD_CAPA=%d\n", nfds);
+                       nfds = SEND_FD_CAPA;
+               }
+               msg.msg_control = &cmsg.hdr;
+               msg.msg_controllen = CMSG_SPACE(nfds * sizeof(int));
+               cmsg.hdr.cmsg_level = SOL_SOCKET;
+               cmsg.hdr.cmsg_type = SCM_RIGHTS;
+               cmsg.hdr.cmsg_len = CMSG_LEN(nfds * sizeof(int));
+               fdp = (int *)CMSG_DATA(&cmsg.hdr);
+               for (i = 0; i < nfds; i++) {
+                       SV **fd = av_fetch(fds, i, 0);
+                       *fdp++ = SvIV(*fd);
+               }
+       }
        return sendmsg(PerlIO_fileno(s), &msg, flags) >= 0;
 }
 
@@ -263,17 +271,17 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
        msg.msg_control = &cmsg.hdr;
-       msg.msg_controllen = CMSG_SPACE(sizeof(struct my_3fds));
+       msg.msg_controllen = CMSG_SPACE(SEND_FD_SPACE);
 
        i = recvmsg(PerlIO_fileno(s), &msg, 0);
        if (i < 0)
                croak("recvmsg: %s", strerror(errno));
        SvCUR_set(buf, i);
        if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
-                       cmsg.hdr.cmsg_type == SCM_RIGHTS &&
-                       cmsg.hdr.cmsg_len == CMSG_LEN(sizeof(struct my_3fds))) {
+                       cmsg.hdr.cmsg_type == SCM_RIGHTS) {
+               size_t len = cmsg.hdr.cmsg_len;
                int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
-               for (i = 0; i < 3; i++)
+               for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
                        Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
        }
        Inline_Stack_Done;
index d954b9eb4cfa72dc96b5a945e720c0e285ddc163..5e30f4d7dec277d2ac7d0f0ccff12670126a6d66 100755 (executable)
@@ -67,7 +67,7 @@ Falling back to (slow) one-shot mode
        $buf .= "\0\0";
        select $sock;
        $| = 1; # unbuffer selected $sock
-       $send_cmd->($sock, 0, 1, 2, $buf, 0);
+       $send_cmd->($sock, [ 0, 1, 2 ], $buf, 0);
        while ($buf = <$sock>) {
                $buf =~ /\Aexit=([0-9]+)\n\z/ and exit($1 + 0);
                die $buf;
index b9f4d128138c3cdf60cb269df8ecff8eb7c775bb..22f73c1952b3c47288a744efea70402ad626f405 100644 (file)
@@ -17,7 +17,8 @@ my $do_test = sub { SKIP: {
        my ($s1, $s2);
        my $src = 'some payload' x 40;
        socketpair($s1, $s2, AF_UNIX, $type, 0) or BAIL_OUT $!;
-       $send->($s1, fileno($r), fileno($w), fileno($s1), $src, $flag);
+       my $sfds = [ fileno($r), fileno($w), fileno($s1) ];
+       $send->($s1, $sfds, $src, $flag);
        my (@fds) = $recv->($s2, my $buf, length($src) + 1);
        is($buf, $src, 'got buffer payload '.$desc);
        my ($r1, $w1, $s1a);
@@ -39,7 +40,7 @@ my $do_test = sub { SKIP: {
        if (defined($SOCK_SEQPACKET) && $type == $SOCK_SEQPACKET) {
                $r1 = $w1 = $s1a = undef;
                $src = (',' x 1023) . '-' .('.' x 1024);
-               $send->($s1, fileno($r), fileno($w), fileno($s1), $src, $flag);
+               $send->($s1, $sfds, $src, $flag);
                (@fds) = $recv->($s2, $buf, 1024);
                is($buf, (',' x 1023) . '-', 'silently truncated buf');
                $opens->();
diff --git a/t/ipc.t b/t/ipc.t
index 903294c5b61336890c18768077a530ddc7736981..d2b6ad4f4b17f376e97d7f9c5c8456c2c3371513 100644 (file)
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -121,7 +121,7 @@ $warn->autoflush(0);
 local $SIG{__WARN__} = sub { print $warn "PID:$$ ", @_ };
 my @ppids;
 for my $t ('local', 'worker', 'worker again') {
-       $ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, 'hello world');
+       $ipc->wq_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world');
        my $i = 0;
        for my $fh ($ra, $rb, $rc) {
                my $buf = readline($fh);
@@ -129,7 +129,7 @@ for my $t ('local', 'worker', 'worker again') {
                like($buf, qr/\Ai=$i \d+ hello world\z/, "got expected ($t)");
                $i++;
        }
-       $ipc->wq_do('test_die', $wa, $wb, $wc);
+       $ipc->wq_do('test_die', [ $wa, $wb, $wc ]);
        my $ppid = $ipc->wq_workers_start('wq', 1);
        push(@ppids, $ppid);
 }
@@ -142,7 +142,7 @@ SKIP: {
        my $pid = fork // BAIL_OUT $!;
        if ($pid == 0) {
                use POSIX qw(_exit);
-               $ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, $$);
+               $ipc->wq_do('test_write_each_fd', [ $wa, $wb, $wc ], $$);
                _exit(0);
        } else {
                my $i = 0;