]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei: non-blocking lei/store->done in lei-daemon
authorEric Wong <e@80x24.org>
Tue, 24 Aug 2021 13:06:39 +0000 (13:06 +0000)
committerEric Wong <e@80x24.org>
Tue, 24 Aug 2021 20:11:39 +0000 (20:11 +0000)
This allows client sockets to wait for "done" commits to
lei/store while the daemon reacts asynchronously.  The goal
of this change is to keep the script/lei client alive until
lei/store commits changes to the filesystem, but without
blocking the lei-daemon event loop.  It depends on Perl
refcounting to close the socket.

This change also highlighted our over-use of "done" requests to
lei/store processes, which is now corrected so we only issue it
on collective socket EOF rather than upon reaping every single
worker.

This also fixes "lei forget-mail-sync" when it is the initial
command.

This took several iterations and much debugging to arrive at the
current implementation:

1. The initial iteration of this change utilized socket passing
   from lei-daemon to lei/store, which necessitated switching
   from faster pipes to slower Unix sockets.

2. The second iteration switched to registering notification sockets
   independently of "done" requests, but that could lead to early
   wakeups when "done" was requested by other workers.  This
   appeared to work most of the time, but suffered races under
   high load which were difficult to track down.

Finally, this iteration passes the stringified socket GLOB ref
to lei/store which is echoed back to lei-daemon upon completion
of that particular "done" request.

lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiForgetMailSync.pm
lib/PublicInbox/LeiImportKw.pm
lib/PublicInbox/LeiNoteEvent.pm
lib/PublicInbox/LeiPmdir.pm
lib/PublicInbox/LeiStore.pm
lib/PublicInbox/LeiXSearch.pm
lib/PublicInbox/PktOp.pm

index ea3ec0fec36ef7963b8aad218252d004b352bf9b..5694e92c92001d755cc3062b7d446edce067cf37 100644 (file)
@@ -37,6 +37,7 @@ $GLP_PASS->configure(qw(gnu_getopt no_ignore_case auto_abbrev pass_through));
 
 our %PATH2CFG; # persistent for socket daemon
 our $MDIR2CFGPATH; # /path/to/maildir => { /path/to/config => [ ino watches ] }
+our %LIVE_SOCK; # "GLOB(0x....)" => $lei->{sock}
 
 # TBD: this is a documentation mechanism to show a subcommand
 # (may) pass options through to another command:
@@ -565,6 +566,7 @@ sub _lei_atfork_child {
        $dir_idle->force_close if $dir_idle;
        %PATH2CFG = ();
        $MDIR2CFGPATH = {};
+       %LIVE_SOCK = ();
        eval 'no warnings; undef $PublicInbox::LeiNoteEvent::to_flush';
        undef $errors_log;
        $quit = \&CORE::exit;
@@ -1429,7 +1431,7 @@ sub refresh_watches {
                        add_maildir_watch($cd, $cfg_f);
                }
        }
