]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LEI.pm
ipc: drop awaitpid_init to avoid circular refs
[public-inbox.git] / lib / PublicInbox / LEI.pm
index 8a3a3ab607fe501cab30827cda019541dd655ed5..ffd50db5e9d6e3744584a5ee81db8107b9f8bf0f 100644 (file)
@@ -18,7 +18,6 @@ use IO::Handle ();
 use Fcntl qw(SEEK_SET);
 use PublicInbox::Config;
 use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::DS qw(dwaitpid);
 use PublicInbox::Spawn qw(spawn popen_rd);
 use PublicInbox::Lock;
 use PublicInbox::Eml;
@@ -399,8 +398,10 @@ my %OPTDESC = (
                'include specified external(s) in search' ],
 'only|O=s@     q' => [ 'LOCATION',
                'only use specified external(s) for search' ],
-'jobs=s        q' => [ '[SEARCH_JOBS][,WRITER_JOBS]',
-               'control number of search and writer jobs' ],
+'jobs|j=s' => [ 'JOBSPEC',
+               'control number of query and writer jobs' .
+               "integers delimited by `,', either of which may be omitted"
+               ],
 'jobs|j=i      add-external' => 'set parallelism when indexing after --mirror',
 
 'in-format|F=s' => $stdin_formats,
@@ -542,12 +543,11 @@ sub child_error { # passes non-fatal curl exit codes to user
        local $current_lei = $self;
        $child_error ||= 1 << 8;
        warn(substr($msg, -1, 1) eq "\n" ? $msg : "$msg\n") if defined $msg;
+       $self->{child_error} ||= $child_error;
        if ($self->{pkt_op_p}) { # to top lei-daemon
                $self->{pkt_op_p}->pkt_do('child_error', $child_error);
        } elsif ($self->{sock}) { # to lei(1) client
                send($self->{sock}, "child_error $child_error", MSG_EOR);
-       } else { # non-lei admin command
-               $self->{child_error} ||= $child_error;
        } # else noop if client disconnected
 }
 
@@ -643,12 +643,12 @@ sub workers_start {
        my $end = $lei->pkt_op_pair;
        my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
        $flds->{lei} = $lei;
-       $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
+       $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds,
+               $wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
        delete $lei->{pkt_op_p};
        my $op_c = delete $lei->{pkt_op_c};
        @$end = ();
        $lei->event_step_init;
-       $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
        ($op_c, $ops);
 }
 
@@ -785,7 +785,7 @@ EOM
        }
 }
 
-sub lazy_cb ($$$) {
+sub lazy_cb ($$$) { # $pfx is _complete_ or lei_
        my ($self, $cmd, $pfx) = @_;
        my $ucmd = $cmd;
        $ucmd =~ tr/-/_/;
@@ -1390,9 +1390,8 @@ sub DESTROY {
        # preserve $? for ->fail or ->x_it code
 }
 
-sub wq_done_wait { # dwaitpid callback
-       my ($arg, $pid) = @_;
-       my ($wq, $lei) = @$arg;
+sub wq_done_wait { # awaitpid cb (via wq_eof)
+       my ($pid, $wq, $lei) = @_;
        local $current_lei = $lei;
        my $err_type = $lei->{-err_type};
        $? and $lei->child_error($?,