X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLEI.pm;h=ffd50db5e9d6e3744584a5ee81db8107b9f8bf0f;hb=7b654d175cf2e31b4354929ea678563f534947e5;hp=89aa4119aef22f4868f9f738a0e7e516b3c42119;hpb=d1fd2feb587b34d66227a6cb4f9c8b930812ddbf;p=public-inbox.git diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 89aa4119..ffd50db5 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -18,7 +18,6 @@ use IO::Handle (); use Fcntl qw(SEEK_SET); use PublicInbox::Config; use PublicInbox::Syscall qw(EPOLLIN); -use PublicInbox::DS qw(dwaitpid); use PublicInbox::Spawn qw(spawn popen_rd); use PublicInbox::Lock; use PublicInbox::Eml; @@ -253,6 +252,8 @@ our %CMD = ( # sorted in order of importance/use: 'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch', qw(prune), @c_opt ], +'reindex' => [ '', 'reindex all locally-indexed messages', @c_opt ], + 'index' => [ 'LOCATION...', 'one-time index from URL or filesystem', qw(in-format|F=s kw! offset=i recursive|r exclude=s include|I=s verbose|v+ incremental!), @net_opt, # mainly for --proxy= @@ -397,8 +398,10 @@ my %OPTDESC = ( 'include specified external(s) in search' ], 'only|O=s@ q' => [ 'LOCATION', 'only use specified external(s) for search' ], -'jobs=s q' => [ '[SEARCH_JOBS][,WRITER_JOBS]', - 'control number of search and writer jobs' ], +'jobs|j=s' => [ 'JOBSPEC', + 'control number of query and writer jobs' . + "integers delimited by `,', either of which may be omitted" + ], 'jobs|j=i add-external' => 'set parallelism when indexing after --mirror', 'in-format|F=s' => $stdin_formats, @@ -540,12 +543,11 @@ sub child_error { # passes non-fatal curl exit codes to user local $current_lei = $self; $child_error ||= 1 << 8; warn(substr($msg, -1, 1) eq "\n" ? $msg : "$msg\n") if defined $msg; + $self->{child_error} ||= $child_error; if ($self->{pkt_op_p}) { # to top lei-daemon $self->{pkt_op_p}->pkt_do('child_error', $child_error); } elsif ($self->{sock}) { # to lei(1) client send($self->{sock}, "child_error $child_error", MSG_EOR); - } else { # non-lei admin command - $self->{child_error} ||= $child_error; } # else noop if client disconnected } @@ -641,12 +643,12 @@ sub workers_start { my $end = $lei->pkt_op_pair; my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker"; $flds->{lei} = $lei; - $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds); + $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds, + $wq->can('_wq_done_wait') // \&wq_done_wait, $lei); delete $lei->{pkt_op_p}; my $op_c = delete $lei->{pkt_op_c}; @$end = (); $lei->event_step_init; - $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei); ($op_c, $ops); } @@ -783,7 +785,7 @@ EOM } } -sub lazy_cb ($$$) { +sub lazy_cb ($$$) { # $pfx is _complete_ or lei_ my ($self, $cmd, $pfx) = @_; my $ucmd = $cmd; $ucmd =~ tr/-/_/; @@ -1388,13 +1390,12 @@ sub DESTROY { # preserve $? for ->fail or ->x_it code } -sub wq_done_wait { # dwaitpid callback - my ($arg, $pid) = @_; - my ($wq, $lei) = @$arg; +sub wq_done_wait { # awaitpid cb (via wq_eof) + my ($pid, $wq, $lei) = @_; local $current_lei = $lei; my $err_type = $lei->{-err_type}; $? and $lei->child_error($?, - $err_type ? "$err_type errors during $lei->{cmd}" : ()); + $err_type ? "$err_type errors during $lei->{cmd} \$?=$?" : ()); $lei->dclose; } @@ -1520,13 +1521,10 @@ sub sto_done_request { return unless $lei->{sto}; local $current_lei = $lei; my $sock = $wq ? $wq->{lei_sock} : undef; - eval { - if ($sock //= $lei->{sock}) { # issue, async wait - $lei->{sto}->wq_io_do('done', [ $sock ]); - } else { # forcibly wait - my $wait = $lei->{sto}->wq_do('done'); - } - }; + $sock //= $lei->{sock}; + my @io; + push(@io, $sock) if $sock; # async wait iff possible + eval { $lei->{sto}->wq_io_do('done', \@io) }; warn($@) if $@; }