our %PATH2CFG; # persistent for socket daemon
our $MDIR2CFGPATH; # /path/to/maildir => { /path/to/config => [ ino watches ] }
+our %LIVE_SOCK; # "GLOB(0x....)" => $lei->{sock}
# TBD: this is a documentation mechanism to show a subcommand
# (may) pass options through to another command:
$dir_idle->force_close if $dir_idle;
%PATH2CFG = ();
$MDIR2CFGPATH = {};
+ %LIVE_SOCK = ();
eval 'no warnings; undef $PublicInbox::LeiNoteEvent::to_flush';
undef $errors_log;
$quit = \&CORE::exit;
add_maildir_watch($cd, $cfg_f);
}
}
- my $wait = $renames ? $sto->ipc_do('done') : undef;
+ $lei->sto_done_request if $renames;
if ($old) { # cull old non-existent entries
for my $url (keys %$old) {
next if exists $seen{$url};
$lse ? $lse->lms : undef;
}
+sub sto_done_request { # only call this from lei-daemon process (not workers)
+ my ($lei, $sock) = @_;
+ if ($sock //= $lei->{sock}) {
+ $LIVE_SOCK{"$sock"} = $sock;
+ $lei->{sto}->ipc_do('done', "$sock"); # issue, async wait
+ } else { # forcibly wait
+ my $wait = $lei->{sto}->ipc_do('done');
+ }
+}
+
+sub sto_done_complete { # called in lei-daemon when LeiStore->done is complete
+ my ($sock_str) = @_;
+ delete $LIVE_SOCK{$sock_str}; # frees {sock} for waiting lei clients
+}
+
1;
my ($lei, @folders) = @_;
my $lms = $lei->lms or return;
my $sto = $lei->_lei_store or return; # may disappear due to race
- $sto->write_prepare;
+ $sto->write_prepare($lei);
my $err = $lms->arg2folder($lei, \@folders);
$lei->qerr(@{$err->{qerr}}) if $err->{qerr};
return $lei->fail($err->{fail}) if $err->{fail};
$sto->ipc_do('lms_forget_folders', @folders);
- my $wait = $sto->ipc_do('done');
+ $lei->sto_done_request;
}
*_complete_forget_mail_sync = \&PublicInbox::LeiExportKw::_complete_export_kw;
my $self = bless { -wq_ident => 'lei import_kw worker' }, $cls;
my ($op_c, $ops) = $lei->workers_start($self, $self->detect_nproc);
$op_c->{ops} = $ops; # for PktOp->event_step
+ $self->{lei_sock} = $lei->{sock};
$lei->{ikw} = $self;
}
sub ikw_done_wait {
my ($arg, $pid) = @_;
my ($self, $lei) = @$arg;
- my $wait = $lei->{sto}->ipc_do('done');
$lei->can('wq_done_wait')->($arg, $pid);
}
sub _lei_wq_eof { # EOF callback for main lei daemon
my ($lei) = @_;
my $ikw = delete $lei->{ikw} or return $lei->fail;
+ $lei->sto_done_request($ikw->{lei_sock});
$ikw->wq_wait_old(\&ikw_done_wait, $lei);
}
if (my $lne = delete $lei->{cfg}->{-lei_note_event}) {
$lne->wq_close(1, undef, $lei); # runs _lei_wq_eof;
} elsif ($lei->{sto}) { # lms_clear_src calls only:
- my $wait = $lei->{sto}->ipc_do('done');
+ $lei->sto_done_request;
}
}
sub _lei_wq_eof { # EOF callback for main lei daemon
my ($lei) = @_;
my $lne = delete $lei->{lne} or return $lei->fail;
- my $wait = $lei->{sto}->ipc_do('done');
+ $lei->sto_done_request;
$lne->wq_wait_old(\&lne_done_wait, $lei);
}
my ($op_c, $ops) = $lei->workers_start($self, $nproc,
undef, { ipt => $ipt }); # LeiInput subclass
$op_c->{ops} = $ops; # for PktOp->event_step
+ $self->{lei_sock} = $lei->{sock}; # keep client for pmd_done_wait
$lei->{pmd} = $self;
}
my ($self) = @_;
my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}';
$ipt->{lei} = $self->{lei};
- $ipt->ipc_atfork_child;
+ $ipt->ipc_atfork_child; # calls _lei_atfork_child;
}
sub each_mdir_fn { # maildir_each_file callback
sub pmd_done_wait {
my ($arg, $pid) = @_;
my ($self, $lei) = @$arg;
- my $wait = $lei->{sto}->ipc_do('done');
$lei->can('wq_done_wait')->($arg, $pid);
}
sub _lei_wq_eof { # EOF callback for main lei daemon
my ($lei) = @_;
my $pmd = delete $lei->{pmd} or return $lei->fail;
+ $lei->sto_done_request($pmd->{lei_sock});
$pmd->wq_wait_old(\&pmd_done_wait, $lei);
}
}
sub done {
- my ($self) = @_;
+ my ($self, $sock_ref) = @_;
my $err = '';
if (my $im = delete($self->{im})) {
eval { $im->done };
$self->{priv_eidx}->done; # V2Writable::done
xchg_stderr($self);
die $err if $err;
+
+ # notify clients ->done has been issued
+ defined($sock_ref) and
+ $self->{s2d_op_p}->pkt_do('sto_done_complete', $sock_ref);
}
sub ipc_atfork_child {
my $lei = $self->{lei};
$lei->_lei_atfork_child(1) if $lei;
xchg_stderr($self);
- if (my $err = delete($self->{err_pipe})) {
- close $err->[0];
- $self->{-err_wr} = $err->[1];
+ if (my $to_close = delete($self->{to_close})) {
+ close($_) for @$to_close;
}
$self->SUPER::ipc_atfork_child;
}
sub write_prepare {
my ($self, $lei) = @_;
+ $lei // die 'BUG: $lei not passed';
unless ($self->{-ipc_req}) {
- my $d = $lei->store_path;
- $self->ipc_lock_init("$d/ipc.lock");
- substr($d, -length('/lei/store'), 10, '');
+ # s2d => store-to-daemon messages
+ require PublicInbox::PktOp;
+ my ($s2d_op_c, $s2d_op_p) = PublicInbox::PktOp->pair;
+ my $dir = $lei->store_path;
+ $self->ipc_lock_init("$dir/ipc.lock");
+ substr($dir, -length('/lei/store'), 10, '');
pipe(my ($r, $w)) or die "pipe: $!";
- my $err_pipe = [ $r, $w ];
# Mail we import into lei are private, so headers filtered out
# by -mda for public mail are not appropriate
local @PublicInbox::MDA::BAD_HEADERS = ();
- $self->ipc_worker_spawn("lei/store $d", $lei->oldset,
- { lei => $lei, err_pipe => $err_pipe });
+ $self->ipc_worker_spawn("lei/store $dir", $lei->oldset, {
+ lei => $lei,
+ -err_wr => $w,
+ to_close => [ $r, $s2d_op_c->{sock} ],
+ s2d_op_p => $s2d_op_p,
+ });
require PublicInbox::LeiStoreErr;
- PublicInbox::LeiStoreErr->new($err_pipe->[0], $lei);
+ PublicInbox::LeiStoreErr->new($r, $lei);
+ $s2d_op_c->{ops} = {
+ sto_done_complete => [ $lei->can('sto_done_complete') ]
+ };
}
$lei->{sto} = $self;
}
if ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) {
warn "BUG: {sto} missing with --mail-sync";
}
- my $wait = $lei->{sto} ? $lei->{sto}->ipc_do('done') : undef;
- $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef;
+ $lei->sto_done_request if $lei->{sto};
+ my $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef;
$lei->{ovv}->ovv_end($lei);
my $start_mua;
if ($l2m) { # close() calls LeiToMail reap_compress
($cmd, @pargs) = split(/ /, $msg);
}
my $op = $self->{ops}->{$cmd //= $msg};
- die "BUG: unknown message: `$cmd'" unless $op;
- my ($sub, @args) = @$op;
- $sub->(@args, @pargs);
+ if ($op) {
+ my ($sub, @args) = @$op;
+ $sub->(@args, @pargs);
+ } elsif ($msg ne '') {
+ die "BUG: unknown message: `$cmd'";
+ }
return $self->close if $msg eq ''; # close on EOF
}
}