Simplify our internals a little bit.
'child_error' => [ \&child_error, $lei ],
($ops ? %$ops : ()),
};
- $ops->{''} //= [ \&dclose, $lei ];
+ $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&dclose, $lei ];
my $end = $lei->pkt_op_pair;
$wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
delete $lei->{pkt_op_p};
use PublicInbox::Spawn qw(spawn popen_rd which);
use PublicInbox::DS;
-sub sol_done { # EOF callback for main daemon
+sub _lei_wq_eof { # EOF callback for main daemon
my ($lei) = @_;
my $sol = delete $lei->{sol} // return $lei->dclose; # already failed
$sol->wq_wait_old($lei->can('wq_done_wait'), $lei);
}
require PublicInbox::SolverGit;
my $self = bless { lxs => $lxs, oid_b => $blob }, __PACKAGE__;
- my ($op_c, $ops) = $lei->workers_start($self, 'lei_solve', 1,
- { '' => [ \&sol_done, $lei ] });
+ my ($op_c, $ops) = $lei->workers_start($self, 'lei-blob', 1);
$lei->{sol} = $self;
$self->wq_io_do('do_solve_blob', []);
$self->wq_close(1);
my $devfd = $lei->path_to_fd($ovv->{dst}) // return;
$lei->{opt}->{augment} = 1 if $devfd < 0;
$self->prepare_inputs($lei, \@inputs) or return;
- my ($op_c, $ops) = $lei->workers_start($self, 'lei_convert', 1);
+ my ($op_c, $ops) = $lei->workers_start($self, 'lei-convert', 1);
$lei->{cnv} = $self;
$self->wq_io_do('process_inputs', []);
$self->wq_close(1);
input_eml_cb($self, $eml, $self->{-import_kw} ? { kw => $kw } : undef);
}
-sub import_done { # EOF callback for main daemon
+sub _lei_wq_eof { # EOF callback for main daemon
my ($lei) = @_;
my $imp = delete $lei->{imp} // return $lei->fail('BUG: {imp} gone');
$imp->wq_wait_old($lei->can('wq_done_wait'), $lei, 'non-fatal');
my $nproc = $self->detect_nproc;
$j = $nproc if $j > $nproc;
}
- my $ops = { '' => [ \&import_done, $lei ] };
+ my $ops = {};
$lei->{auth}->op_merge($ops, $self) if $lei->{auth};
$self->{-wq_nr_workers} = $j // 1; # locked
- (my $op_c, $ops) = $lei->workers_start($self, 'lei_import', $j, $ops);
+ (my $op_c, $ops) = $lei->workers_start($self, 'lei-import', $j, $ops);
$lei->{imp} = $self;
net_merge_complete($self) unless $lei->{auth};
$op_c->op_wait_event($ops);
$lei->dclose;
}
-sub mirror_done { # EOF callback for main daemon
+sub _lei_wq_eof { # EOF callback for main daemon
my ($lei) = @_;
my $mrr = delete $lei->{mrr} or return;
$mrr->wq_wait_old(\&do_finish_mirror, $lei);
require PublicInbox::Inbox;
require PublicInbox::Admin;
require PublicInbox::InboxWritable;
- my ($op, $ops) = $lei->workers_start($self, 'lei_mirror', 1, {
- '' => [ \&mirror_done, $lei ]
- });
+ my ($op, $ops) = $lei->workers_start($self, 'lei_mirror', 1);
$lei->{mrr} = $self;
$self->wq_io_do('do_mirror', []);
$self->wq_close(1);
} else {
$self->{input} = $input;
}
- my ($op, $ops) = $lei->workers_start($self, 'lei_p2q', 1);
+ my ($op, $ops) = $lei->workers_start($self, 'lei-p2q', 1);
$lei->{p2q} = $self;
$self->wq_io_do('do_p2q', []);
$self->wq_close(1);
sub input_mbox_cb { input_eml_cb($_[1], $_[0]) }
-sub tag_done { # EOF callback for main daemon
+sub _lei_wq_eof { # EOF callback for main daemon
my ($lei) = @_;
- my $tag = delete $lei->{tag} or return;
+ my $tag = delete $lei->{tag} // return $lei->dclose;
$tag->wq_wait_old($lei->can('wq_done_wait'), $lei, 'non-fatal');
}
$self->prepare_inputs($lei, \@argv) or return;
grep(defined, @$vmd_mod{qw(+kw +L -L -kw)}) or
return $lei->fail('no keywords or labels specified');
- my $ops = { '' => [ \&tag_done, $lei ] };
+ my $ops = {};
$lei->{auth}->op_merge($ops, $self) if $lei->{auth};
$self->{vmd_mod} = $vmd_mod;
my $j = $self->{-wq_nr_workers} = 1; # locked for now
- (my $op_c, $ops) = $lei->workers_start($self, 'lei_tag', $j, $ops);
+ (my $op_c, $ops) = $lei->workers_start($self, 'lei-tag', $j, $ops);
$lei->{tag} = $self;
net_merge_complete($self) unless $lei->{auth};
$op_c->op_wait_event($ops);