-       my $wait = $renames ? $sto->ipc_do('done') : undef;
+       $lei->sto_done_request if $renames;
        if ($old) { # cull old non-existent entries
                for my $url (keys %$old) {
                        next if exists $seen{$url};
@@ -1463,4 +1465,19 @@ sub lms { # read-only LeiMailSync
        $lse ? $lse->lms : undef;
 }
 
+sub sto_done_request { # only call this from lei-daemon process (not workers)
+       my ($lei, $sock) = @_;
+       if ($sock //= $lei->{sock}) {
+               $LIVE_SOCK{"$sock"} = $sock;
+               $lei->{sto}->ipc_do('done', "$sock"); # issue, async wait
+       } else { # forcibly wait
+               my $wait = $lei->{sto}->ipc_do('done');
+       }
+}
+
+sub sto_done_complete { # called in lei-daemon when LeiStore->done is complete
+       my ($sock_str) = @_;
+       delete $LIVE_SOCK{$sock_str}; # frees {sock} for waiting lei clients
+}
+
 1;
index 940ca1b692c10d47faf7849d275ed1ac7c5c41e0..2b4e58a9b39a7c3ed69e53341920489d81d8f44c 100644 (file)
@@ -16,12 +16,12 @@ sub lei_forget_mail_sync {
        my ($lei, @folders) = @_;
        my $lms = $lei->lms or return;
        my $sto = $lei->_lei_store or return; # may disappear due to race
-       $sto->write_prepare;
+       $sto->write_prepare($lei);
        my $err = $lms->arg2folder($lei, \@folders);
        $lei->qerr(@{$err->{qerr}}) if $err->{qerr};
        return $lei->fail($err->{fail}) if $err->{fail};
        $sto->ipc_do('lms_forget_folders', @folders);
-       my $wait = $sto->ipc_do('done');
+       $lei->sto_done_request;
 }
 
 *_complete_forget_mail_sync = \&PublicInbox::LeiExportKw::_complete_export_kw;
index 2878cbdf30f7751d0262a217f4f5a7fe6f796712..402125cf5a43a4b1d1cb0cc0988367313a310907 100644 (file)
@@ -13,6 +13,7 @@ sub new {
        my $self = bless { -wq_ident => 'lei import_kw worker' }, $cls;
        my ($op_c, $ops) = $lei->workers_start($self, $self->detect_nproc);
        $op_c->{ops} = $ops; # for PktOp->event_step
+       $self->{lei_sock} = $lei->{sock};
        $lei->{ikw} = $self;
 }
 
@@ -42,13 +43,13 @@ sub ck_update_kw { # via wq_io_do
 sub ikw_done_wait {
        my ($arg, $pid) = @_;
        my ($self, $lei) = @$arg;
-       my $wait = $lei->{sto}->ipc_do('done');
        $lei->can('wq_done_wait')->($arg, $pid);
 }
 
 sub _lei_wq_eof { # EOF callback for main lei daemon
        my ($lei) = @_;
        my $ikw = delete $lei->{ikw} or return $lei->fail;
+       $lei->sto_done_request($ikw->{lei_sock});
        $ikw->wq_wait_old(\&ikw_done_wait, $lei);
 }
 
index 1cd1529640f8c80b82b6b09bef4914397192df71..6a40ba3967b816d6310905d751520ded7915e180 100644 (file)
@@ -15,7 +15,7 @@ sub flush_lei ($) {
        if (my $lne = delete $lei->{cfg}->{-lei_note_event}) {
                $lne->wq_close(1, undef, $lei); # runs _lei_wq_eof;
        } elsif ($lei->{sto}) { # lms_clear_src calls only:
-               my $wait = $lei->{sto}->ipc_do('done');
+               $lei->sto_done_request;
        }
 }
 
@@ -117,7 +117,7 @@ sub lne_done_wait {
 sub _lei_wq_eof { # EOF callback for main lei daemon
        my ($lei) = @_;
        my $lne = delete $lei->{lne} or return $lei->fail;
-       my $wait = $lei->{sto}->ipc_do('done');
+       $lei->sto_done_request;
        $lne->wq_wait_old(\&lne_done_wait, $lei);
 }
 
index 760f276cffe41bf45566de6df3f377bcfd8332d9..59cf886e1526854db0755429b59ac2dba445e8aa 100644 (file)
@@ -25,6 +25,7 @@ sub new {
        my ($op_c, $ops) = $lei->workers_start($self, $nproc,
                undef, { ipt => $ipt }); # LeiInput subclass
        $op_c->{ops} = $ops; # for PktOp->event_step
+       $self->{lei_sock} = $lei->{sock}; # keep client for pmd_done_wait
        $lei->{pmd} = $self;
 }
 
@@ -32,7 +33,7 @@ sub ipc_atfork_child {
        my ($self) = @_;
        my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}';
        $ipt->{lei} = $self->{lei};
-       $ipt->ipc_atfork_child;
+       $ipt->ipc_atfork_child; # calls _lei_atfork_child;
 }
 
 sub each_mdir_fn { # maildir_each_file callback
@@ -48,13 +49,13 @@ sub mdir_iter { # via wq_io_do
 sub pmd_done_wait {
        my ($arg, $pid) = @_;
        my ($self, $lei) = @$arg;
-       my $wait = $lei->{sto}->ipc_do('done');
        $lei->can('wq_done_wait')->($arg, $pid);
 }
 
 sub _lei_wq_eof { # EOF callback for main lei daemon
        my ($lei) = @_;
        my $pmd = delete $lei->{pmd} or return $lei->fail;
+       $lei->sto_done_request($pmd->{lei_sock});
        $pmd->wq_wait_old(\&pmd_done_wait, $lei);
 }
 
index bbd853e5a2a40a4523fa13068cb4a4652d3679e1..28e36e89c7ca04c75da16e68ef34080bb3c8780e 100644 (file)
@@ -471,7 +471,7 @@ sub xchg_stderr {
 }
 
 sub done {
-       my ($self) = @_;
+       my ($self, $sock_ref) = @_;
        my $err = '';
        if (my $im = delete($self->{im})) {
                eval { $im->done };
@@ -486,6 +486,10 @@ sub done {
        $self->{priv_eidx}->done; # V2Writable::done
        xchg_stderr($self);
        die $err if $err;
+
+       # notify clients ->done has been issued
+       defined($sock_ref) and
+               $self->{s2d_op_p}->pkt_do('sto_done_complete', $sock_ref);
 }
 
 sub ipc_atfork_child {
@@ -493,28 +497,37 @@ sub ipc_atfork_child {
        my $lei = $self->{lei};
        $lei->_lei_atfork_child(1) if $lei;
        xchg_stderr($self);
-       if (my $err = delete($self->{err_pipe})) {
-               close $err->[0];
-               $self->{-err_wr} = $err->[1];
+       if (my $to_close = delete($self->{to_close})) {
+               close($_) for @$to_close;
        }
        $self->SUPER::ipc_atfork_child;
 }
 
 sub write_prepare {
        my ($self, $lei) = @_;
+       $lei // die 'BUG: $lei not passed';
        unless ($self->{-ipc_req}) {
-               my $d = $lei->store_path;
-               $self->ipc_lock_init("$d/ipc.lock");
-               substr($d, -length('/lei/store'), 10, '');
+               # s2d => store-to-daemon messages
+               require PublicInbox::PktOp;
+               my ($s2d_op_c, $s2d_op_p) = PublicInbox::PktOp->pair;
+               my $dir = $lei->store_path;
+               $self->ipc_lock_init("$dir/ipc.lock");
+               substr($dir, -length('/lei/store'), 10, '');
                pipe(my ($r, $w)) or die "pipe: $!";
-               my $err_pipe = [ $r, $w ];
                # Mail we import into lei are private, so headers filtered out
                # by -mda for public mail are not appropriate
                local @PublicInbox::MDA::BAD_HEADERS = ();
-               $self->ipc_worker_spawn("lei/store $d", $lei->oldset,
-                                       { lei => $lei, err_pipe => $err_pipe });
+               $self->ipc_worker_spawn("lei/store $dir", $lei->oldset, {
+                                       lei => $lei,
+                                       -err_wr => $w,
+                                       to_close => [ $r, $s2d_op_c->{sock} ],
+                                       s2d_op_p => $s2d_op_p,
+                               });
                require PublicInbox::LeiStoreErr;
-               PublicInbox::LeiStoreErr->new($err_pipe->[0], $lei);
+               PublicInbox::LeiStoreErr->new($r, $lei);
+               $s2d_op_c->{ops} = {
+                       sto_done_complete => [ $lei->can('sto_done_complete') ]
+               };
        }
        $lei->{sto} = $self;
 }
index 5e34d864e5844456ace1a61a4d40125dbc1dd25c..1f83e582055c7ef06d40ffae5b632efbd484778b 100644 (file)
@@ -374,8 +374,8 @@ sub query_done { # EOF callback for main daemon
        if ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) {
                warn "BUG: {sto} missing with --mail-sync";
        }
-       my $wait = $lei->{sto} ? $lei->{sto}->ipc_do('done') : undef;
-       $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef;
+       $lei->sto_done_request if $lei->{sto};
+       my $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef;
        $lei->{ovv}->ovv_end($lei);
        my $start_mua;
        if ($l2m) { # close() calls LeiToMail reap_compress
index 92e150a46bdf2126ad7bcf13e52de2b1ca4794ae..10942dd19b6880335fb465d9de4e3f3113755597 100644 (file)
@@ -56,9 +56,12 @@ sub event_step {
                        ($cmd, @pargs) = split(/ /, $msg);
                }
                my $op = $self->{ops}->{$cmd //= $msg};
-               die "BUG: unknown message: `$cmd'" unless $op;
-               my ($sub, @args) = @$op;
-               $sub->(@args, @pargs);
+               if ($op) {
+                       my ($sub, @args) = @$op;
+                       $sub->(@args, @pargs);
+               } elsif ($msg ne '') {
+                       die "BUG: unknown message: `$cmd'";
+               }
                return $self->close if $msg eq ''; # close on EOF
        }
 }