($op_c, $ops);
}
+# call this when we're ready to wait on events and yield to other clients
+sub wait_wq_events {
+ my ($lei, $op_c, $ops) = @_;
+ for my $wq (grep(defined, @$lei{qw(ikw)})) { # auxiliary WQs
+ $wq->wq_close(1);
+ }
+ $op_c->{ops} = $ops;
+}
+
sub _help {
require PublicInbox::LeiHelp;
PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC);
$lei->{wq1} = $self;
$self->wq_io_do('do_solve_blob', []);
$self->wq_close(1);
- $op_c->op_wait_event($ops);
+ $lei->wait_wq_events($op_c, $ops);
}
sub ipc_atfork_child {
$lei->{wq1} = $self;
$self->wq_io_do('process_inputs', []);
$self->wq_close(1);
- $op_c->op_wait_event($ops);
+ $lei->wait_wq_events($op_c, $ops);
}
sub ipc_atfork_child {
$lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
net_merge_all_done($self) unless $lei->{auth};
- $op_c->op_wait_event($ops); # calls net_merge_all_done if $lei->{auth}
+ $lei->wait_wq_events($op_c, $ops); # net_merge_all_done if !{auth}
}
sub _complete_export_kw {
(my $op_c, $ops) = $lei->workers_start($self, $j, $ops);
$lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
- $ikw->wq_close(1) if $ikw;
net_merge_all_done($self) unless $lei->{auth};
- $op_c->op_wait_event($ops);
+ $lei->wait_wq_events($op_c, $ops);
}
sub lei_import { # the main "lei import" method
$lei->{wq1} = $self;
$self->wq_io_do('do_ls_search_long', [], $pfx);
$self->wq_close(1);
- $op_c->op_wait_event($ops);
+ $lei->wait_wq_events($op_c, $ops);
}
sub lei_ls_search {
require PublicInbox::Inbox;
require PublicInbox::Admin;
require PublicInbox::InboxWritable;
- my ($op, $ops) = $lei->workers_start($self, 1);
+ my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
$self->wq_io_do('do_mirror', []);
$self->wq_close(1);
- $op->op_wait_event($ops);
+ $lei->wait_wq_events($op_c, $ops);
}
sub ipc_atfork_child {
} else {
$self->{input} = $input;
}
- my ($op, $ops) = $lei->workers_start($self, 1);
+ my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
$self->wq_io_do('do_p2q', []);
$self->wq_close(1);
- $op->op_wait_event($ops);
+ $lei->wait_wq_events($op_c, $ops);
}
sub ipc_atfork_child {
my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
net_merge_all_done($self) unless $lei->{auth};
- $op_c->op_wait_event($ops);
+ $lei->wait_wq_events($op_c, $ops);
}
sub ipc_atfork_child {
$lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
net_merge_all_done($self) unless $lei->{auth};
- $op_c->op_wait_event($ops);
+ $lei->wait_wq_events($op_c, $ops);
}
no warnings 'once';
$lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
net_merge_all_done($self) unless $lei->{auth};
- $op_c->op_wait_event($ops);
+ $lei->wait_wq_events($op_c, $ops);
}
sub note_missing {
start_query($self);
}
$lei->event_step_init; # wait for shutdowns
- $op_c->op_wait_event($ops);
+ $lei->wait_wq_events($op_c, $ops);
}
sub add_uri {
}
}
-# call this when we're ready to wait on events
-sub op_wait_event {
- my ($self, $ops) = @_;
- $self->{ops} = $ops;
-}
-
1;