]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei: standardize on _lei_wq_eof callback for workers
authorEric Wong <e@80x24.org>
Tue, 27 Apr 2021 11:07:51 +0000 (11:07 +0000)
committerEric Wong <e@80x24.org>
Wed, 28 Apr 2021 01:28:56 +0000 (21:28 -0400)
Simplify our internals a little bit.

lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiBlob.pm
lib/PublicInbox/LeiConvert.pm
lib/PublicInbox/LeiImport.pm
lib/PublicInbox/LeiMirror.pm
lib/PublicInbox/LeiP2q.pm
lib/PublicInbox/LeiTag.pm

index c170572bb919d10588961adef7b64b1ce0f54e9e..effc905ac4d323bdf926d57ac538fd49b3f56bee 100644 (file)
@@ -531,7 +531,7 @@ sub workers_start {
                'child_error' => [ \&child_error, $lei ],
                ($ops ? %$ops : ()),
        };
-       $ops->{''} //= [ \&dclose, $lei ];
+       $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&dclose, $lei ];
        my $end = $lei->pkt_op_pair;
        $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
        delete $lei->{pkt_op_p};
index 4e52c8a5846e6dbd6bd7472f022fa7793c5bd9f0..0b96bd04cef553196d9f6e2b2227349c6c934907 100644 (file)
@@ -10,7 +10,7 @@ use parent qw(PublicInbox::IPC);
 use PublicInbox::Spawn qw(spawn popen_rd which);
 use PublicInbox::DS;
 
-sub sol_done { # EOF callback for main daemon
+sub _lei_wq_eof { # EOF callback for main daemon
        my ($lei) = @_;
        my $sol = delete $lei->{sol} // return $lei->dclose; # already failed
        $sol->wq_wait_old($lei->can('wq_done_wait'), $lei);
@@ -157,8 +157,7 @@ EOM
        }
        require PublicInbox::SolverGit;
        my $self = bless { lxs => $lxs, oid_b => $blob }, __PACKAGE__;
-       my ($op_c, $ops) = $lei->workers_start($self, 'lei_solve', 1,
-               { '' => [ \&sol_done, $lei ] });
+       my ($op_c, $ops) = $lei->workers_start($self, 'lei-blob', 1);
        $lei->{sol} = $self;
        $self->wq_io_do('do_solve_blob', []);
        $self->wq_close(1);
index 0ce49ea9a866866ea9ea72a3db3d4b70dfdf52ce..0c3241694f70d525fe7300a7eddced06c77012f6 100644 (file)
@@ -52,7 +52,7 @@ sub lei_convert { # the main "lei convert" method
        my $devfd = $lei->path_to_fd($ovv->{dst}) // return;
        $lei->{opt}->{augment} = 1 if $devfd < 0;
        $self->prepare_inputs($lei, \@inputs) or return;
-       my ($op_c, $ops) = $lei->workers_start($self, 'lei_convert', 1);
+       my ($op_c, $ops) = $lei->workers_start($self, 'lei-convert', 1);
        $lei->{cnv} = $self;
        $self->wq_io_do('process_inputs', []);
        $self->wq_close(1);
index daaa6753f218a86bbc915f6bb718f2ae3108febf..e0d899cc0a015437a815a89015464b2f206e0e48 100644 (file)
@@ -53,7 +53,7 @@ sub input_nntp_cb { # nntp_each
        input_eml_cb($self, $eml, $self->{-import_kw} ? { kw => $kw } : undef);
 }
 
-sub import_done { # EOF callback for main daemon
+sub _lei_wq_eof { # EOF callback for main daemon
        my ($lei) = @_;
        my $imp = delete $lei->{imp} // return $lei->fail('BUG: {imp} gone');
        $imp->wq_wait_old($lei->can('wq_done_wait'), $lei, 'non-fatal');
@@ -90,10 +90,10 @@ sub lei_import { # the main "lei import" method
                my $nproc = $self->detect_nproc;
                $j = $nproc if $j > $nproc;
        }
-       my $ops = { '' => [ \&import_done, $lei ] };
+       my $ops = {};
        $lei->{auth}->op_merge($ops, $self) if $lei->{auth};
        $self->{-wq_nr_workers} = $j // 1; # locked
-       (my $op_c, $ops) = $lei->workers_start($self, 'lei_import', $j, $ops);
+       (my $op_c, $ops) = $lei->workers_start($self, 'lei-import', $j, $ops);
        $lei->{imp} = $self;
        net_merge_complete($self) unless $lei->{auth};
        $op_c->op_wait_event($ops);
index 15adb71b5d4a4dcff30efd919cb261b829d32159..50ab4c8515288357cf223f5306076d8f2cd61109 100644 (file)
@@ -24,7 +24,7 @@ sub do_finish_mirror { # dwaitpid callback
        $lei->dclose;
 }
 
-sub mirror_done { # EOF callback for main daemon
+sub _lei_wq_eof { # EOF callback for main daemon
        my ($lei) = @_;
        my $mrr = delete $lei->{mrr} or return;
        $mrr->wq_wait_old(\&do_finish_mirror, $lei);
@@ -282,9 +282,7 @@ sub start {
        require PublicInbox::Inbox;
        require PublicInbox::Admin;
        require PublicInbox::InboxWritable;
-       my ($op, $ops) = $lei->workers_start($self, 'lei_mirror', 1, {
-               '' => [ \&mirror_done, $lei ]
-       });
+       my ($op, $ops) = $lei->workers_start($self, 'lei_mirror', 1);
        $lei->{mrr} = $self;
        $self->wq_io_do('do_mirror', []);
        $self->wq_close(1);
index cb2309c7adc9a1389d84285337221d83372ed8f6..3248afd77e58022413eec33d6ef3ce9f926ebae1 100644 (file)
@@ -188,7 +188,7 @@ sub lei_p2q { # the "lei patch-to-query" entry point
        } else {
                $self->{input} = $input;
        }
-       my ($op, $ops) = $lei->workers_start($self, 'lei_p2q', 1);
+       my ($op, $ops) = $lei->workers_start($self, 'lei-p2q', 1);
        $lei->{p2q} = $self;
        $self->wq_io_do('do_p2q', []);
        $self->wq_close(1);
index f5791947220637498c7e4ac93be994f8f4125c22..3cda2eca9065105c8ff39aa5cd693256b863a88c 100644 (file)
@@ -19,9 +19,9 @@ sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh
 
 sub input_mbox_cb { input_eml_cb($_[1], $_[0]) }
 
-sub tag_done { # EOF callback for main daemon
+sub _lei_wq_eof { # EOF callback for main daemon
        my ($lei) = @_;
-       my $tag = delete $lei->{tag} or return;
+       my $tag = delete $lei->{tag} // return $lei->dclose;
        $tag->wq_wait_old($lei->can('wq_done_wait'), $lei, 'non-fatal');
 }
 
@@ -52,11 +52,11 @@ sub lei_tag { # the "lei tag" method
        $self->prepare_inputs($lei, \@argv) or return;
        grep(defined, @$vmd_mod{qw(+kw +L -L -kw)}) or
                return $lei->fail('no keywords or labels specified');
-       my $ops = { '' => [ \&tag_done, $lei ] };
+       my $ops = {};
        $lei->{auth}->op_merge($ops, $self) if $lei->{auth};
        $self->{vmd_mod} = $vmd_mod;
        my $j = $self->{-wq_nr_workers} = 1; # locked for now
-       (my $op_c, $ops) = $lei->workers_start($self, 'lei_tag', $j, $ops);
+       (my $op_c, $ops) = $lei->workers_start($self, 'lei-tag', $j, $ops);
        $lei->{tag} = $self;
        net_merge_complete($self) unless $lei->{auth};
        $op_c->op_wait_event($ops);