ipc_worker_stop($self);
}
+# Sereal doesn't have dclone
+sub deep_clone { thaw(freeze($_[-1])) }
+
1;
if (my $sock = $self->{sock}) {
push @$tcafc, @$self{qw(0 1 2)}, $sock;
}
+ for my $f (qw(lxs l2m)) {
+ my $ipc = $self->{$f} or next;
+ push @$tcafc, grep { defined }
+ @$ipc{qw(-wq_s1 -wq_s2 -ipc_req -ipc_res)};
+ }
}
# usage: my %sig = $lei->atfork_child_wq($wq);
# local @SIG{keys %sig} = values %sig;
sub atfork_child_wq {
my ($self, $wq) = @_;
- @$self{qw(0 1 2 sock)} = delete(@$wq{0..3});
+ my ($sock, $l2m_wq_s1);
+ (@$self{qw(0 1 2)}, $sock, $l2m_wq_s1) = delete(@$wq{0..4});
+ $self->{sock} = $sock if -S $sock;
+ $self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1;
%PATH2CFG = ();
$quit = \&CORE::exit;
@TO_CLOSE_ATFORK_CHILD = ();
# usage: ($lei, @io) = $lei->atfork_parent_wq($wq);
sub atfork_parent_wq {
my ($self, $wq) = @_;
- if ($wq->wq_workers) {
- my $env = delete $self->{env}; # env is inherited at fork
- my $ret = bless { %$self }, ref($self);
- $self->{env} = $env;
- delete @$ret{qw(-lei_store cfg pgr)};
- ($ret, delete @$ret{0..2}, delete($ret->{sock}) // ());
- } else {
- ($self, @$self{0..2}, $self->{sock} // ());
+ my $env = delete $self->{env}; # env is inherited at fork
+ my $ret = bless { %$self }, ref($self);
+ if (my $dedupe = delete $ret->{dedupe}) {
+ $ret->{dedupe} = $wq->deep_clone($dedupe);
+ }
+ $self->{env} = $env;
+ delete @$ret{qw(-lei_store cfg pgr lxs)}; # keep l2m
+ my @io = delete @$ret{0..2};
+ $io[3] = delete($ret->{sock}) // *STDERR{GLOB};
+ my $l2m = $ret->{l2m};
+ if ($l2m && $l2m != $wq) {
+ $io[4] = $l2m->{-wq_s1} if $l2m->{-wq_s1};
+ if (my @pids = $l2m->wq_close) {
+ $wq->{l2m_pids} = \@pids;
+ }
}
+ ($ret, @io);
}
sub _help ($;$) {
@cmd = map { $_ eq '%f' ? ($replaced = $mfolder) : $_ } @cmd;
push @cmd, $mfolder unless defined($replaced);
$sock //= $self->{sock};
- if ($sock) { # lei(1) client process runs it
+ if ($PublicInbox::DS::in_loop) { # lei(1) client process runs it
send($sock, exec_buf(\@cmd, {}), MSG_EOR);
} else { # oneshot
$self->{"mua.pid.$self.$$"} = spawn(\@cmd);
sub ovv_atexit_child {
my ($self, $lei) = @_;
+ if (my $l2m = delete $lei->{l2m}) {
+ # gracefully stop lei2mail processes after all
+ # ->write_mail work is complete
+ delete $l2m->{-wq_s1};
+ if (my $rd = delete $l2m->{each_smsg_done}) {
+ read($rd, my $buf, 1); # wait for EOF
+ }
+ }
+ # order matters, git->{-tmp}->DESTROY must not fire until
+ # {each_smsg_done} hits EOF above
if (my $git = delete $self->{git}) {
$git->async_wait_all;
}
sub ovv_each_smsg_cb { # runs in wq worker usually
my ($self, $lei, $ibxish) = @_;
- $lei->{ovv_buf} = \(my $buf = '');
- delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel};
my $json;
$lei->{1}->autoflush(1);
if (my $pkg = $self->{json}) {
$json->utf8->canonical;
$json->ascii(1) if $lei->{opt}->{ascii};
}
- if (my $l2m = $lei->{l2m}) {
+ my $l2m = $lei->{l2m};
+ if ($l2m && $l2m->{-wq_s1}) {
+ my ($lei_ipc, @io) = $lei->atfork_parent_wq($l2m);
+ # n.b. $io[0] = qry_status_wr, $io[1] = mbox|stdout,
+ # $io[4] becomes a notification pipe that triggers EOF
+ # in this wq worker when all outstanding ->write_mail
+ # calls are complete
+ die "BUG: \$io[4] $io[4] unexpected" if $io[4];
+ pipe($l2m->{each_smsg_done}, $io[4]) or die "pipe: $!";
+ fcntl($io[4], 1031, 4096) if $^O eq 'linux';
+ delete @$lei_ipc{qw(l2m opt mset_opt cmd)};
+ my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
+ $self->{git} = $git;
+ my $git_dir = $git->{git_dir};
+ sub {
+ my ($smsg, $mitem) = @_;
+ my $kw = []; # TODO get from mitem
+ $l2m->wq_do('write_mail', \@io, $git_dir,
+ $smsg->{blob}, $lei_ipc, $kw)
+ }
+ } elsif ($l2m) {
my $wcb = $l2m->write_cb($lei);
my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
$self->{git} = $git; # for ovv_atexit_child
};
} elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
+ $lei->{ovv_buf} = \(my $buf = '');
sub { # DIY prettiness :P
my ($smsg, $mitem) = @_;
$smsg = _unbless_smsg($smsg, $mitem);
}
} elsif ($json) {
my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL
+ $lei->{ovv_buf} = \(my $buf = '');
sub {
my ($smsg, $mitem) = @_;
delete @$smsg{qw(tid num)};
$j = 1 if !$opt->{thread};
$j++ if $opt->{'local'}; # for sto->search below
$self->atfork_prepare_wq($lxs);
- $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
- // $lxs->wq_workers($j);
+ $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset);
+ $self->{lxs} = $lxs;
- # no forking workers after this
my $ovv = PublicInbox::LeiOverview->new($self) or return;
+ if (my $l2m = $self->{l2m}) {
+ $j = 4 if $j <= 4; # TODO configurable
+ $self->atfork_prepare_wq($l2m);
+ $l2m->wq_workers_start('lei2mail', $j, $self->oldset);
+ }
+
+ # no forking workers after this
my $sto = $self->_lei_store(1);
unshift(@srcs, $sto->search) if $opt->{'local'};
my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
package PublicInbox::LeiToMail;
use strict;
use v5.10.1;
+use parent qw(PublicInbox::IPC);
use PublicInbox::Eml;
use PublicInbox::Lock;
use PublicInbox::ProcessPipe;
use IO::Handle; # ->autoflush
use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
use Errno qw(EEXIST ESPIPE ENOENT);
+use PublicInbox::Git;
my %kw2char = ( # Maildir characters
draft => 'D',
$_[0]->{base_type} =~ /\A(?:maildir|mh|imap|jmap)\z/ ? 1 : 0;
}
+sub write_mail { # via ->wq_do
+ my ($self, $git_dir, $oid, $lei, $kw) = @_;
+ my $wcb = $self->{wcb} //= do { # first message
+ my %sig = $lei->atfork_child_wq($self);
+ @SIG{keys %sig} = values %sig; # not local
+ $lei->{dedupe}->prepare_dedupe;
+ $self->write_cb($lei);
+ };
+ my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
+ $git->cat_async($oid, \&git_to_mail, [ $wcb, $kw ]);
+}
+
+sub ipc_atfork_prepare {
+ my ($self) = @_;
+ # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: each_smsg_done_wr)
+ $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= >&=]);
+ $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
+}
+
+sub DESTROY {
+ my ($self) = @_;
+ for my $pid_git (grep(/\A$$\0/, keys %$self)) {
+ $self->{$pid_git}->async_wait_all;
+ }
+}
+
1;
@{$ctx->{xids}} = ();
}
} while (_mset_more($mset, $mo));
+ undef $each_smsg; # drops @io for l2m->{each_smsg_done}
$lei->{ovv}->ovv_atexit_child($lei);
}
$each_smsg->($smsg, $it);
}
} while (_mset_more($mset, $mo));
+ undef $each_smsg; # drops @io for l2m->{each_smsg_done}
$lei->{ovv}->ovv_atexit_child($lei);
}
}
sub query_done { # EOF callback
- my ($lei) = @_;
- $lei->{ovv}->ovv_end($lei);
- if (my $l2m = $lei->{l2m}) {
- $lei->start_mua unless $l2m->lock_free;
+ my ($self, $lei) = @_;
+ my $l2m = delete $lei->{l2m};
+ if (my $pids = delete $self->{l2m_pids}) {
+ my $ipc_worker_reap = $self->can('ipc_worker_reap');
+ dwaitpid($_, $ipc_worker_reap, $l2m) for @$pids;
}
+ $lei->{ovv}->ovv_end($lei);
+ $lei->start_mua if $l2m && !$l2m->lock_free;
$lei->dclose;
}
}
my $remotes = $self->{remotes} // [];
if ($lei->{opt}->{thread}) {
- $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1;
for my $ibxish (@$srcs) {
$self->wq_do('query_thread_mset', $io, $lei, $ibxish);
}
} else {
- $lei->{-parallel} = scalar(@$remotes);
$self->wq_do('query_mset', $io, $lei, $srcs);
}
# TODO
$io[0] = undef;
pipe(my $qry_status_rd, $io[0]) or die "pipe $!";
- $lei_orig->{lxs} = $self;
$lei_orig->event_step_init; # wait for shutdowns
- my $op_map = { '' => [ \&query_done, $lei_orig ] };
+ my $op_map = { '' => [ \&query_done, $self, $lei_orig ] };
my $in_loop = exists $lei_orig->{sock};
my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop);
- if (my $l2m = $lei->{l2m}) {
+ my $l2m = $lei->{l2m};
+ if ($l2m) {
$l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox
$io[1] = $lei_orig->{1};
$op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ];
$op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
$opp->event_step;
my $ipc_worker_reap = $self->can('ipc_worker_reap');
+ if (my $l2m_pids = delete $self->{l2m_pids}) {
+ dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids;
+ }
dwaitpid($_, $ipc_worker_reap, $self) for @pids;
}
}
sub ipc_atfork_prepare {
my ($self) = @_;
- $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]);
+ # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: $l2m->{-wq_s1})
+ $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&=]);
$self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
}
#include <sys/socket.h>
#if defined(CMSG_SPACE) && defined(CMSG_LEN)
-#define SEND_FD_CAPA 4
+#define SEND_FD_CAPA 5
#define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
union my_cmsg {
struct cmsghdr hdr;