]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei: simple WQ workers use {wq1} field
authorEric Wong <e@80x24.org>
Wed, 28 Apr 2021 07:52:02 +0000 (07:52 +0000)
committerEric Wong <e@80x24.org>
Wed, 28 Apr 2021 19:30:58 +0000 (19:30 +0000)
This lets us share more code and reduces cognitive overhead when
it comes to picking names (because {lsss} was ridiculous).

We'll need to ensure the first error set in lei is the actual
error we exit with, otherwise things can get confusing and
errors may get lost.

lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiBlob.pm
lib/PublicInbox/LeiConvert.pm
lib/PublicInbox/LeiImport.pm
lib/PublicInbox/LeiLsSearch.pm
lib/PublicInbox/LeiMirror.pm
lib/PublicInbox/LeiP2q.pm
lib/PublicInbox/LeiTag.pm
script/lei

index cfbf12f00f2bebbc73c115c0aebb775513f03f23..403f9ed8872a1226839d26bcf79d3e388c6baa3a 100644 (file)
@@ -382,7 +382,7 @@ my %CONFIG_KEYS = (
        'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m imp mrr cnv p2q tag sol lsss); # internal workers
+my @WQ_KEYS = qw(lxs l2m wq1); # internal workers
 
 sub _drop_wq {
        my ($self) = @_;
@@ -542,7 +542,7 @@ sub workers_start {
                'child_error' => [ \&child_error, $lei ],
                ($ops ? %$ops : ()),
        };
-       $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&dclose, $lei ];
+       $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
        my $end = $lei->pkt_op_pair;
        $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
        delete $lei->{pkt_op_p};
@@ -1237,9 +1237,17 @@ sub DESTROY {
 
 sub wq_done_wait { # dwaitpid callback
        my ($arg, $pid) = @_;
-       my ($wq, $lei, $e) = @$arg;
-       $? and $lei->child_error($?, $e ? "$e errors during $lei->{cmd}" : ());
+       my ($wq, $lei) = @$arg;
+       my $err_type = $lei->{-err_type};
+       $? and $lei->child_error($?,
+                       $err_type ? "$err_type errors during $lei->{cmd}" : ());
        $lei->dclose;
 }
 
+sub wq_eof { # EOF callback for main daemon
+       my ($lei) = @_;
+       my $wq1 = delete $lei->{wq1} // return $lei->fail; # already failed
+       $wq1->wq_wait_old(\&wq_done_wait, $lei);
+}
+
 1;
index ff079e6584413d7b54aefba64cc2ca793f7ff2e1..0a9573589a29711630dbb631cdb6ec1f6ab87cf4 100644 (file)
@@ -10,12 +10,6 @@ use parent qw(PublicInbox::IPC);
 use PublicInbox::Spawn qw(spawn popen_rd which);
 use PublicInbox::DS;
 
-sub _lei_wq_eof { # EOF callback for main daemon
-       my ($lei) = @_;
-       my $sol = delete $lei->{sol} // return $lei->dclose; # already failed
-       $sol->wq_wait_old($lei->can('wq_done_wait'), $lei);
-}
-
 sub get_git_dir ($$) {
        my ($lei, $d) = @_;
        return $d if -d "$d/objects" && -d "$d/refs" && -e "$d/HEAD";
@@ -158,7 +152,7 @@ EOM
        require PublicInbox::SolverGit;
        my $self = bless { lxs => $lxs, oid_b => $blob }, __PACKAGE__;
        my ($op_c, $ops) = $lei->workers_start($self, 'lei-blob', 1);
-       $lei->{sol} = $self;
+       $lei->{wq1} = $self;
        $self->wq_io_do('do_solve_blob', []);
        $self->wq_close(1);
        $op_c->op_wait_event($ops);
index 14bed901f75fe5a67315c3fe98a79d466633215b..cefcaf65563e9e2fd9bfb295f91253855413c2c1 100644 (file)
@@ -53,7 +53,7 @@ sub lei_convert { # the main "lei convert" method
        $lei->{opt}->{augment} = 1 if $devfd < 0;
        $self->prepare_inputs($lei, \@inputs) or return;
        my ($op_c, $ops) = $lei->workers_start($self, 'lei-convert', 1);
-       $lei->{cnv} = $self;
+       $lei->{wq1} = $self;
        $self->wq_io_do('process_inputs', []);
        $self->wq_close(1);
        $op_c->op_wait_event($ops);
index f2a0c95a27ab99dbba1bac42705be5f0ebc29023..26127ecec4b1fa3ad6f0e8fdc56cd04cd4cf9260 100644 (file)
@@ -53,12 +53,6 @@ sub input_nntp_cb { # nntp_each
        input_eml_cb($self, $eml, $self->{-import_kw} ? { kw => $kw } : undef);
 }
 
-sub _lei_wq_eof { # EOF callback for main daemon
-       my ($lei) = @_;
-       my $imp = delete $lei->{imp} // return $lei->fail('BUG: {imp} gone');
-       $imp->wq_wait_old($lei->can('wq_done_wait'), $lei, 'non-fatal');
-}
-
 sub net_merge_complete { # callback used by LeiAuth
        my ($self) = @_;
        $self->wq_io_do('process_inputs');
@@ -95,7 +89,8 @@ sub lei_import { # the main "lei import" method
        $self->{-wq_nr_workers} = $j // 1; # locked
        $lei->{-eml_noisy} = 1;
        (my $op_c, $ops) = $lei->workers_start($self, 'lei-import', $j, $ops);
-       $lei->{imp} = $self;
+       $lei->{wq1} = $self;
+       $lei->{-err_type} = 'non-fatal';
        net_merge_complete($self) unless $lei->{auth};
        $op_c->op_wait_event($ops);
 }
index 9ac4870f8399f705cf3323abfcc5f4aac8c5f0ac..a00e78fcc96834a74ad6a2e1ad55c19647e79595 100644 (file)
@@ -73,7 +73,7 @@ sub bg_worker ($$$) {
        my ($lei, $pfx, $json) = @_;
        my $self = bless { -wq_nr_workers => 1, json => $json }, __PACKAGE__;
        my ($op_c, $ops) = $lei->workers_start($self, 'ls-search', 1);
-       $lei->{lsss} = $self;
+       $lei->{wq1} = $self;
        $self->wq_io_do('do_ls_search_long', [], $pfx);
        $self->wq_close(1);
        $op_c->op_wait_event($ops);
index 50ab4c8515288357cf223f5306076d8f2cd61109..db97b98c8dfed69f1dabb78c207998c1317034ba 100644 (file)
@@ -26,7 +26,7 @@ sub do_finish_mirror { # dwaitpid callback
 
 sub _lei_wq_eof { # EOF callback for main daemon
        my ($lei) = @_;
-       my $mrr = delete $lei->{mrr} or return;
+       my $mrr = delete $lei->{wq1} or return $lei->fail;
        $mrr->wq_wait_old(\&do_finish_mirror, $lei);
 }
 
@@ -283,7 +283,7 @@ sub start {
        require PublicInbox::Admin;
        require PublicInbox::InboxWritable;
        my ($op, $ops) = $lei->workers_start($self, 'lei_mirror', 1);
-       $lei->{mrr} = $self;
+       $lei->{wq1} = $self;
        $self->wq_io_do('do_mirror', []);
        $self->wq_close(1);
        $op->op_wait_event($ops);
index deb3197483fd2186a3e75028d36b86d474d5cab0..b48934898db916b9f2a7a4bbd8dc859138477510 100644 (file)
@@ -189,7 +189,7 @@ sub lei_p2q { # the "lei patch-to-query" entry point
                $self->{input} = $input;
        }
        my ($op, $ops) = $lei->workers_start($self, 'lei-p2q', 1);
-       $lei->{p2q} = $self;
+       $lei->{wq1} = $self;
        $self->wq_io_do('do_p2q', []);
        $self->wq_close(1);
        $op->op_wait_event($ops);
@@ -201,10 +201,4 @@ sub ipc_atfork_child {
        $self->SUPER::ipc_atfork_child;
 }
 
-sub _lei_wq_eof { # EOF callback for main daemon
-       my ($lei) = @_;
-       my $p2q = delete $lei->{p2q} // return $lei->dclose;
-       $p2q->wq_wait_old($lei->can('wq_done_wait'), $lei);
-}
-
 1;
index 3cda2eca9065105c8ff39aa5cd693256b863a88c..989a6954454e709ae98336ee3f05ac89fce2f77b 100644 (file)
@@ -19,12 +19,6 @@ sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh
 
 sub input_mbox_cb { input_eml_cb($_[1], $_[0]) }
 
-sub _lei_wq_eof { # EOF callback for main daemon
-       my ($lei) = @_;
-       my $tag = delete $lei->{tag} // return $lei->dclose;
-       $tag->wq_wait_old($lei->can('wq_done_wait'), $lei, 'non-fatal');
-}
-
 sub net_merge_complete { # callback used by LeiAuth
        my ($self) = @_;
        $self->wq_io_do('process_inputs');
@@ -57,7 +51,8 @@ sub lei_tag { # the "lei tag" method
        $self->{vmd_mod} = $vmd_mod;
        my $j = $self->{-wq_nr_workers} = 1; # locked for now
        (my $op_c, $ops) = $lei->workers_start($self, 'lei-tag', $j, $ops);
-       $lei->{tag} = $self;
+       $lei->{wq1} = $self;
+       $lei->{-err_type} = 'non-fatal';
        net_merge_complete($self) unless $lei->{auth};
        $op_c->op_wait_event($ops);
 }
index db302422f8b68989d061475e9f491fe07163ca53..90a9383928f875faae25fa065a48a071a3273fef 100755 (executable)
@@ -116,10 +116,10 @@ Falling back to (slow) one-shot mode
                } elsif ($buf eq '-WINCH') {
                        kill($buf, @parent); # for MUA
                } elsif ($buf =~ /\Ax_it ([0-9]+)\z/) {
-                       $x_it_code = $1 + 0;
+                       $x_it_code ||= $1 + 0;
                        last;
                } elsif ($buf =~ /\Achild_error ([0-9]+)\z/) {
-                       $x_it_code = $1 + 0;
+                       $x_it_code ||= $1 + 0;
                } else {
                        $sigchld->();
                        die $buf;