]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LEI.pm
No ext_urls
[public-inbox.git] / lib / PublicInbox / LEI.pm
index 9ab91714d0b8b725fdda3b92889b70574dce6b83..d05b20dee4873ce57a6e0d9c44bf54d21dbe6c37 100644 (file)
@@ -6,8 +6,7 @@
 # local clients with read/write access to the FS and use as many
 # system resources as the local user has access to.
 package PublicInbox::LEI;
-use strict;
-use v5.10.1;
+use v5.12;
 use parent qw(PublicInbox::DS PublicInbox::LeiExternal
        PublicInbox::LeiQuery);
 use Getopt::Long ();
@@ -19,7 +18,6 @@ use IO::Handle ();
 use Fcntl qw(SEEK_SET);
 use PublicInbox::Config;
 use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::DS qw(dwaitpid);
 use PublicInbox::Spawn qw(spawn popen_rd);
 use PublicInbox::Lock;
 use PublicInbox::Eml;
@@ -254,6 +252,8 @@ our %CMD = ( # sorted in order of importance/use:
 'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch',
        qw(prune), @c_opt ],
 
+'reindex' => [ '', 'reindex all locally-indexed messages', @c_opt ],
+
 'index' => [ 'LOCATION...', 'one-time index from URL or filesystem',
        qw(in-format|F=s kw! offset=i recursive|r exclude=s include|I=s
        verbose|v+ incremental!), @net_opt, # mainly for --proxy=
@@ -398,8 +398,10 @@ my %OPTDESC = (
                'include specified external(s) in search' ],
 'only|O=s@     q' => [ 'LOCATION',
                'only use specified external(s) for search' ],
-'jobs=s        q' => [ '[SEARCH_JOBS][,WRITER_JOBS]',
-               'control number of search and writer jobs' ],
+'jobs|j=s' => [ 'JOBSPEC',
+               'control number of query and writer jobs' .
+               "integers delimited by `,', either of which may be omitted"
+               ],
 'jobs|j=i      add-external' => 'set parallelism when indexing after --mirror',
 
 'in-format|F=s' => $stdin_formats,
@@ -541,12 +543,11 @@ sub child_error { # passes non-fatal curl exit codes to user
        local $current_lei = $self;
        $child_error ||= 1 << 8;
        warn(substr($msg, -1, 1) eq "\n" ? $msg : "$msg\n") if defined $msg;
+       $self->{child_error} ||= $child_error;
        if ($self->{pkt_op_p}) { # to top lei-daemon
                $self->{pkt_op_p}->pkt_do('child_error', $child_error);
        } elsif ($self->{sock}) { # to lei(1) client
                send($self->{sock}, "child_error $child_error", MSG_EOR);
-       } else { # non-lei admin command
-               $self->{child_error} ||= $child_error;
        } # else noop if client disconnected
 }
 
@@ -560,17 +561,17 @@ sub note_sigpipe { # triggers sigpipe_handler
 sub _lei_atfork_child {
        my ($self, $persist) = @_;
        # we need to explicitly close things which are on stack
+       my $cfg = $self->{cfg};
        if ($persist) {
                open $self->{3}, '<', '/' or die "open(/) $!";
                fchdir($self);
                close($_) for (grep(defined, delete @$self{qw(0 1 2 sock)}));
-               if (my $cfg = $self->{cfg}) {
-                       delete @$cfg{qw(-lei_store -watches -lei_note_event)};
-               }
+               delete @$cfg{qw(-lei_store -watches -lei_note_event)};
        } else { # worker, Net::NNTP (Net::Cmd) uses STDERR directly
                open STDERR, '+>&='.fileno($self->{2}) or warn "open $!";
                STDERR->autoflush(1);
                POSIX::setpgid(0, $$) // die "setpgid(0, $$): $!";
+               delete @$cfg{qw(-watches -lei_note_event)};
        }
        close($_) for (grep(defined, delete @$self{qw(old_1 au_done)}));
        delete $self->{-socks};
@@ -642,12 +643,12 @@ sub workers_start {
        my $end = $lei->pkt_op_pair;
        my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
        $flds->{lei} = $lei;
-       $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
+       $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds,
+               $wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
        delete $lei->{pkt_op_p};
        my $op_c = delete $lei->{pkt_op_c};
        @$end = ();
        $lei->event_step_init;
-       $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
        ($op_c, $ops);
 }
 
@@ -660,6 +661,7 @@ sub wait_wq_events {
        for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
                $wq->wq_close;
        }
+       $wq1->{lei_sock} = $lei->{sock} if $wq1;
        $op_c->{ops} = $ops;
 }
 
@@ -783,7 +785,7 @@ EOM
        }
 }
 
-sub lazy_cb ($$$) {
+sub lazy_cb ($$$) { # $pfx is _complete_ or lei_
        my ($self, $cmd, $pfx) = @_;
        my $ucmd = $cmd;
        $ucmd =~ tr/-/_/;
@@ -1277,6 +1279,9 @@ sub lazy_start {
                        require PublicInbox::CmdIPC4;
                        $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4');
                        PublicInbox::CmdIPC4->can('recv_cmd4');
+               } // do {
+                       $send_cmd = PublicInbox::Syscall->can('send_cmd4');
+                       PublicInbox::Syscall->can('recv_cmd4');
                };
        }
        $recv_cmd or die <<"";
@@ -1385,13 +1390,12 @@ sub DESTROY {
        # preserve $? for ->fail or ->x_it code
 }
 
-sub wq_done_wait { # dwaitpid callback
-       my ($arg, $pid) = @_;
-       my ($wq, $lei) = @$arg;
+sub wq_done_wait { # awaitpid cb (via wq_eof)
+       my ($pid, $wq, $lei) = @_;
        local $current_lei = $lei;
        my $err_type = $lei->{-err_type};
        $? and $lei->child_error($?,
-                       $err_type ? "$err_type errors during $lei->{cmd}" : ());
+               $err_type ? "$err_type errors during $lei->{cmd} \$?=$?" : ());
        $lei->dclose;
 }
 
@@ -1402,9 +1406,11 @@ sub fchdir {
 }
 
 sub wq_eof { # EOF callback for main daemon
-       my ($lei) = @_;
+       my ($lei, $wq_fld) = @_;
        local $current_lei = $lei;
-       delete $lei->{wq1} // return $lei->fail; # already failed
+       my $wq = delete $lei->{$wq_fld // 'wq1'};
+       $lei->sto_done_request($wq);
+       $wq // $lei->fail; # already failed
 }
 
 sub watch_state_ok ($) {
@@ -1511,15 +1517,14 @@ sub lms {
 }
 
 sub sto_done_request {
-       my ($lei, $sock) = @_;
+       my ($lei, $wq) = @_;
+       return unless $lei->{sto};
        local $current_lei = $lei;
-       eval {
-               if ($sock //= $lei->{sock}) { # issue, async wait
-                       $lei->{sto}->wq_io_do('done', [ $sock ]);
-               } else { # forcibly wait
-                       my $wait = $lei->{sto}->wq_do('done');
-               }
-       };
+       my $sock = $wq ? $wq->{lei_sock} : undef;
+       $sock //= $lei->{sock};
+       my @io;
+       push(@io, $sock) if $sock; # async wait iff possible
+       eval { $lei->{sto}->wq_io_do('done', \@io) };
        warn($@) if $@;
 }