We don't need the result of query_prepare (for augmenting or
mass unlinking) until we're ready to deduplicate and write
results to the filesystem. This ought to let us hide some of
the cost of Xapian searches on multi-device/core systems for
extremely expensive searches.
my ($sock, $l2m_wq_s1);
(@$self{qw(0 1 2)}, $sock, $l2m_wq_s1) = delete(@$wq{0..4});
$self->{sock} = $sock if -S $sock;
my ($sock, $l2m_wq_s1);
(@$self{qw(0 1 2)}, $sock, $l2m_wq_s1) = delete(@$wq{0..4});
$self->{sock} = $sock if -S $sock;
- $self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1;
+ $self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1 && -S $l2m_wq_s1;
%PATH2CFG = ();
$quit = \&CORE::exit;
@TO_CLOSE_ATFORK_CHILD = ();
%PATH2CFG = ();
$quit = \&CORE::exit;
@TO_CLOSE_ATFORK_CHILD = ();
sub write_mail { # via ->wq_do
my ($self, $git_dir, $oid, $lei, $kw) = @_;
sub write_mail { # via ->wq_do
my ($self, $git_dir, $oid, $lei, $kw) = @_;
+ my $not_done = delete $self->{4}; # write end of {each_smsg_done}
my $wcb = $self->{wcb} //= do { # first message
my %sig = $lei->atfork_child_wq($self);
@SIG{keys %sig} = values %sig; # not local
my $wcb = $self->{wcb} //= do { # first message
my %sig = $lei->atfork_child_wq($self);
@SIG{keys %sig} = values %sig; # not local
$self->write_cb($lei);
};
my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
$self->write_cb($lei);
};
my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
- $git->cat_async($oid, \&git_to_mail, [ $wcb, $kw ]);
+ $git->cat_async($oid, \&git_to_mail, [ $wcb, $kw, $not_done ]);
}
sub ipc_atfork_prepare {
}
sub ipc_atfork_prepare {
$size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
}
$size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
}
+# $startq will EOF when query_prepare is done augmenting and allow
+# query_mset and query_thread_mset to proceed.
+sub wait_startq ($) {
+ my ($startq) = @_;
+ $_[0] = undef;
+ read($startq, my $query_prepare_done, 1);
+}
+
sub query_thread_mset { # for --thread
my ($self, $lei, $ibxish) = @_;
sub query_thread_mset { # for --thread
my ($self, $lei, $ibxish) = @_;
+ my $startq = delete $self->{5};
my %sig = $lei->atfork_child_wq($self);
local @SIG{keys %sig} = values %sig;
my %sig = $lei->atfork_child_wq($self);
local @SIG{keys %sig} = values %sig;
while ($over->expand_thread($ctx)) {
for my $n (@{$ctx->{xids}}) {
my $smsg = $over->get_art($n) or next;
while ($over->expand_thread($ctx)) {
for my $n (@{$ctx->{xids}}) {
my $smsg = $over->get_art($n) or next;
+ wait_startq($startq) if $startq;
next if $dedupe->is_smsg_dup($smsg);
my $mitem = delete $n2item{$smsg->{num}};
$each_smsg->($smsg, $mitem);
next if $dedupe->is_smsg_dup($smsg);
my $mitem = delete $n2item{$smsg->{num}};
$each_smsg->($smsg, $mitem);
sub query_mset { # non-parallel for non-"--thread" users
my ($self, $lei, $srcs) = @_;
sub query_mset { # non-parallel for non-"--thread" users
my ($self, $lei, $srcs) = @_;
+ my $startq = delete $self->{5};
my %sig = $lei->atfork_child_wq($self);
local @SIG{keys %sig} = values %sig;
my $mo = { %{$lei->{mset_opt}} };
my %sig = $lei->atfork_child_wq($self);
local @SIG{keys %sig} = values %sig;
my $mo = { %{$lei->{mset_opt}} };
$mset = $self->mset($mo->{qstr}, $mo);
for my $it ($mset->items) {
my $smsg = smsg_for($self, $it) or next;
$mset = $self->mset($mo->{qstr}, $mo);
for my $it ($mset->items) {
my $smsg = smsg_for($self, $it) or next;
+ wait_startq($startq) if $startq;
next if $dedupe->is_smsg_dup($smsg);
$each_smsg->($smsg, $it);
}
next if $dedupe->is_smsg_dup($smsg);
$each_smsg->($smsg, $it);
}
-sub query_prepare { # wq_do
+sub query_prepare { # for wq_do,
my ($self, $lei) = @_;
my %sig = $lei->atfork_child_wq($self);
local @SIG{keys %sig} = values %sig;
my ($self, $lei) = @_;
my %sig = $lei->atfork_child_wq($self);
local @SIG{keys %sig} = values %sig;
- if (my $l2m = $lei->{l2m}) {
- eval { $l2m->do_augment($lei) };
- return $lei->fail($@) if $@;
- }
- # trigger PublicInbox::OpPipe->event_step
- my $qry_status_wr = $lei->{0} or
- return $lei->fail('BUG: qry_status_wr missing');
- $qry_status_wr->autoflush(1);
- print $qry_status_wr '.' or # this should never fail...
- return $lei->fail("BUG? print qry_status_wr: $!");
+ eval { $lei->{l2m}->do_augment($lei) };
+ $lei->fail($@) if $@;
}
sub do_query {
my ($self, $lei_orig, $srcs) = @_;
my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
$io[0] = undef;
}
sub do_query {
my ($self, $lei_orig, $srcs) = @_;
my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
$io[0] = undef;
- pipe(my $qry_status_rd, $io[0]) or die "pipe $!";
+ pipe(my $done, $io[0]) or die "pipe $!";
$lei_orig->event_step_init; # wait for shutdowns
$lei_orig->event_step_init; # wait for shutdowns
- my $op_map = { '' => [ \&query_done, $self, $lei_orig ] };
+ my $done_op = { '' => [ \&query_done, $self, $lei_orig ] };
my $in_loop = exists $lei_orig->{sock};
my $in_loop = exists $lei_orig->{sock};
- my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop);
+ $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
my $l2m = $lei->{l2m};
if ($l2m) {
$l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox
$io[1] = $lei_orig->{1};
my $l2m = $lei->{l2m};
if ($l2m) {
$l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox
$io[1] = $lei_orig->{1};
- $op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ];
- $self->wq_do('query_prepare', \@io, $lei);
- $opp->event_step if !$in_loop;
- } else {
- start_query($self, \@io, $lei, $srcs);
+ my @l2m_io = (undef, @io[1..$#io]);
+ pipe(my $startq, $l2m_io[0]) or die "pipe: $!";
+ $self->wq_do('query_prepare', \@l2m_io, $lei);
+ $io[4] //= *STDERR{GLOB};
+ die "BUG: unexpected \$io[5]: $io[5]" if $io[5];
+ fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
+ $io[5] = $startq;
+ start_query($self, \@io, $lei, $srcs);
unless ($in_loop) {
my @pids = $self->wq_close;
# for the $lei->atfork_child_wq PIPE handler:
unless ($in_loop) {
my @pids = $self->wq_close;
# for the $lei->atfork_child_wq PIPE handler:
- $op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
- $opp->event_step;
+ $done_op->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
+ $done->event_step;
my $ipc_worker_reap = $self->can('ipc_worker_reap');
if (my $l2m_pids = delete $self->{l2m_pids}) {
dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids;
my $ipc_worker_reap = $self->can('ipc_worker_reap');
if (my $l2m_pids = delete $self->{l2m_pids}) {
dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids;
sub ipc_atfork_prepare {
my ($self) = @_;
sub ipc_atfork_prepare {
my ($self) = @_;
- # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: $l2m->{-wq_s1})
- $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&=]);
+ # (0: qry_status_wr, 1: stdout|mbox, 2: stderr,
+ # 3: sock, 4: $l2m->{-wq_s1}, 5: $startq)
+ $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&= <&=]);
$self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
}
$self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
}
#include <sys/socket.h>
#if defined(CMSG_SPACE) && defined(CMSG_LEN)
#include <sys/socket.h>
#if defined(CMSG_SPACE) && defined(CMSG_LEN)
#define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
union my_cmsg {
struct cmsghdr hdr;
#define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
union my_cmsg {
struct cmsghdr hdr;