sub workers_start {
my ($lei, $wq, $jobs, $ops, $flds) = @_;
- $ops = pkt_ops($lei, { ($ops ? %$ops : ()) });
+ $ops //= {};
+ ($wq->can('net_merge_all_done') && $lei->{auth}) and
+ $lei->{auth}->op_merge($ops, $wq, $lei);
+ pkt_ops($lei, $ops);
$ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
my $end = $lei->pkt_op_pair;
my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
# call this when we're ready to wait on events and yield to other clients
sub wait_wq_events {
my ($lei, $op_c, $ops) = @_;
+ my $wq1 = $lei->{wq1};
+ ($wq1 && $wq1->can('net_merge_all_done') && !$lei->{auth}) and
+ $wq1->net_merge_all_done;
for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
$wq->wq_close;
}
$op_c->{ops} = $ops;
}
+sub wq1_start {
+ my ($lei, $wq, $jobs) = @_;
+ my ($op_c, $ops) = workers_start($lei, $wq, $jobs // 1);
+ $lei->{wq1} = $wq;
+ wait_wq_events($lei, $op_c, $ops); # net_merge_all_done if !{auth}
+}
+
sub _help {
require PublicInbox::LeiHelp;
PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC);