This allows client sockets to wait for "done" commits to
lei/store while the daemon reacts asynchronously. The goal
of this change is to keep the script/lei client alive until
lei/store commits changes to the filesystem, but without
blocking the lei-daemon event loop. It depends on Perl
refcounting to close the socket.
This change also highlighted our over-use of "done" requests to
lei/store processes, which is now corrected so we only issue it
on collective socket EOF rather than upon reaping every single
worker.
This also fixes "lei forget-mail-sync" when it is the initial
command.
This took several iterations and much debugging to arrive at the
current implementation:
1. The initial iteration of this change utilized socket passing
from lei-daemon to lei/store, which necessitated switching
from faster pipes to slower Unix sockets.
2. The second iteration switched to registering notification sockets
independently of "done" requests, but that could lead to early
wakeups when "done" was requested by other workers. This
appeared to work most of the time, but suffered races under
high load which were difficult to track down.
Finally, this iteration passes the stringified socket GLOB ref
to lei/store which is echoed back to lei-daemon upon completion
of that particular "done" request.
our %PATH2CFG; # persistent for socket daemon
our $MDIR2CFGPATH; # /path/to/maildir => { /path/to/config => [ ino watches ] }
our %PATH2CFG; # persistent for socket daemon
our $MDIR2CFGPATH; # /path/to/maildir => { /path/to/config => [ ino watches ] }
+our %LIVE_SOCK; # "GLOB(0x....)" => $lei->{sock}
# TBD: this is a documentation mechanism to show a subcommand
# (may) pass options through to another command:
# TBD: this is a documentation mechanism to show a subcommand
# (may) pass options through to another command:
$dir_idle->force_close if $dir_idle;
%PATH2CFG = ();
$MDIR2CFGPATH = {};
$dir_idle->force_close if $dir_idle;
%PATH2CFG = ();
$MDIR2CFGPATH = {};
eval 'no warnings; undef $PublicInbox::LeiNoteEvent::to_flush';
undef $errors_log;
$quit = \&CORE::exit;
eval 'no warnings; undef $PublicInbox::LeiNoteEvent::to_flush';
undef $errors_log;
$quit = \&CORE::exit;
add_maildir_watch($cd, $cfg_f);
}
}
add_maildir_watch($cd, $cfg_f);
}
}
- my $wait = $renames ? $sto->ipc_do('done') : undef;
+ $lei->sto_done_request if $renames;
if ($old) { # cull old non-existent entries
for my $url (keys %$old) {
next if exists $seen{$url};
if ($old) { # cull old non-existent entries
for my $url (keys %$old) {
next if exists $seen{$url};
$lse ? $lse->lms : undef;
}
$lse ? $lse->lms : undef;
}
+sub sto_done_request { # only call this from lei-daemon process (not workers)
+ my ($lei, $sock) = @_;
+ if ($sock //= $lei->{sock}) {
+ $LIVE_SOCK{"$sock"} = $sock;
+ $lei->{sto}->ipc_do('done', "$sock"); # issue, async wait
+ } else { # forcibly wait
+ my $wait = $lei->{sto}->ipc_do('done');
+ }
+}
+
+sub sto_done_complete { # called in lei-daemon when LeiStore->done is complete
+ my ($sock_str) = @_;
+ delete $LIVE_SOCK{$sock_str}; # frees {sock} for waiting lei clients
+}
+
my ($lei, @folders) = @_;
my $lms = $lei->lms or return;
my $sto = $lei->_lei_store or return; # may disappear due to race
my ($lei, @folders) = @_;
my $lms = $lei->lms or return;
my $sto = $lei->_lei_store or return; # may disappear due to race
+ $sto->write_prepare($lei);
my $err = $lms->arg2folder($lei, \@folders);
$lei->qerr(@{$err->{qerr}}) if $err->{qerr};
return $lei->fail($err->{fail}) if $err->{fail};
$sto->ipc_do('lms_forget_folders', @folders);
my $err = $lms->arg2folder($lei, \@folders);
$lei->qerr(@{$err->{qerr}}) if $err->{qerr};
return $lei->fail($err->{fail}) if $err->{fail};
$sto->ipc_do('lms_forget_folders', @folders);
- my $wait = $sto->ipc_do('done');
+ $lei->sto_done_request;
}
*_complete_forget_mail_sync = \&PublicInbox::LeiExportKw::_complete_export_kw;
}
*_complete_forget_mail_sync = \&PublicInbox::LeiExportKw::_complete_export_kw;
my $self = bless { -wq_ident => 'lei import_kw worker' }, $cls;
my ($op_c, $ops) = $lei->workers_start($self, $self->detect_nproc);
$op_c->{ops} = $ops; # for PktOp->event_step
my $self = bless { -wq_ident => 'lei import_kw worker' }, $cls;
my ($op_c, $ops) = $lei->workers_start($self, $self->detect_nproc);
$op_c->{ops} = $ops; # for PktOp->event_step
+ $self->{lei_sock} = $lei->{sock};
sub ikw_done_wait {
my ($arg, $pid) = @_;
my ($self, $lei) = @$arg;
sub ikw_done_wait {
my ($arg, $pid) = @_;
my ($self, $lei) = @$arg;
- my $wait = $lei->{sto}->ipc_do('done');
$lei->can('wq_done_wait')->($arg, $pid);
}
sub _lei_wq_eof { # EOF callback for main lei daemon
my ($lei) = @_;
my $ikw = delete $lei->{ikw} or return $lei->fail;
$lei->can('wq_done_wait')->($arg, $pid);
}
sub _lei_wq_eof { # EOF callback for main lei daemon
my ($lei) = @_;
my $ikw = delete $lei->{ikw} or return $lei->fail;
+ $lei->sto_done_request($ikw->{lei_sock});
$ikw->wq_wait_old(\&ikw_done_wait, $lei);
}
$ikw->wq_wait_old(\&ikw_done_wait, $lei);
}
if (my $lne = delete $lei->{cfg}->{-lei_note_event}) {
$lne->wq_close(1, undef, $lei); # runs _lei_wq_eof;
} elsif ($lei->{sto}) { # lms_clear_src calls only:
if (my $lne = delete $lei->{cfg}->{-lei_note_event}) {
$lne->wq_close(1, undef, $lei); # runs _lei_wq_eof;
} elsif ($lei->{sto}) { # lms_clear_src calls only:
- my $wait = $lei->{sto}->ipc_do('done');
+ $lei->sto_done_request;
sub _lei_wq_eof { # EOF callback for main lei daemon
my ($lei) = @_;
my $lne = delete $lei->{lne} or return $lei->fail;
sub _lei_wq_eof { # EOF callback for main lei daemon
my ($lei) = @_;
my $lne = delete $lei->{lne} or return $lei->fail;
- my $wait = $lei->{sto}->ipc_do('done');
+ $lei->sto_done_request;
$lne->wq_wait_old(\&lne_done_wait, $lei);
}
$lne->wq_wait_old(\&lne_done_wait, $lei);
}
my ($op_c, $ops) = $lei->workers_start($self, $nproc,
undef, { ipt => $ipt }); # LeiInput subclass
$op_c->{ops} = $ops; # for PktOp->event_step
my ($op_c, $ops) = $lei->workers_start($self, $nproc,
undef, { ipt => $ipt }); # LeiInput subclass
$op_c->{ops} = $ops; # for PktOp->event_step
+ $self->{lei_sock} = $lei->{sock}; # keep client for pmd_done_wait
my ($self) = @_;
my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}';
$ipt->{lei} = $self->{lei};
my ($self) = @_;
my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}';
$ipt->{lei} = $self->{lei};
- $ipt->ipc_atfork_child;
+ $ipt->ipc_atfork_child; # calls _lei_atfork_child;
}
sub each_mdir_fn { # maildir_each_file callback
}
sub each_mdir_fn { # maildir_each_file callback
sub pmd_done_wait {
my ($arg, $pid) = @_;
my ($self, $lei) = @$arg;
sub pmd_done_wait {
my ($arg, $pid) = @_;
my ($self, $lei) = @$arg;
- my $wait = $lei->{sto}->ipc_do('done');
$lei->can('wq_done_wait')->($arg, $pid);
}
sub _lei_wq_eof { # EOF callback for main lei daemon
my ($lei) = @_;
my $pmd = delete $lei->{pmd} or return $lei->fail;
$lei->can('wq_done_wait')->($arg, $pid);
}
sub _lei_wq_eof { # EOF callback for main lei daemon
my ($lei) = @_;
my $pmd = delete $lei->{pmd} or return $lei->fail;
+ $lei->sto_done_request($pmd->{lei_sock});
$pmd->wq_wait_old(\&pmd_done_wait, $lei);
}
$pmd->wq_wait_old(\&pmd_done_wait, $lei);
}
+ my ($self, $sock_ref) = @_;
my $err = '';
if (my $im = delete($self->{im})) {
eval { $im->done };
my $err = '';
if (my $im = delete($self->{im})) {
eval { $im->done };
$self->{priv_eidx}->done; # V2Writable::done
xchg_stderr($self);
die $err if $err;
$self->{priv_eidx}->done; # V2Writable::done
xchg_stderr($self);
die $err if $err;
+
+ # notify clients ->done has been issued
+ defined($sock_ref) and
+ $self->{s2d_op_p}->pkt_do('sto_done_complete', $sock_ref);
my $lei = $self->{lei};
$lei->_lei_atfork_child(1) if $lei;
xchg_stderr($self);
my $lei = $self->{lei};
$lei->_lei_atfork_child(1) if $lei;
xchg_stderr($self);
- if (my $err = delete($self->{err_pipe})) {
- close $err->[0];
- $self->{-err_wr} = $err->[1];
+ if (my $to_close = delete($self->{to_close})) {
+ close($_) for @$to_close;
}
$self->SUPER::ipc_atfork_child;
}
sub write_prepare {
my ($self, $lei) = @_;
}
$self->SUPER::ipc_atfork_child;
}
sub write_prepare {
my ($self, $lei) = @_;
+ $lei // die 'BUG: $lei not passed';
unless ($self->{-ipc_req}) {
unless ($self->{-ipc_req}) {
- my $d = $lei->store_path;
- $self->ipc_lock_init("$d/ipc.lock");
- substr($d, -length('/lei/store'), 10, '');
+ # s2d => store-to-daemon messages
+ require PublicInbox::PktOp;
+ my ($s2d_op_c, $s2d_op_p) = PublicInbox::PktOp->pair;
+ my $dir = $lei->store_path;
+ $self->ipc_lock_init("$dir/ipc.lock");
+ substr($dir, -length('/lei/store'), 10, '');
pipe(my ($r, $w)) or die "pipe: $!";
pipe(my ($r, $w)) or die "pipe: $!";
- my $err_pipe = [ $r, $w ];
# Mail we import into lei are private, so headers filtered out
# by -mda for public mail are not appropriate
local @PublicInbox::MDA::BAD_HEADERS = ();
# Mail we import into lei are private, so headers filtered out
# by -mda for public mail are not appropriate
local @PublicInbox::MDA::BAD_HEADERS = ();
- $self->ipc_worker_spawn("lei/store $d", $lei->oldset,
- { lei => $lei, err_pipe => $err_pipe });
+ $self->ipc_worker_spawn("lei/store $dir", $lei->oldset, {
+ lei => $lei,
+ -err_wr => $w,
+ to_close => [ $r, $s2d_op_c->{sock} ],
+ s2d_op_p => $s2d_op_p,
+ });
require PublicInbox::LeiStoreErr;
require PublicInbox::LeiStoreErr;
- PublicInbox::LeiStoreErr->new($err_pipe->[0], $lei);
+ PublicInbox::LeiStoreErr->new($r, $lei);
+ $s2d_op_c->{ops} = {
+ sto_done_complete => [ $lei->can('sto_done_complete') ]
+ };
if ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) {
warn "BUG: {sto} missing with --mail-sync";
}
if ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) {
warn "BUG: {sto} missing with --mail-sync";
}
- my $wait = $lei->{sto} ? $lei->{sto}->ipc_do('done') : undef;
- $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef;
+ $lei->sto_done_request if $lei->{sto};
+ my $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef;
$lei->{ovv}->ovv_end($lei);
my $start_mua;
if ($l2m) { # close() calls LeiToMail reap_compress
$lei->{ovv}->ovv_end($lei);
my $start_mua;
if ($l2m) { # close() calls LeiToMail reap_compress
($cmd, @pargs) = split(/ /, $msg);
}
my $op = $self->{ops}->{$cmd //= $msg};
($cmd, @pargs) = split(/ /, $msg);
}
my $op = $self->{ops}->{$cmd //= $msg};
- die "BUG: unknown message: `$cmd'" unless $op;
- my ($sub, @args) = @$op;
- $sub->(@args, @pargs);
+ if ($op) {
+ my ($sub, @args) = @$op;
+ $sub->(@args, @pargs);
+ } elsif ($msg ne '') {
+ die "BUG: unknown message: `$cmd'";
+ }
return $self->close if $msg eq ''; # close on EOF
}
}
return $self->close if $msg eq ''; # close on EOF
}
}