Keeping track of non-standard FDs gets tricky, so make it easier
by relying on st_dev/st_ino mapping in the transmitted objects.
We'll keep using numbers for the standard FDs since we need to
be able to easily redirect them in the producer (main daemon)
process for (gzip|bzip2|xz) if writing to a compressed mbox.
+sub io_restore ($$) {
+ my ($dst, $src) = @_;
+ for my $i (0..2) { # standard FDs
+ my $io = delete $src->{$i} or next;
+ $dst->{$i} = $io;
+ }
+ for my $i (3..9) { # named (non-standard) FDs
+ my $io = $src->{$i} or next;
+ my @st = stat($io) or die "stat $src.$i ($io): $!";
+ my $f = delete $dst->{"dev=$st[0],ino=$st[1]"} // next;
+ $dst->{$f} = $io;
+ delete $src->{$i};
+ }
+}
+
# usage: my %sig = $lei->atfork_child_wq($wq);
# local @SIG{keys %sig} = values %sig;
sub atfork_child_wq {
my ($self, $wq) = @_;
# usage: my %sig = $lei->atfork_child_wq($wq);
# local @SIG{keys %sig} = values %sig;
sub atfork_child_wq {
my ($self, $wq) = @_;
- my ($sock, $l2m_wq_s1);
- (@$self{qw(0 1 2)}, $sock, $l2m_wq_s1) = delete(@$wq{0..4});
- $self->{sock} = $sock if -S $sock;
- $self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1 && -S $l2m_wq_s1;
+ io_restore($self, $wq);
+ io_restore($self->{l2m}, $wq);
%PATH2CFG = ();
undef $errors_log;
$quit = \&CORE::exit;
%PATH2CFG = ();
undef $errors_log;
$quit = \&CORE::exit;
close(delete $self->{$i});
}
# trigger the LeiXSearch $done OpPipe:
close(delete $self->{$i});
}
# trigger the LeiXSearch $done OpPipe:
- syswrite($self->{0}, '!') if $self->{0} && -p $self->{0};
+ syswrite($self->{op_pipe}, '!') if $self->{op_pipe};
$SIG{PIPE} = 'DEFAULT';
die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
});
}
$SIG{PIPE} = 'DEFAULT';
die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
});
}
+sub io_extract ($;@) {
+ my ($obj, @fields) = @_;
+ my @io;
+ for my $f (@fields) {
+ my $io = delete $obj->{$f} or next;
+ my @st = stat($io) or die "W: stat $obj.$f ($io): $!";
+ $obj->{"dev=$st[0],ino=$st[1]"} = $f;
+ push @io, $io;
+ }
+ @io
+}
+
# usage: ($lei, @io) = $lei->atfork_parent_wq($wq);
sub atfork_parent_wq {
my ($self, $wq) = @_;
my $env = delete $self->{env}; # env is inherited at fork
# usage: ($lei, @io) = $lei->atfork_parent_wq($wq);
sub atfork_parent_wq {
my ($self, $wq) = @_;
my $env = delete $self->{env}; # env is inherited at fork
- my $ret = bless { %$self }, ref($self);
- if (my $dedupe = delete $ret->{dedupe}) {
- $ret->{dedupe} = $wq->deep_clone($dedupe);
+ my $lei = bless { %$self }, ref($self);
+ if (my $dedupe = delete $lei->{dedupe}) {
+ $lei->{dedupe} = $wq->deep_clone($dedupe);
- delete @$ret{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m
- my @io = delete @$ret{0..2};
- $io[3] = delete($ret->{sock}) // $io[2];
- my $l2m = $ret->{l2m};
+ delete @$lei{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m
+ my @io = (delete(@$lei{qw(0 1 2)}),
+ io_extract($lei, qw(sock op_pipe startq)));
+ my $l2m = $lei->{l2m};
if ($l2m && $l2m != $wq) { # $wq == lxs
if ($l2m && $l2m != $wq) { # $wq == lxs
- $io[4] = $l2m->{-wq_s1} if $l2m->{-wq_s1};
+ if (my $wq_s1 = $l2m->{-wq_s1}) {
+ push @io, io_extract($l2m, '-wq_s1');
+ $l2m->{-wq_s1} = $wq_s1;
+ }
};
} elsif ($l2m && $l2m->{-wq_s1}) {
my ($lei_ipc, @io) = $lei->atfork_parent_wq($l2m);
};
} elsif ($l2m && $l2m->{-wq_s1}) {
my ($lei_ipc, @io) = $lei->atfork_parent_wq($l2m);
- # n.b. $io[0] = qry_status_wr, $io[1] = mbox|stdout,
- # $io[4] becomes a notification pipe that triggers EOF
+ # $io[-1] becomes a notification pipe that triggers EOF
# in this wq worker when all outstanding ->write_mail
# calls are complete
# in this wq worker when all outstanding ->write_mail
# calls are complete
- die "BUG: \$io[4] $io[4] unexpected" if $io[4];
- pipe($l2m->{each_smsg_done}, $io[4]) or die "pipe: $!";
- fcntl($io[4], 1031, 4096) if $^O eq 'linux';
+ pipe($l2m->{each_smsg_done}, $io[$#io + 1]) or die "pipe: $!";
+ fcntl($io[-1], 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
delete @$lei_ipc{qw(l2m opt mset_opt cmd)};
delete @$lei_ipc{qw(l2m opt mset_opt cmd)};
+ $lei_ipc->{each_smsg_not_done} = $#io;
my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
$self->{git} = $git;
my $git_dir = $git->{git_dir};
my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
$self->{git} = $git;
my $git_dir = $git->{git_dir};
sub write_mail { # via ->wq_do
my ($self, $git_dir, $smsg, $lei) = @_;
sub write_mail { # via ->wq_do
my ($self, $git_dir, $smsg, $lei) = @_;
- my $not_done = delete $self->{4}; # write end of {each_smsg_done}
+ my $not_done = delete $self->{$lei->{each_smsg_not_done}};
my $wcb = $self->{wcb} //= do { # first message
my %sig = $lei->atfork_child_wq($self);
@SIG{keys %sig} = values %sig; # not local
my $wcb = $self->{wcb} //= do { # first message
my %sig = $lei->atfork_child_wq($self);
@SIG{keys %sig} = values %sig; # not local
$git->cat_async($smsg->{blob}, \&git_to_mail, [$wcb, $smsg, $not_done]);
}
$git->cat_async($smsg->{blob}, \&git_to_mail, [$wcb, $smsg, $not_done]);
}
-sub ipc_atfork_prepare {
- my ($self) = @_;
- # FDs: (done_wr, stdout|mbox, stderr, 3: sock, 4: each_smsg_done_wr)
- $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
-}
-
# We rely on OnDestroy to run this before ->DESTROY, since ->DESTROY
# ordering is unstable at worker exit and may cause segfaults
sub reap_gits {
# We rely on OnDestroy to run this before ->DESTROY, since ->DESTROY
# ordering is unstable at worker exit and may cause segfaults
sub reap_gits {
sub query_thread_mset { # for --thread
my ($self, $lei, $ibxish) = @_;
local $0 = "$0 query_thread_mset";
sub query_thread_mset { # for --thread
my ($self, $lei, $ibxish) = @_;
local $0 = "$0 query_thread_mset";
- my $startq = delete $self->{5};
my %sig = $lei->atfork_child_wq($self);
local @SIG{keys %sig} = values %sig;
my %sig = $lei->atfork_child_wq($self);
local @SIG{keys %sig} = values %sig;
+ my $startq = delete $lei->{startq};
my ($srch, $over) = ($ibxish->search, $ibxish->over);
unless ($srch && $over) {
my ($srch, $over) = ($ibxish->search, $ibxish->over);
unless ($srch && $over) {
sub query_mset { # non-parallel for non-"--thread" users
my ($self, $lei) = @_;
local $0 = "$0 query_mset";
sub query_mset { # non-parallel for non-"--thread" users
my ($self, $lei) = @_;
local $0 = "$0 query_mset";
- my $startq = delete $self->{5};
my %sig = $lei->atfork_child_wq($self);
local @SIG{keys %sig} = values %sig;
my %sig = $lei->atfork_child_wq($self);
local @SIG{keys %sig} = values %sig;
+ my $startq = delete $lei->{startq};
my $mo = { %{$lei->{mset_opt}} };
my $mset;
for my $loc (locals($self)) {
my $mo = { %{$lei->{mset_opt}} };
my $mset;
for my $loc (locals($self)) {
$smsg->parse_references($eml, mids($eml));
$smsg->{$_} //= '' for qw(from to cc ds subject references mid);
delete @$smsg{qw(From Subject -ds -ts)};
$smsg->parse_references($eml, mids($eml));
$smsg->{$_} //= '' for qw(from to cc ds subject references mid);
delete @$smsg{qw(From Subject -ds -ts)};
- if (my $startq = delete($self->{5})) { wait_startq($startq) }
+ if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
$each_smsg->($smsg, undef, $eml);
}
$each_smsg->($smsg, undef, $eml);
}
my ($self, $lei) = @_;
local $0 = "$0 query_prepare";
my %sig = $lei->atfork_child_wq($self);
my ($self, $lei) = @_;
local $0 = "$0 query_prepare";
my %sig = $lei->atfork_child_wq($self);
- -p $lei->{0} or die "BUG: \$done pipe expected";
+ -p $lei->{op_pipe} or die "BUG: \$done pipe expected";
local @SIG{keys %sig} = values %sig;
local @SIG{keys %sig} = values %sig;
+ delete $lei->{l2m}->{-wq_s1};
eval { $lei->{l2m}->do_augment($lei) };
$lei->fail($@) if $@;
eval { $lei->{l2m}->do_augment($lei) };
$lei->fail($@) if $@;
- syswrite($lei->{0}, '.') == 1 or die "do_post_augment trigger: $!";
+ syswrite($lei->{op_pipe}, '.') == 1 or die "do_post_augment trigger: $!"
}
sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
}
sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
- my ($self, $lei_orig) = @_;
- my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
- $io[0] = undef;
- pipe(my $done, $io[0]) or die "pipe $!";
- $lei_orig->{1}->autoflush(1);
+ my ($self, $lei) = @_;
+ $lei->{1}->autoflush(1);
+ my ($au_done, $zpipe);
+ my $l2m = $lei->{l2m};
+ if ($l2m) {
+ pipe($lei->{startq}, $au_done) or die "pipe: $!";
+ # 1031: F_SETPIPE_SZ
+ fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
+ $zpipe = $l2m->pre_augment($lei);
+ }
+ pipe(my $done, $lei->{op_pipe}) or die "pipe $!";
+ my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);
+ delete($lei->{op_pipe});
- $lei_orig->event_step_init; # wait for shutdowns
+ $lei->event_step_init; # wait for shutdowns
- '' => [ \&query_done, $lei_orig ],
- '!' => [ \&sigpipe_handler, $lei_orig ]
+ '' => [ \&query_done, $lei ],
+ '!' => [ \&sigpipe_handler, $lei ]
- my $in_loop = exists $lei_orig->{sock};
+ my $in_loop = exists $lei->{sock};
$done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
$done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
- # may redirect $lei->{1} for mbox
- my $zpipe = $l2m->pre_augment($lei_orig);
- $io[1] = $lei_orig->{1};
- pipe(my ($startq, $au_done)) or die "pipe: $!";
- $done_op->{'.'} = [ \&do_post_augment, $lei_orig,
- $zpipe, $au_done ];
- local $io[4] = *STDERR{GLOB}; # don't send l2m->{-wq_s1}
- die "BUG: unexpected \$io[5]: $io[5]" if $io[5];
- $self->wq_do('query_prepare', \@io, $lei);
- fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
- $io[5] = $startq;
+ $done_op->{'.'} = [ \&do_post_augment, $lei, $zpipe, $au_done ];
+ $self->wq_do('query_prepare', \@io, $lei_ipc);
$io[1] = $zpipe->[1] if $zpipe;
}
$io[1] = $zpipe->[1] if $zpipe;
}
- start_query($self, \@io, $lei);
+ start_query($self, \@io, $lei_ipc);
$self->wq_close(1);
unless ($in_loop) {
$self->wq_close(1);
unless ($in_loop) {
- # for the $lei->atfork_child_wq PIPE handler:
+ # for the $lei_ipc->atfork_child_wq PIPE handler:
while ($done->{sock}) { $done->event_step }
}
}
while ($done->{sock}) { $done->event_step }
}
}
-sub ipc_atfork_prepare {
- my ($self) = @_;
- if (exists $self->{remotes}) {
- require PublicInbox::MboxReader;
- require IO::Uncompress::Gunzip;
- }
- # FDS: (0: done_wr, 1: stdout|mbox, 2: stderr,
- # 3: sock, 4: $l2m->{-wq_s1}, 5: $startq)
- $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
-}
-
sub add_uri {
my ($self, $uri) = @_;
if (my $curl = $self->{curl} //= which('curl') // 0) {
sub add_uri {
my ($self, $uri) = @_;
if (my $curl = $self->{curl} //= which('curl') // 0) {
+ require PublicInbox::MboxReader;
+ require IO::Uncompress::Gunzip;
push @{$self->{remotes}}, $uri;
} else {
warn "curl missing, ignoring $uri\n";
push @{$self->{remotes}}, $uri;
} else {
warn "curl missing, ignoring $uri\n";
#include <sys/socket.h>
#if defined(CMSG_SPACE) && defined(CMSG_LEN)
#include <sys/socket.h>
#if defined(CMSG_SPACE) && defined(CMSG_LEN)
#define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
union my_cmsg {
struct cmsghdr hdr;
#define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
union my_cmsg {
struct cmsghdr hdr;