This method replaces a common pattern of starting workers,
preparing internal auth ops, and asynchronous waiting of
command completion.
It also adds missing LeiAuth support to rediff and rm
which rarely need auth.
12 files changed:
sub workers_start {
my ($lei, $wq, $jobs, $ops, $flds) = @_;
sub workers_start {
my ($lei, $wq, $jobs, $ops, $flds) = @_;
- $ops = pkt_ops($lei, { ($ops ? %$ops : ()) });
+ $ops //= {};
+ ($wq->can('net_merge_all_done') && $lei->{auth}) and
+ $lei->{auth}->op_merge($ops, $wq, $lei);
+ pkt_ops($lei, $ops);
$ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
my $end = $lei->pkt_op_pair;
my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
$ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
my $end = $lei->pkt_op_pair;
my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
# call this when we're ready to wait on events and yield to other clients
sub wait_wq_events {
my ($lei, $op_c, $ops) = @_;
# call this when we're ready to wait on events and yield to other clients
sub wait_wq_events {
my ($lei, $op_c, $ops) = @_;
+ my $wq1 = $lei->{wq1};
+ ($wq1 && $wq1->can('net_merge_all_done') && !$lei->{auth}) and
+ $wq1->net_merge_all_done;
for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
$wq->wq_close;
}
$op_c->{ops} = $ops;
}
for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
$wq->wq_close;
}
$op_c->{ops} = $ops;
}
+sub wq1_start {
+ my ($lei, $wq, $jobs) = @_;
+ my ($op_c, $ops) = workers_start($lei, $wq, $jobs // 1);
+ $lei->{wq1} = $wq;
+ wait_wq_events($lei, $op_c, $ops); # net_merge_all_done if !{auth}
+}
+
sub _help {
require PublicInbox::LeiHelp;
PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC);
sub _help {
require PublicInbox::LeiHelp;
PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC);
'imap_add_kw' : 'imap_set_kw');
$self->{nwr}->{-skip_creat} = 1;
}
'imap_add_kw' : 'imap_set_kw');
$self->{nwr}->{-skip_creat} = 1;
}
- my $ops = {};
- $lei->{auth}->op_merge($ops, $self, $lei) if $lei->{auth};
- (my $op_c, $ops) = $lei->workers_start($self, 1, $ops);
- $lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
$lei->{-err_type} = 'non-fatal';
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops); # net_merge_all_done if !{auth}
+ $lei->wq1_start($self);
}
sub _complete_export_kw {
}
sub _complete_export_kw {
if ($self->{o_remote}) { # setup lei->{auth}
$self->prepare_inputs($lei, $self->{o_remote}) or return;
}
if ($self->{o_remote}) { # setup lei->{auth}
$self->prepare_inputs($lei, $self->{o_remote}) or return;
}
- my $ops = {};
- $lei->{auth}->op_merge($ops, $self, $lei) if $lei->{auth};
- (my $op_c, $ops) = $lei->workers_start($self, 1, $ops);
- $lei->{wq1} = $self;
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops);
+ $lei->wq1_start($self);
}
($lei->{opt}->{'new-only'} && (!$net || !$net->{imap_order})) and
warn "# --new-only is only for IMAP\n";
}
($lei->{opt}->{'new-only'} && (!$net || !$net->{imap_order})) and
warn "# --new-only is only for IMAP\n";
- my $ops = {};
- $lei->{auth}->op_merge($ops, $self, $lei) if $lei->{auth};
- (my $op_c, $ops) = $lei->workers_start($self, $j, $ops);
- $lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
$lei->{-err_type} = 'non-fatal';
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops);
+ $lei->wq1_start($self, $j);
}
sub lei_import { # the main "lei import" method
}
sub lei_import { # the main "lei import" method
$json->pretty(1)->indent(2) if $isatty || $lei->{opt}->{pretty};
}
$lei->start_pager if $isatty;
$json->pretty(1)->indent(2) if $isatty || $lei->{opt}->{pretty};
}
$lei->start_pager if $isatty;
- my $ops = {};
- $lei->{auth}->op_merge($ops, $self, $lei);
- (my $op_c, $ops) = $lei->workers_start($self, 1, $ops);
- $lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
$lei->{-err_type} = 'non-fatal';
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops); # net_merge_all_done if !{auth}
+ $lei->wq1_start($self);
}
sub _complete_ls_mail_source {
}
sub _complete_ls_mail_source {
my $isatty = -t $lei->{1};
$lei->{opt}->{color} //= $isatty;
$lei->start_pager if $isatty;
my $isatty = -t $lei->{1};
$lei->{opt}->{color} //= $isatty;
$lei->start_pager if $isatty;
- my $ops = {};
- $lei->{auth}->op_merge($ops, $self, $lei) if $lei->{auth};
- (my $op_c, $ops) = $lei->workers_start($self, 1, $ops);
- $lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
$lei->{-err_type} = 'non-fatal';
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops);
+ $lei->wq1_start($self);
$lei->{opt}->{'in-format'} //= 'eml' if $lei->{opt}->{stdin};
my $self = bless { missing_ok => 1 }, __PACKAGE__;
$self->prepare_inputs($lei, \@inputs) or return;
$lei->{opt}->{'in-format'} //= 'eml' if $lei->{opt}->{stdin};
my $self = bless { missing_ok => 1 }, __PACKAGE__;
$self->prepare_inputs($lei, \@inputs) or return;
- my $ops = {};
- $lei->{auth}->op_merge($ops, $self, $lei) if $lei->{auth};
- (my $op_c, $ops) = $lei->workers_start($self, 1, $ops);
- $lei->{wq1} = $self;
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops);
+ $lei->wq1_start($self);
my $isatty = -t $lei->{1};
$lei->{opt}->{color} //= $isatty;
$lei->start_pager if $isatty;
my $isatty = -t $lei->{1};
$lei->{opt}->{color} //= $isatty;
$lei->start_pager if $isatty;
- my ($op_c, $ops) = $lei->workers_start($self, 1);
- $lei->{wq1} = $self;
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops);
+ $lei->wq1_start($self);
my $self = bless { missing_ok => 1, lms => $lms }, __PACKAGE__;
$lei->{opt}->{'mail-sync'} = 1; # for prepare_inputs
$self->prepare_inputs($lei, \@folders) or return;
my $self = bless { missing_ok => 1, lms => $lms }, __PACKAGE__;
$lei->{opt}->{'mail-sync'} = 1; # for prepare_inputs
$self->prepare_inputs($lei, \@folders) or return;
- my $ops = {};
- $lei->{auth}->op_merge($ops, $self, $lei) if $lei->{auth};
- (my $op_c, $ops) = $lei->workers_start($self, 1, $ops);
- $lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
$lei->{-err_type} = 'non-fatal';
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops); # net_merge_all_done if !{auth}
+ $lei->wq1_start($self);
}
sub ipc_atfork_child { # needed for PublicInbox::LeiPmdir
}
sub ipc_atfork_child { # needed for PublicInbox::LeiPmdir
$lei->{opt}->{'in-format'} //= 'eml';
my $self = bless {}, __PACKAGE__;
$self->prepare_inputs($lei, \@inputs) or return;
$lei->{opt}->{'in-format'} //= 'eml';
my $self = bless {}, __PACKAGE__;
$self->prepare_inputs($lei, \@inputs) or return;
- my ($op_c, $ops) = $lei->workers_start($self, 1);
- $lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
$lei->{-err_type} = 'non-fatal';
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops);
+ $lei->wq1_start($self);
$self->prepare_inputs($lei, \@argv) or return;
grep(defined, @$vmd_mod{qw(+kw +L -L -kw)}) or
return $lei->fail('no keywords or labels specified');
$self->prepare_inputs($lei, \@argv) or return;
grep(defined, @$vmd_mod{qw(+kw +L -L -kw)}) or
return $lei->fail('no keywords or labels specified');
- my $ops = {};
- $lei->{auth}->op_merge($ops, $self, $lei) if $lei->{auth};
- (my $op_c, $ops) = $lei->workers_start($self, 1, $ops);
- $lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
$lei->{-err_type} = 'non-fatal';
- net_merge_all_done($self) unless $lei->{auth};
- $lei->wait_wq_events($op_c, $ops);
+ $lei->wq1_start($self);
if ($lei->{auth}) { # start auth worker
require PublicInbox::NetWriter;
bless $lei->{net}, 'PublicInbox::NetWriter';
if ($lei->{auth}) { # start auth worker
require PublicInbox::NetWriter;
bless $lei->{net}, 'PublicInbox::NetWriter';
- $lei->{auth}->op_merge(my $ops = {}, $self, $lei);
- (my $op_c, $ops) = $lei->workers_start($self, 1, $ops);
- $lei->{wq1} = $self;
- $lei->wait_wq_events($op_c, $ops);
+ $lei->wq1_start($self);
# net_merge_all_done will fire when auth is done
} else {
redispatch_all($self, $lei); # see below
# net_merge_all_done will fire when auth is done
} else {
redispatch_all($self, $lei); # see below