+sub redispatch_all ($$) {
+ my ($self, $lei) = @_;
+ my $upq = [ (@{$self->{o_local} // []}, @{$self->{o_remote} // []}) ];
+ return up1($lei, $upq->[0]) if @$upq == 1; # just one, may start MUA
+
+ PublicInbox::OverIdx::fork_ok($lei->{opt});
+ # FIXME: this is also used per-query, see lei->_start_query
+ my $j = $lei->{opt}->{jobs} || do {
+ my $n = $self->detect_nproc // 1;
+ $n > 4 ? 4 : $n;
+ };
+ $j = ($j =~ /\A([0-9]+)/) ? $1 + 0 : 1; # may be --jobs=$x,$m on CLI
+ # re-dispatch into our event loop w/o creating an extra fork-level
+ # $upq will be drained via DESTROY as each query finishes
+ $lei->{fmsg} = PublicInbox::LeiFinmsg->new($lei);
+ my ($op_c, $op_p) = PublicInbox::PktOp->pair;
+ # call lei->dclose when upq is done processing:
+ $op_c->{ops} = { '' => [ $lei->can('dclose'), $lei ] };
+ my @first_batch = splice(@$upq, 0, $j); # initial parallelism
+ $lei->{-upq} = $upq;
+ $lei->{daemon_pid} = $$;
+ $lei->event_step_init; # wait for client disconnects
+ for my $out (@first_batch) {
+ PublicInbox::DS::requeue(
+ PublicInbox::LeiUp1::nxt($lei, $out, $op_p));
+ }
+}
+
+sub filter_lss {
+ my ($self, $lei, $all) = @_;
+ my @outs = PublicInbox::LeiSavedSearch::list($lei);
+ if ($all eq 'local') {
+ $self->{o_local} = [ grep(!/$REMOTE_RE/, @outs) ];
+ } elsif ($all eq 'remote') {
+ $self->{o_remote} = [ grep(/$REMOTE_RE/, @outs) ];
+ } elsif ($all eq '') {
+ $self->{o_remote} = [ grep(/$REMOTE_RE/, @outs) ];
+ $self->{o_local} = [ grep(!/$REMOTE_RE/, @outs) ];
+ } else {
+ undef;
+ }
+}
+
+sub lei_up {
+ my ($lei, @outs) = @_;
+ my $opt = $lei->{opt};
+ my $self = bless { -mail_sync => 1 }, __PACKAGE__;
+ if (defined(my $all = $opt->{all})) {
+ return $lei->fail("--all and @outs incompatible") if @outs;
+ defined($opt->{mua}) and return
+ $lei->fail('--all and --mua= are incompatible');
+ filter_lss($self, $lei, $all) // return
+ $lei->fail("only --all=$all not understood");
+ } elsif ($lei->{lse}) { # redispatched
+ scalar(@outs) == 1 or die "BUG: lse set w/ >1 out[@outs]";
+ return up1($lei, $outs[0]);
+ } else {
+ $self->{o_remote} = [ grep(/$REMOTE_RE/, @outs) ];
+ $self->{o_local} = [ grep(!/$REMOTE_RE/, @outs) ];
+ }
+ $lei->{lse} = $lei->_lei_store(1)->write_prepare($lei)->search;
+ ((@{$self->{o_local} // []} + @{$self->{o_remote} // []}) > 1 &&
+ defined($opt->{mua})) and return $lei->fail(<<EOM);
+multiple outputs and --mua= are incompatible
+EOM
+ if ($self->{o_remote}) { # setup lei->{auth}
+ $self->prepare_inputs($lei, $self->{o_remote}) or return;
+ }
+ if ($lei->{auth}) { # start auth worker
+ require PublicInbox::NetWriter;
+ bless $lei->{net}, 'PublicInbox::NetWriter';
+ $lei->wq1_start($self);
+ # net_merge_all_done will fire when auth is done
+ } else {
+ redispatch_all($self, $lei); # see below
+ }
+}
+
+# called in top-level lei-daemon when LeiAuth is done
+sub net_merge_all_done {
+ my ($self, $lei) = @_;
+ $lei->{net} = delete($self->{-net_new}) if $self->{-net_new};
+ $self->wq_close;
+ eval { redispatch_all($self, $lei) };
+ $lei->child_error(0, "E: $@") if $@;
+}
+
+sub _complete_up { # lei__complete hook