]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei: less error-prone FD mapping
authorEric Wong <e@80x24.org>
Fri, 29 Jan 2021 07:42:57 +0000 (12:42 +0500)
committerEric Wong <e@80x24.org>
Sat, 30 Jan 2021 01:08:19 +0000 (01:08 +0000)
Keeping track of non-standard FDs gets tricky, so make it easier
by relying on st_dev/st_ino mapping in the transmitted objects.

We'll keep using numbers for the standard FDs since we need to
be able to easily redirect them in the producer (main daemon)
process for (gzip|bzip2|xz) if writing to a compressed mbox.

lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiOverview.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/LeiXSearch.pm
lib/PublicInbox/Spawn.pm

index f5413aab51c3c01257ee7a46dd8662181500408f..3ed330f99315f74f351b84a07a5ee8069534046b 100644 (file)
@@ -335,14 +335,27 @@ sub atfork_prepare_wq {
        }
 }
 
+sub io_restore ($$) {
+       my ($dst, $src) = @_;
+       for my $i (0..2) { # standard FDs
+               my $io = delete $src->{$i} or next;
+               $dst->{$i} = $io;
+       }
+       for my $i (3..9) { # named (non-standard) FDs
+               my $io = $src->{$i} or next;
+               my @st = stat($io) or die "stat $src.$i ($io): $!";
+               my $f = delete $dst->{"dev=$st[0],ino=$st[1]"} // next;
+               $dst->{$f} = $io;
+               delete $src->{$i};
+       }
+}
+
 # usage: my %sig = $lei->atfork_child_wq($wq);
 #       local @SIG{keys %sig} = values %sig;
 sub atfork_child_wq {
        my ($self, $wq) = @_;
-       my ($sock, $l2m_wq_s1);
-       (@$self{qw(0 1 2)}, $sock, $l2m_wq_s1) = delete(@$wq{0..4});
-       $self->{sock} = $sock if -S $sock;
-       $self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1 && -S $l2m_wq_s1;
+       io_restore($self, $wq);
+       io_restore($self->{l2m}, $wq);
        %PATH2CFG = ();
        undef $errors_log;
        $quit = \&CORE::exit;
@@ -355,30 +368,45 @@ sub atfork_child_wq {
                        close(delete $self->{$i});
                }
                # trigger the LeiXSearch $done OpPipe:
-               syswrite($self->{0}, '!') if $self->{0} && -p $self->{0};
+               syswrite($self->{op_pipe}, '!') if $self->{op_pipe};
                $SIG{PIPE} = 'DEFAULT';
                die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
        });
 }
 
