]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei_xsearch: transfer 4 FDs internally, drop IO::FDPass
authorEric Wong <e@80x24.org>
Sun, 10 Jan 2021 12:15:18 +0000 (12:15 +0000)
committerEric Wong <e@80x24.org>
Tue, 12 Jan 2021 03:51:43 +0000 (03:51 +0000)
It's easier to make the code more generic by transferring
all four FDs (std(in|out|err) + socket) instead of omitting
stdin.

We'll be reading from stdin on some imports, and possibly
outputting to stdout, so omitting stdin now would needlessly
complicate things.

The differences with IO::FDPass "1" code paths and the "4"
code paths used by Inline::C and Socket::MsgHdr are far too
much to support and test at the moment.

MANIFEST
lib/PublicInbox/CmdIPC1.pm [deleted file]
lib/PublicInbox/IPC.pm
lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiQuery.pm
lib/PublicInbox/LeiXSearch.pm
lib/PublicInbox/Spawn.pm
script/lei
t/cmd_ipc.t
t/ipc.t
t/lei.t

index 62c14cd25b708e9ca069dc73836c57e712fdcd16..caddd8df650e6425391f91b9734c283d5d973edd 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -109,7 +109,6 @@ lib/PublicInbox/Admin.pm
 lib/PublicInbox/AdminEdit.pm
 lib/PublicInbox/AltId.pm
 lib/PublicInbox/Cgit.pm
-lib/PublicInbox/CmdIPC1.pm
 lib/PublicInbox/CmdIPC4.pm
 lib/PublicInbox/CompressNoop.pm
 lib/PublicInbox/Config.pm
diff --git a/lib/PublicInbox/CmdIPC1.pm b/lib/PublicInbox/CmdIPC1.pm
deleted file mode 100644 (file)
index de6e54e..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
-# Copyright (C) 2021 all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# callers should use PublicInbox::CmdIPC1->can('send_cmd1') (or recv_cmd1)
-# 2nd choice for lei(1) front-end and 3rd choice for lei internals
-package PublicInbox::CmdIPC1;
-use strict;
-use v5.10.1;
-BEGIN { eval {
-require IO::FDPass; # XS, available in all major distros
-no warnings 'once';
-
-*send_cmd1 = sub ($$$$) { # (sock, fds, buf, flags) = @_;
-       my ($sock, $fds, undef, $flags) = @_;
-       for my $fd (@$fds) {
-               IO::FDPass::send(fileno($sock), $fd) or
-                                       die "IO::FDPass::send: $!";
-       }
-       send($sock, $_[2], $flags) or die "send $!";
-};
-
-*recv_cmd1 = sub ($$$;$) {
-       my ($s, undef, $len, $nfds) = @_;
-       $nfds //= 3;
-       my @fds = map { IO::FDPass::recv(fileno($s)) } (1..$nfds);
-       recv($s, $_[1], $len, 0) // die "recv: $!";
-       length($_[1]) == 0 ? () : @fds;
-};
-
-} } # /eval /BEGIN
-
-1;
index 88f81e477ec07d58c97f80f8ccb091d88f7f6d2d..c54fcc64481966fd933a63ed81a04e09d836a652 100644 (file)
@@ -37,37 +37,16 @@ if ($enc && $dec) { # should be custom ops
        } // warn("Storable (part of Perl) missing: $@\n");
 }
 
-my $recv_cmd1; # PublicInbox::CmdIPC1::recv_cmd1;
 my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
 my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do {
        require PublicInbox::CmdIPC4;
        $recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4');
        PublicInbox::CmdIPC4->can('send_cmd4');
-} // do {
-       # IO::FDPass only allows sending a single FD at-a-time, which
-       # means we can't guarantee all packets end up on the same worker,
-       # so we cap WQ_MAX_WORKERS
-       require PublicInbox::CmdIPC1;
-       $recv_cmd1 = PublicInbox::CmdIPC1->can('recv_cmd1');
-       $WQ_MAX_WORKERS = 1 if $recv_cmd1;
-       wq_set_recv_fds(3);
-       PublicInbox::CmdIPC1->can('send_cmd1');
 };
 
-# needed to tell recv_cmd1 how many times to loop IO::FDPass::recv
-sub wq_set_recv_fds {
-       return unless $recv_cmd1;
-       my $nfds = pop;
-       my $sub = sub {
-               my ($sock, $fds, undef, $flags) = @_;
-               $recv_cmd1->($sock, $fds, $_[2], $flags, $nfds);
-       };
-       my $self = pop;
-       if (ref $self) {
-               $self->{-wq_recv_cmd} = $sub;
-       } else {
-               $recv_cmd = $sub;
-       }
+sub wq_set_recv_modes {
+       my ($self, @modes) = @_;
+       $self->{-wq_recv_modes} = \@modes;
 }
 
 sub _get_rec ($) {
@@ -259,7 +238,9 @@ sub ipc_sibling_atfork_child {
 
 sub _close_recvd ($) {
        my ($self) = @_;
-       close($_) for (grep { defined } (delete @$self{0..2}));
+       my $x = $self->{-wq_recv_modes};
+       my $end = $x ? $#$x : 2;
+       close($_) for (grep { defined } (delete @$self{0..$end}));
 }
 
 sub wq_worker_loop ($) {
@@ -271,13 +252,12 @@ sub wq_worker_loop ($) {
        local $SIG{PIPE} = sub {
                my $cur_sub = $sub;
                _close_recvd($self);
-               die(bless(\$cur_sub, __PACKAGE__.'::PIPE')) if $cur_sub;
+               die(bless(\$cur_sub, 'PublicInbox::SIGPIPE')) if $cur_sub;
        };
-       my $rcv = $self->{-wq_recv_cmd} // $recv_cmd;
        while (1) {
-               my (@fds) = $rcv->($s2, $buf, $len) or return; # EOF
+               my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
                my $i = 0;
-               my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]};
+               my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]};
                for my $fd (@fds) {
                        my $mode = shift(@m);
                        if (open(my $cmdfh, $mode, $fd)) {
@@ -296,7 +276,8 @@ sub wq_worker_loop ($) {
                        undef $sub; # quiet SIG{PIPE} handler
                        die $@ if $@;
                };
-               warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE';
+               warn "$$ wq_worker: $@" if $@ &&
+                                       ref($@) ne 'PublicInbox::SIGPIPE';
                # need to close explicitly to avoid warnings after SIGPIPE
                _close_recvd($self);
        }
@@ -310,8 +291,8 @@ sub wq_do { # always async
        } else {
                @$self{0..$#$ios} = @$ios;
                eval { $self->$sub(@args) };
-               warn "wq_do: $@" if $@;
-               delete @$self{0..$#$ios};
+               warn "wq_do: $@" if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
+               delete @$self{0..$#$ios}; # don't close
        }
 }
 
index d19fb311fd8613d64d8690fbe381c7a3d8b5567d..7313738eb3f52aea3e4f2c3740e4c27f400285bd 100644 (file)
@@ -277,8 +277,9 @@ sub atfork_prepare_wq {
 # usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq));
 sub atfork_child_wq {
        my ($self, $wq) = @_;
-       $self->{sock} //= $wq->{0};
-       $self->{$_} //= $wq->{$_} for (0..2);
+       return () if $self->{0}; # did not fork
+       $self->{$_} = $wq->{$_} for (0..2);
+       $self->{sock} = $wq->{3} // die 'BUG: no {sock}'; # may be undef
        my $oldpipe = $SIG{PIPE};
        %PATH2CFG = ();
        @TO_CLOSE_ATFORK_CHILD = ();
@@ -298,11 +299,10 @@ sub atfork_parent_wq {
                my $env = delete $self->{env}; # env is inherited at fork
                my $ret = bless { %$self }, ref($self);
                $self->{env} = $env;
-               delete @$ret{qw(-lei_store cfg)};
-               my $in = delete $ret->{0};
-               ($ret, delete($ret->{sock}) // $in, delete @$ret{1, 2});
+               delete @$ret{qw(-lei_store cfg pgr)};
+               ($ret, delete @$ret{qw(0 1 2 sock)});
        } else {
-               ($self, ($self->{sock} // $self->{0}), @$self{1, 2});
+               ($self, @$self{qw(0 1 2 sock)});
        }
 }
 
@@ -641,7 +641,7 @@ sub start_pager {
        $new_env{MORE} = 'FRX' if $^O eq 'freebsd';
        pipe(my ($r, $wpager)) or return warn "pipe: $!";
        my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} };
-       my $pid;
+       my $pgr = [ undef, @$rdr{1, 2} ];
        if (my $sock = $self->{sock}) { # lei(1) process runs it
                delete @new_env{keys %$env}; # only set iff unset
                my $buf = "exec 1\0".$pager;
@@ -649,12 +649,23 @@ sub start_pager {
                my $fds = [ map { fileno($_) } @$rdr{0..2} ];
                $send_cmd->($sock, $fds, $buf .= "\n", 0);
        } else {
-               $pid = spawn([$pager], $env, $rdr);
+               $pgr->[0] = spawn([$pager], $env, $rdr);
        }
        $self->{1} = $wpager;
        $self->{2} = $wpager if -t $self->{2};
        $env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git
-       [ $pid, @$rdr{1, 2} ];
+       $self->{pgr} = $pgr;
+}
+
+sub stop_pager {
+       my ($self) = @_;
+       my $pgr = delete($self->{pgr}) or return;
+       my $pid = $pgr->[0];
+       close $self->{1};
+       # {2} may not be redirected
+       $self->{1} = $pgr->[1];
+       $self->{2} = $pgr->[2];
+       dwaitpid($pid, undef, $self->{sock}) if $pid;
 }
 
 sub accept_dispatch { # Listener {post_accept} callback
@@ -738,11 +749,7 @@ sub lazy_start {
        my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
        pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
        local $oldset = PublicInbox::DS::block_signals();
-       if ($nfd == 1) {
-               require PublicInbox::CmdIPC1;
-               $send_cmd = PublicInbox::CmdIPC1->can('send_cmd1');
-               $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
-       } elsif ($nfd == 4) {
+       if ($nfd == 4) {
                $send_cmd = PublicInbox::Spawn->can('send_cmd4');
                $recv_cmd = PublicInbox::Spawn->can('recv_cmd4') // do {
                        require PublicInbox::CmdIPC4;
@@ -751,7 +758,7 @@ sub lazy_start {
                };
        }
        $recv_cmd or die <<"";
-(Socket::MsgHdr || IO::FDPass || Inline::C) missing/unconfigured (nfd=$nfd);
+(Socket::MsgHdr || Inline::C) missing/unconfigured (nfd=$nfd);
 
        require PublicInbox::Listener;
        require PublicInbox::EOFpipe;
@@ -839,19 +846,24 @@ sub lazy_start {
        exit($exit_code // 0);
 }
 
-# for users w/o IO::FDPass
+# for users w/o Socket::Msghdr
 sub oneshot {
        my ($main_pkg) = @_;
        my $exit = $main_pkg->can('exit'); # caller may override exit()
        local $quit = $exit if $exit;
        local %PATH2CFG;
        umask(077) // die("umask(077): $!");
-       dispatch((bless {
-               0 => *STDIN{GLOB},
-               1 => *STDOUT{GLOB},
-               2 => *STDERR{GLOB},
-               env => \%ENV
-       }, __PACKAGE__), @ARGV);
+       local $SIG{PIPE} = sub { die(bless(\"$_[0]", 'PublicInbox::SIGPIPE')) };
+       eval {
+               my $self = bless {
+                       0 => *STDIN{GLOB},
+                       1 => *STDOUT{GLOB},
+                       2 => *STDERR{GLOB},
+                       env => \%ENV
+               }, __PACKAGE__;
+               dispatch($self, @ARGV);
+       };
+       die $@ if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
 }
 
 # ensures stdout hits the FS before sock disconnects so a client
@@ -859,6 +871,7 @@ sub oneshot {
 sub DESTROY {
        my ($self) = @_;
        $self->{1}->autoflush(1);
+       stop_pager($self);
 }
 
 1;
index 6e7787853c1aca928bf5098ef9d0cc157c411104..2f4b99e58a4d2f1004eb014bbf77e02a927a9429 100644 (file)
@@ -80,14 +80,14 @@ sub lei_q {
        if ($self->{sock}) {
                $self->atfork_prepare_wq($lxs);
                $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
-                       // $self->wq_workers($j);
+                       // $lxs->wq_workers($j);
        }
        unshift(@srcs, $sto->search) if $opt->{'local'};
        my $out = $opt->{output} // '-';
        $out = 'json:/dev/stdout' if $out eq '-';
        my $isatty = -t $self->{1};
        # no forking workers after this
-       my $pid_old12 = $self->start_pager if $isatty;
+       $self->start_pager if $isatty;
        my $json = substr($out, 0, 5) eq 'json:' ?
                ref(PublicInbox::Config->json)->new : undef;
        if ($json) {
@@ -125,11 +125,6 @@ sub lei_q {
        # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
        $self->{mset_opt} = \%mset_opt;
        $lxs->do_query($self, \@srcs);
-       if ($pid_old12) { # [ pid, stdout, stderr ]
-               my $pid = $pid_old12->[0];
-               $self->{$_} = $pid_old12->[$_] for (1, 2);
-               dwaitpid($pid, undef, $self->{sock}) if $pid;
-       }
 }
 
 1;
index b41727342de7c1e65a95c360ec0bf300c27f705f..94f7c2bcb90832a7e1d4e6f96575a72008ea91dd 100644 (file)
@@ -168,4 +168,10 @@ sub ipc_atfork_child {
        $self->SUPER::ipc_atfork_child; # PublicInbox::IPC
 }
 
+sub ipc_atfork_prepare {
+       my ($self) = @_;
+       $self->wq_set_recv_modes(qw[<&= >&= >&= +<&=]);
+       $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
+}
+
 1;
index b35bf54c111265cc26e24fc1cd6c7d381816822b..ef822e1bfb0fd45b7a2b2bf56c6057963ecd687f 100644 (file)
@@ -209,7 +209,7 @@ my $fdpass = <<'FDPASS';
 #include <sys/socket.h>
 
 #if defined(CMSG_SPACE) && defined(CMSG_LEN)
-#define SEND_FD_CAPA 3
+#define SEND_FD_CAPA 4
 #define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
 union my_cmsg {
        struct cmsghdr hdr;
index aac8fa94980e4500eba6a5eaf0cee4e87b1a5a1b..5c32ab88d07ff7ce39d3b5b5cba7f59467d09e34 100755 (executable)
@@ -8,11 +8,6 @@ use PublicInbox::CmdIPC4;
 my $narg = 4;
 my $recv_cmd = PublicInbox::CmdIPC4->can('recv_cmd4');
 my $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4') // do {
-       require PublicInbox::CmdIPC1; # 2nd choice
-       $narg = 1;
-       $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
-       PublicInbox::CmdIPC1->can('send_cmd1');
-} // do {
        require PublicInbox::Spawn; # takes ~50ms even if built *sigh*
        $narg = 4;
        $recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
@@ -77,7 +72,7 @@ Falling back to (slow) one-shot mode
                $pwd = $cwd;
        }
        1;
-}) { # (Socket::MsgHdr|IO::FDPass|Inline::C), $sock, $pwd are all available:
+}) { # (Socket::MsgHdr|Inline::C), $sock, $pwd are all available:
        local $ENV{PWD} = $pwd;
        my $buf = join("\0", scalar(@ARGV), @ARGV);
        while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" }
@@ -94,7 +89,7 @@ Falling back to (slow) one-shot mode
                        die $buf;
                }
        }
-} else { # for systems lacking Socket::MsgHdr, IO::FDPass or Inline::C
+} else { # for systems lacking Socket::MsgHdr or Inline::C
        warn $@ if $@;
        require PublicInbox::LEI;
        PublicInbox::LEI::oneshot(__PACKAGE__);
index 22f73c1952b3c47288a744efea70402ad626f405..0a0a4e002a183776cf96ffb4dfd8c245c5c2828e 100644 (file)
@@ -79,13 +79,4 @@ SKIP: {
        }
 }
 
-SKIP: {
-       require_mods('IO::FDPass', 13);
-       require_ok 'PublicInbox::CmdIPC1';
-       $send = PublicInbox::CmdIPC1->can('send_cmd1');
-       $recv = PublicInbox::CmdIPC1->can('recv_cmd1');
-       $do_test->(SOCK_STREAM, 0, 'IO::FDPass stream');
-       $do_test->($SOCK_SEQPACKET, MSG_EOR, 'IO::FDPass seqpacket');
-}
-
 done_testing;
diff --git a/t/ipc.t b/t/ipc.t
index fd2908095553b57bd33e9f9690f738d18e5449f2..22423a78a014dc806fc2eddbc520f72ab2badc22 100644 (file)
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -6,6 +6,7 @@ use v5.10.1;
 use Test::More;
 use PublicInbox::TestCommon;
 use Fcntl qw(SEEK_SET);
+require_mods(qw(Storable||Sereal));
 require_ok 'PublicInbox::IPC';
 state $once = eval <<'';
 package PublicInbox::IPC;
@@ -94,8 +95,7 @@ my $test = sub {
 };
 $test->('local');
 
-SKIP: {
-       require_mods(qw(Storable||Sereal), 16);
+{
        my $pid = $ipc->ipc_worker_spawn('test worker');
        ok($pid > 0 && kill(0, $pid), 'worker spawned and running');
        defined($pid) or BAIL_OUT 'no spawn, no test';
@@ -112,7 +112,7 @@ SKIP: {
 $ipc->ipc_worker_stop; # idempotent
 
 # work queues
-$ipc->{wq_open_modes} = [qw( >&= >&= >&= )];
+$ipc->wq_set_recv_modes(qw( >&= >&= >&= ));
 pipe(my ($ra, $wa)) or BAIL_OUT $!;
 pipe(my ($rb, $wb)) or BAIL_OUT $!;
 pipe(my ($rc, $wc)) or BAIL_OUT $!;
@@ -136,7 +136,7 @@ for my $t ('local', 'worker', 'worker again') {
 
 # wq_do works across fork (siblings can feed)
 SKIP: {
-       skip 'Socket::MsgHdr, IO::FDPass, Inline::C missing', 7 if !$ppids[0];
+       skip 'Socket::MsgHdr or Inline::C missing', 3 if !$ppids[0];
        is_deeply(\@ppids, [$$, undef, undef],
                'parent pid returned in wq_workers_start');
        my $pid = fork // BAIL_OUT $!;
@@ -161,28 +161,31 @@ SKIP: {
 }
 
 $ipc->wq_close;
-seek($warn, 0, SEEK_SET) or BAIL_OUT;
-my @warn = <$warn>;
-is(scalar(@warn), 3, 'warned 3 times');
-like($warn[0], qr/ wq_do: /, '1st warned from wq_do');
-like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
-is($warn[2], $warn[1], 'worker did not die');
-
-$SIG{__WARN__} = 'DEFAULT';
-is($ipc->wq_workers_start('wq', 1), $$, 'workers started again');
-is($ipc->wq_workers, 1, '1 worker started');
 SKIP: {
-       $ipc->WQ_MAX_WORKERS > 1 or
-               skip 'Inline::C or Socket::MsgHdr not available', 4;
-       $ipc->wq_worker_incr;
-       is($ipc->wq_workers, 2, 'worker count bumped');
-       $ipc->wq_worker_decr;
-       $ipc->wq_worker_decr_wait(10);
-       is($ipc->wq_workers, 1, 'worker count lowered');
-       is($ipc->wq_workers(2), 2, 'worker count set');
-       is($ipc->wq_workers, 2, 'worker count stayed set');
+       skip 'Socket::MsgHdr or Inline::C missing', 11 if !$ppids[0];
+       seek($warn, 0, SEEK_SET) or BAIL_OUT;
+       my @warn = <$warn>;
+       is(scalar(@warn), 3, 'warned 3 times');
+       like($warn[0], qr/ wq_do: /, '1st warned from wq_do');
+       like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
+       is($warn[2], $warn[1], 'worker did not die');
+
+       $SIG{__WARN__} = 'DEFAULT';
+       is($ipc->wq_workers_start('wq', 1), $$, 'workers started again');
+       is($ipc->wq_workers, 1, '1 worker started');
+       SKIP: {
+               $ipc->WQ_MAX_WORKERS > 1 or
+                       skip 'Inline::C or Socket::MsgHdr not available', 4;
+               $ipc->wq_worker_incr;
+               is($ipc->wq_workers, 2, 'worker count bumped');
+               $ipc->wq_worker_decr;
+               $ipc->wq_worker_decr_wait(10);
+               is($ipc->wq_workers, 1, 'worker count lowered');
+               is($ipc->wq_workers(2), 2, 'worker count set');
+               is($ipc->wq_workers, 2, 'worker count stayed set');
+       }
+       $ipc->wq_close;
+       is($ipc->wq_workers, undef, 'workers undef after close');
 }
-$ipc->wq_close;
-is($ipc->wq_workers, undef, 'workers undef after close');
 
 done_testing;
diff --git a/t/lei.t b/t/lei.t
index 992800a515d757c9330864edf034195f9c023140..6819f1829a24a67bf9e628ceef9ce6d445bbecdb 100644 (file)
--- a/t/lei.t
+++ b/t/lei.t
@@ -208,13 +208,11 @@ if ($ENV{TEST_LEI_ONESHOT}) {
 
 SKIP: { # real socket
        require_mods(qw(Cwd), my $nr = 105);
-       my $nfd = eval { require Socket::MsgHdr; 4 } //
-                       eval { require IO::FDPass; 1 } // do {
+       my $nfd = eval { require Socket::MsgHdr; 4 } // do {
                require PublicInbox::Spawn;
                PublicInbox::Spawn->can('send_cmd4') ? 4 : undef;
        } //
-       skip 'Socket::MsgHdr, IO::FDPass or Inline::C missing or unconfigured',
-               $nr;
+       skip 'Socket::MsgHdr or Inline::C missing or unconfigured', $nr;
 
        local $ENV{XDG_RUNTIME_DIR} = "$home/xdg_run";
        my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/$nfd.sock";