]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei: commit store on interrupted partial imports
authorEric Wong <e@80x24.org>
Thu, 21 Apr 2022 11:59:06 +0000 (11:59 +0000)
committerEric Wong <e@80x24.org>
Fri, 22 Apr 2022 03:05:56 +0000 (03:05 +0000)
This change prevents lingering shard and git-fast-import
processes from remaining after interrupted "lei import" (and
similar).  It also reduces the likelyhood of data-loss in case
of subsequent abnormal termination of the daemon.

I think this is the least surprising way to handle users
prematurely aborting imports or other similar operations which
write to lei/store and will result in reduced bandwidth waste
for users with intermittent connections.  This is because the
lei/store processes may be shared by parallel "lei import"
callers, and commits done by any "lei import" caller will
inevitably trigger writes for all of them.

lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiImportKw.pm
lib/PublicInbox/LeiNoteEvent.pm
lib/PublicInbox/LeiPmdir.pm

index 4bd9183e969247da542c2c70a8b4d9935c8b8733..93b4ea03b67d58a9193b99e3970e94e221b2aff9 100644 (file)
@@ -660,6 +660,7 @@ sub wait_wq_events {
        for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
                $wq->wq_close;
        }
+       $wq1->{lei_sock} = $lei->{sock} if $wq1;
        $op_c->{ops} = $ops;
 }
 
@@ -1405,9 +1406,11 @@ sub fchdir {
 }
 
 sub wq_eof { # EOF callback for main daemon
-       my ($lei) = @_;
+       my ($lei, $wq_fld) = @_;
        local $current_lei = $lei;
-       delete $lei->{wq1} // return $lei->fail; # already failed
+       my $wq = delete $lei->{$wq_fld // 'wq1'};
+       $lei->sto_done_request($wq);
+       $wq // $lei->fail; # already failed
 }
 
 sub watch_state_ok ($) {
@@ -1514,8 +1517,10 @@ sub lms {
 }
 
 sub sto_done_request {
-       my ($lei, $sock) = @_;
+       my ($lei, $wq) = @_;
+       return unless $lei->{sto};
        local $current_lei = $lei;
+       my $sock = $wq ? $wq->{lei_sock} : undef;
        eval {
                if ($sock //= $lei->{sock}) { # issue, async wait
                        $lei->{sto}->wq_io_do('done', [ $sock ]);
index 52fd40436a7d262187eee11f70700a68c095eff8..4dd938f5c12c85ad41d1ed322aad9a33d59c49b8 100644 (file)
@@ -47,9 +47,7 @@ sub ck_update_kw { # via wq_io_do
 }
 
 sub _lei_wq_eof { # EOF callback for main lei daemon
-       my ($lei) = @_;
-       my $ikw = delete $lei->{ikw} or return $lei->fail;
-       $lei->sto_done_request($ikw->{lei_sock});
+       $_[0]->wq_eof('ikw');
 }
 
 1;
index 22d6ffac9feb7e4cfabbb96d88542a58fc61ae30..db387633e877c9e3b8bbfd3c4458a12b8695188a 100644 (file)
@@ -114,9 +114,7 @@ sub ipc_atfork_child {
 }
 
 sub _lei_wq_eof { # EOF callback for main lei daemon
-       my ($lei) = @_;
-       my $lne = delete $lei->{lne} or return $lei->fail;
-       $lei->sto_done_request($lne->{lei_sock});
+       $_[0]->wq_eof('lne');
 }
 
 1;
index f9b68fc2a2d83b660f6101d97fc20e96136f3f45..d4aa0212bea7dfb4ce3766cf13eb20c040eb0ecc 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # WQ worker for dealing with parallel Maildir reads;
@@ -48,9 +48,7 @@ sub mdir_iter { # via wq_io_do
 }
 
 sub _lei_wq_eof { # EOF callback for main lei daemon
-       my ($lei) = @_;
-       my $pmd = delete $lei->{pmd} or return $lei->fail;
-       $lei->sto_done_request($pmd->{lei_sock});
+       $_[0]->wq_eof('pmd');
 }
 
 1;