sub workers_start {
my ($lei, $wq, $jobs, $ops, $flds) = @_;
- $ops = pkt_ops($lei, { ($ops ? %$ops : ()) });
+ $ops //= {};
+ ($wq->can('net_merge_all_done') && $lei->{auth}) and
+ $lei->{auth}->op_merge($ops, $wq, $lei);
+ pkt_ops($lei, $ops);
$ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
my $end = $lei->pkt_op_pair;
my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
# call this when we're ready to wait on events and yield to other clients
sub wait_wq_events {
my ($lei, $op_c, $ops) = @_;
+ my $wq1 = $lei->{wq1};
+ ($wq1 && $wq1->can('net_merge_all_done') && !$lei->{auth}) and
+ $wq1->net_merge_all_done;
for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
$wq->wq_close;
}
$op_c->{ops} = $ops;
}
+sub wq1_start {
+ my ($lei, $wq, $jobs) = @_;
+ my ($op_c, $ops) = workers_start($lei, $wq, $jobs // 1);
+ $lei->{wq1} = $wq;
+ wait_wq_events($lei, $op_c, $ops); # net_merge_all_done if !{auth}
+}
+
sub _help {
require PublicInbox::LeiHelp;
PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC);
'imap_add_kw' : 'imap_set_kw');
$self->{nwr}->{-skip_creat} = 1;
}
- my $ops = {};
- $lei->{auth}->op_merge($ops, $self, $lei) if $lei->{auth};
- (my $op_c, $ops) = $lei->workers_start($self, 1, $ops);
- $lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops); # net_merge_all_done if !{auth}
+ $lei->wq1_start($self);
}
sub _complete_export_kw {
if ($self->{o_remote}) { # setup lei->{auth}
$self->prepare_inputs($lei, $self->{o_remote}) or return;
}
- my $ops = {};
- $lei->{auth}->op_merge($ops, $self, $lei) if $lei->{auth};
- (my $op_c, $ops) = $lei->workers_start($self, 1, $ops);
- $lei->{wq1} = $self;
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops);
+ $lei->wq1_start($self);
}
sub do_prune {
}
($lei->{opt}->{'new-only'} && (!$net || !$net->{imap_order})) and
warn "# --new-only is only for IMAP\n";
- my $ops = {};
- $lei->{auth}->op_merge($ops, $self, $lei) if $lei->{auth};
$lei->{-eml_noisy} = 1;
- (my $op_c, $ops) = $lei->workers_start($self, $j, $ops);
- $lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops);
+ $lei->wq1_start($self, $j);
}
sub lei_import { # the main "lei import" method
$json->pretty(1)->indent(2) if $isatty || $lei->{opt}->{pretty};
}
$lei->start_pager if $isatty;
- my $ops = {};
- $lei->{auth}->op_merge($ops, $self, $lei);
- (my $op_c, $ops) = $lei->workers_start($self, 1, $ops);
- $lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops); # net_merge_all_done if !{auth}
+ $lei->wq1_start($self);
}
sub _complete_ls_mail_source {
my $isatty = -t $lei->{1};
$lei->{opt}->{color} //= $isatty;
$lei->start_pager if $isatty;
- my $ops = {};
- $lei->{auth}->op_merge($ops, $self, $lei) if $lei->{auth};
- (my $op_c, $ops) = $lei->workers_start($self, 1, $ops);
- $lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops);
+ $lei->wq1_start($self);
}
no warnings 'once';
$lei->{opt}->{'in-format'} //= 'eml' if $lei->{opt}->{stdin};
my $self = bless { missing_ok => 1 }, __PACKAGE__;
$self->prepare_inputs($lei, \@inputs) or return;
- my $ops = {};
- $lei->{auth}->op_merge($ops, $self, $lei) if $lei->{auth};
- (my $op_c, $ops) = $lei->workers_start($self, 1, $ops);
- $lei->{wq1} = $self;
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops);
+ $lei->wq1_start($self);
}
sub ipc_atfork_child {
my $isatty = -t $lei->{1};
$lei->{opt}->{color} //= $isatty;
$lei->start_pager if $isatty;
- my ($op_c, $ops) = $lei->workers_start($self, 1);
- $lei->{wq1} = $self;
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops);
+ $lei->wq1_start($self);
}
sub ipc_atfork_child {
my $self = bless { missing_ok => 1, lms => $lms }, __PACKAGE__;
$lei->{opt}->{'mail-sync'} = 1; # for prepare_inputs
$self->prepare_inputs($lei, \@folders) or return;
- my $ops = {};
- $lei->{auth}->op_merge($ops, $self, $lei) if $lei->{auth};
- (my $op_c, $ops) = $lei->workers_start($self, 1, $ops);
- $lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops); # net_merge_all_done if !{auth}
+ $lei->wq1_start($self);
}
sub ipc_atfork_child { # needed for PublicInbox::LeiPmdir
$lei->{opt}->{'in-format'} //= 'eml';
my $self = bless {}, __PACKAGE__;
$self->prepare_inputs($lei, \@inputs) or return;
- my ($op_c, $ops) = $lei->workers_start($self, 1);
- $lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops);
+ $lei->wq1_start($self);
}
no warnings 'once';
$self->prepare_inputs($lei, \@argv) or return;
grep(defined, @$vmd_mod{qw(+kw +L -L -kw)}) or
return $lei->fail('no keywords or labels specified');
- my $ops = {};
- $lei->{auth}->op_merge($ops, $self, $lei) if $lei->{auth};
- (my $op_c, $ops) = $lei->workers_start($self, 1, $ops);
- $lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops);
+ $lei->wq1_start($self);
}
sub note_unimported {
if ($lei->{auth}) { # start auth worker
require PublicInbox::NetWriter;
bless $lei->{net}, 'PublicInbox::NetWriter';
- $lei->{auth}->op_merge(my $ops = {}, $self, $lei);
- (my $op_c, $ops) = $lei->workers_start($self, 1, $ops);
- $lei->{wq1} = $self;
- $lei->wait_wq_events($op_c, $ops);
+ $lei->wq1_start($self);
# net_merge_all_done will fire when auth is done
} else {
redispatch_all($self, $lei); # see below