From 7e4bd71efcf3bf61216ec1a7577e33be5f95b8a9 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 21 Apr 2022 11:59:06 +0000 Subject: [PATCH] lei: commit store on interrupted partial imports This change prevents lingering shard and git-fast-import processes from remaining after interrupted "lei import" (and similar). It also reduces the likelyhood of data-loss in case of subsequent abnormal termination of the daemon. I think this is the least surprising way to handle users prematurely aborting imports or other similar operations which write to lei/store and will result in reduced bandwidth waste for users with intermittent connections. This is because the lei/store processes may be shared by parallel "lei import" callers, and commits done by any "lei import" caller will inevitably trigger writes for all of them. --- lib/PublicInbox/LEI.pm | 11 ++++++++--- lib/PublicInbox/LeiImportKw.pm | 4 +--- lib/PublicInbox/LeiNoteEvent.pm | 4 +--- lib/PublicInbox/LeiPmdir.pm | 6 ++---- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 4bd9183e..93b4ea03 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -660,6 +660,7 @@ sub wait_wq_events { for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs $wq->wq_close; } + $wq1->{lei_sock} = $lei->{sock} if $wq1; $op_c->{ops} = $ops; } @@ -1405,9 +1406,11 @@ sub fchdir { } sub wq_eof { # EOF callback for main daemon - my ($lei) = @_; + my ($lei, $wq_fld) = @_; local $current_lei = $lei; - delete $lei->{wq1} // return $lei->fail; # already failed + my $wq = delete $lei->{$wq_fld // 'wq1'}; + $lei->sto_done_request($wq); + $wq // $lei->fail; # already failed } sub watch_state_ok ($) { @@ -1514,8 +1517,10 @@ sub lms { } sub sto_done_request { - my ($lei, $sock) = @_; + my ($lei, $wq) = @_; + return unless $lei->{sto}; local $current_lei = $lei; + my $sock = $wq ? $wq->{lei_sock} : undef; eval { if ($sock //= $lei->{sock}) { # issue, async wait $lei->{sto}->wq_io_do('done', [ $sock ]); diff --git a/lib/PublicInbox/LeiImportKw.pm b/lib/PublicInbox/LeiImportKw.pm index 52fd4043..4dd938f5 100644 --- a/lib/PublicInbox/LeiImportKw.pm +++ b/lib/PublicInbox/LeiImportKw.pm @@ -47,9 +47,7 @@ sub ck_update_kw { # via wq_io_do } sub _lei_wq_eof { # EOF callback for main lei daemon - my ($lei) = @_; - my $ikw = delete $lei->{ikw} or return $lei->fail; - $lei->sto_done_request($ikw->{lei_sock}); + $_[0]->wq_eof('ikw'); } 1; diff --git a/lib/PublicInbox/LeiNoteEvent.pm b/lib/PublicInbox/LeiNoteEvent.pm index 22d6ffac..db387633 100644 --- a/lib/PublicInbox/LeiNoteEvent.pm +++ b/lib/PublicInbox/LeiNoteEvent.pm @@ -114,9 +114,7 @@ sub ipc_atfork_child { } sub _lei_wq_eof { # EOF callback for main lei daemon - my ($lei) = @_; - my $lne = delete $lei->{lne} or return $lei->fail; - $lei->sto_done_request($lne->{lei_sock}); + $_[0]->wq_eof('lne'); } 1; diff --git a/lib/PublicInbox/LeiPmdir.pm b/lib/PublicInbox/LeiPmdir.pm index f9b68fc2..d4aa0212 100644 --- a/lib/PublicInbox/LeiPmdir.pm +++ b/lib/PublicInbox/LeiPmdir.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2021 all contributors +# Copyright (C) all contributors # License: AGPL-3.0+ # WQ worker for dealing with parallel Maildir reads; @@ -48,9 +48,7 @@ sub mdir_iter { # via wq_io_do } sub _lei_wq_eof { # EOF callback for main lei daemon - my ($lei) = @_; - my $pmd = delete $lei->{pmd} or return $lei->fail; - $lei->sto_done_request($pmd->{lei_sock}); + $_[0]->wq_eof('pmd'); } 1; -- 2.44.0