+sub io_extract ($;@) {
+       my ($obj, @fields) = @_;
+       my @io;
+       for my $f (@fields) {
+               my $io = delete $obj->{$f} or next;
+               my @st = stat($io) or die "W: stat $obj.$f ($io): $!";
+               $obj->{"dev=$st[0],ino=$st[1]"} = $f;
+               push @io, $io;
+       }
+       @io
+}
+
 # usage: ($lei, @io) = $lei->atfork_parent_wq($wq);
 sub atfork_parent_wq {
        my ($self, $wq) = @_;
        my $env = delete $self->{env}; # env is inherited at fork
-       my $ret = bless { %$self }, ref($self);
-       if (my $dedupe = delete $ret->{dedupe}) {
-               $ret->{dedupe} = $wq->deep_clone($dedupe);
+       my $lei = bless { %$self }, ref($self);
+       if (my $dedupe = delete $lei->{dedupe}) {
+               $lei->{dedupe} = $wq->deep_clone($dedupe);
        }
        $self->{env} = $env;
-       delete @$ret{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m
-       my @io = delete @$ret{0..2};
-       $io[3] = delete($ret->{sock}) // $io[2];
-       my $l2m = $ret->{l2m};
+       delete @$lei{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m
+       my @io = (delete(@$lei{qw(0 1 2)}),
+                       io_extract($lei, qw(sock op_pipe startq)));
+       my $l2m = $lei->{l2m};
        if ($l2m && $l2m != $wq) { # $wq == lxs
-               $io[4] = $l2m->{-wq_s1} if $l2m->{-wq_s1};
+               if (my $wq_s1 = $l2m->{-wq_s1}) {
+                       push @io, io_extract($l2m, '-wq_s1');
+                       $l2m->{-wq_s1} = $wq_s1;
+               }
                $l2m->wq_close(1);
        }
-       ($ret, @io);
+       ($lei, @io);
 }
 
 sub _help ($;$) {
index f9a281387e6aea5738e88db7fe7f748e4bceb6fb..c67e2747edfac4faeb1613abcca10a8263a73775 100644 (file)
@@ -220,14 +220,13 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
                };
        } elsif ($l2m && $l2m->{-wq_s1}) {
                my ($lei_ipc, @io) = $lei->atfork_parent_wq($l2m);
-               # n.b. $io[0] = qry_status_wr, $io[1] = mbox|stdout,
-               # $io[4] becomes a notification pipe that triggers EOF
+               # $io[-1] becomes a notification pipe that triggers EOF
                # in this wq worker when all outstanding ->write_mail
                # calls are complete
-               die "BUG: \$io[4] $io[4] unexpected" if $io[4];
-               pipe($l2m->{each_smsg_done}, $io[4]) or die "pipe: $!";
-               fcntl($io[4], 1031, 4096) if $^O eq 'linux';
+               pipe($l2m->{each_smsg_done}, $io[$#io + 1]) or die "pipe: $!";
+               fcntl($io[-1], 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
                delete @$lei_ipc{qw(l2m opt mset_opt cmd)};
+               $lei_ipc->{each_smsg_not_done} = $#io;
                my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
                $self->{git} = $git;
                my $git_dir = $git->{git_dir};
index 08a1570d21c71f20dfc8ae8c698b3b7561d6cc9e..61b546b5ec8ae13892ba5070666353b845eaba62 100644 (file)
@@ -460,7 +460,7 @@ sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon
 
 sub write_mail { # via ->wq_do
        my ($self, $git_dir, $smsg, $lei) = @_;
-       my $not_done = delete $self->{4}; # write end of {each_smsg_done}
+       my $not_done = delete $self->{$lei->{each_smsg_not_done}};
        my $wcb = $self->{wcb} //= do { # first message
                my %sig = $lei->atfork_child_wq($self);
                @SIG{keys %sig} = values %sig; # not local
@@ -471,12 +471,6 @@ sub write_mail { # via ->wq_do
        $git->cat_async($smsg->{blob}, \&git_to_mail, [$wcb, $smsg, $not_done]);
 }
 
-sub ipc_atfork_prepare {
-       my ($self) = @_;
-       # FDs: (done_wr, stdout|mbox, stderr, 3: sock, 4: each_smsg_done_wr)
-       $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
-}
-
 # We rely on OnDestroy to run this before ->DESTROY, since ->DESTROY
 # ordering is unstable at worker exit and may cause segfaults
 sub reap_gits {
index 9ea2b5f339066ad4e4425e77a94f5ac4bbb67bc6..e69b637c7a29d1ef0da2e4c86780c5d4ba112c21 100644 (file)
@@ -109,9 +109,9 @@ sub wait_startq ($) {
 sub query_thread_mset { # for --thread
        my ($self, $lei, $ibxish) = @_;
        local $0 = "$0 query_thread_mset";
-       my $startq = delete $self->{5};
        my %sig = $lei->atfork_child_wq($self);
        local @SIG{keys %sig} = values %sig;
+       my $startq = delete $lei->{startq};
 
        my ($srch, $over) = ($ibxish->search, $ibxish->over);
        unless ($srch && $over) {
@@ -145,9 +145,9 @@ sub query_thread_mset { # for --thread
 sub query_mset { # non-parallel for non-"--thread" users
        my ($self, $lei) = @_;
        local $0 = "$0 query_mset";
-       my $startq = delete $self->{5};
        my %sig = $lei->atfork_child_wq($self);
        local @SIG{keys %sig} = values %sig;
+       my $startq = delete $lei->{startq};
        my $mo = { %{$lei->{mset_opt}} };
        my $mset;
        for my $loc (locals($self)) {
@@ -173,7 +173,7 @@ sub each_eml { # callback for MboxReader->mboxrd
        $smsg->parse_references($eml, mids($eml));
        $smsg->{$_} //= '' for qw(from to cc ds subject references mid);
        delete @$smsg{qw(From Subject -ds -ts)};
-       if (my $startq = delete($self->{5})) { wait_startq($startq) }
+       if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
        $each_smsg->($smsg, undef, $eml);
 }
 
@@ -352,11 +352,12 @@ sub query_prepare { # called by wq_do
        my ($self, $lei) = @_;
        local $0 = "$0 query_prepare";
        my %sig = $lei->atfork_child_wq($self);
-       -p $lei->{0} or die "BUG: \$done pipe expected";
+       -p $lei->{op_pipe} or die "BUG: \$done pipe expected";
        local @SIG{keys %sig} = values %sig;
+       delete $lei->{l2m}->{-wq_s1};
        eval { $lei->{l2m}->do_augment($lei) };
        $lei->fail($@) if $@;
-       syswrite($lei->{0}, '.') == 1 or die "do_post_augment trigger: $!";
+       syswrite($lei->{op_pipe}, '.') == 1 or die "do_post_augment trigger: $!"
 }
 
 sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
@@ -370,56 +371,45 @@ sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
 }
 
 sub do_query {
-       my ($self, $lei_orig) = @_;
-       my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
-       $io[0] = undef;
-       pipe(my $done, $io[0]) or die "pipe $!";
-       $lei_orig->{1}->autoflush(1);
+       my ($self, $lei) = @_;
+       $lei->{1}->autoflush(1);
+       my ($au_done, $zpipe);
+       my $l2m = $lei->{l2m};
+       if ($l2m) {
+               pipe($lei->{startq}, $au_done) or die "pipe: $!";
+               # 1031: F_SETPIPE_SZ
+               fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
+               $zpipe = $l2m->pre_augment($lei);
+       }
+       pipe(my $done, $lei->{op_pipe}) or die "pipe $!";
+       my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);
+       delete($lei->{op_pipe});
 
-       $lei_orig->event_step_init; # wait for shutdowns
+       $lei->event_step_init; # wait for shutdowns
        my $done_op = {
-               '' => [ \&query_done, $lei_orig ],
-               '!' => [ \&sigpipe_handler, $lei_orig ]
+               '' => [ \&query_done, $lei ],
+               '!' => [ \&sigpipe_handler, $lei ]
        };
-       my $in_loop = exists $lei_orig->{sock};
+       my $in_loop = exists $lei->{sock};
        $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
-       my $l2m = $lei->{l2m};
        if ($l2m) {
-               # may redirect $lei->{1} for mbox
-               my $zpipe = $l2m->pre_augment($lei_orig);
-               $io[1] = $lei_orig->{1};
-               pipe(my ($startq, $au_done)) or die "pipe: $!";
-               $done_op->{'.'} = [ \&do_post_augment, $lei_orig,
-                                       $zpipe, $au_done ];
-               local $io[4] = *STDERR{GLOB}; # don't send l2m->{-wq_s1}
-               die "BUG: unexpected \$io[5]: $io[5]" if $io[5];
-               $self->wq_do('query_prepare', \@io, $lei);
-               fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
-               $io[5] = $startq;
+               $done_op->{'.'} = [ \&do_post_augment, $lei, $zpipe, $au_done ];
+               $self->wq_do('query_prepare', \@io, $lei_ipc);
                $io[1] = $zpipe->[1] if $zpipe;
        }
-       start_query($self, \@io, $lei);
+       start_query($self, \@io, $lei_ipc);
        $self->wq_close(1);
        unless ($in_loop) {
-               # for the $lei->atfork_child_wq PIPE handler:
+               # for the $lei_ipc->atfork_child_wq PIPE handler:
                while ($done->{sock}) { $done->event_step }
        }
 }
 
-sub ipc_atfork_prepare {
-       my ($self) = @_;
-       if (exists $self->{remotes}) {
-               require PublicInbox::MboxReader;
-               require IO::Uncompress::Gunzip;
-       }
-       # FDS: (0: done_wr, 1: stdout|mbox, 2: stderr,
-       #       3: sock, 4: $l2m->{-wq_s1}, 5: $startq)
-       $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
-}
-
 sub add_uri {
        my ($self, $uri) = @_;
        if (my $curl = $self->{curl} //= which('curl') // 0) {
+               require PublicInbox::MboxReader;
+               require IO::Uncompress::Gunzip;
                push @{$self->{remotes}}, $uri;
        } else {
                warn "curl missing, ignoring $uri\n";
index ef4885c1968d1e5be80afc9fd2172d8fec404f9a..1842899c6ac16929ae76aaa2d13b43831da6d094 100644 (file)
@@ -209,7 +209,7 @@ my $fdpass = <<'FDPASS';
 #include <sys/socket.h>
 
 #if defined(CMSG_SPACE) && defined(CMSG_LEN)
-#define SEND_FD_CAPA 6
+#define SEND_FD_CAPA 10
 #define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
 union my_cmsg {
        struct cmsghdr hdr;