]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiXSearch.pm
lei q: delay worker spawn
[public-inbox.git] / lib / PublicInbox / LeiXSearch.pm
index 57a18075b28420536869c1c59d80be107c058ec0..ab66717c04e7b8142b70b386235011468aeb7780 100644 (file)
@@ -113,8 +113,7 @@ sub mset_progress {
        if ($lei->{pkt_op}) { # called via pkt_op/pkt_do from workers
                pkt_do($lei->{pkt_op}, 'mset_progress', @_);
        } else { # single lei-daemon consumer
-               my @args = ref($_[-1]) eq 'ARRAY' ? @{$_[-1]} : @_;
-               my ($desc, $mset_size, $mset_total_est) = @args;
+               my ($desc, $mset_size, $mset_total_est) = @_;
                $lei->{-mset_total} += $mset_size;
                $lei->err("# $desc $mset_size/$mset_total_est");
        }
@@ -264,14 +263,11 @@ sub query_remote_mboxrd {
                shift(@$cmd) if !$cmd->[0];
 
                $lei->err("# @$cmd") if $verbose;
-               $? = 0;
-               my $fh = popen_rd($cmd, $env, $rdr);
+               my ($fh, $pid) = popen_rd($cmd, $env, $rdr);
                $fh = IO::Uncompress::Gunzip->new($fh);
-               eval {
-                       PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self,
-                                                       $lei, $each_smsg);
-               };
-               return $lei->fail("E: @$cmd: $@") if $@;
+               PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self,
+                                               $lei, $each_smsg);
+               waitpid($pid, 0) == $pid or die "BUG: waitpid (curl): $!";
                if ($? == 0) {
                        my $nr = $lei->{-nr_remote_eml};
                        mset_progress($lei, $lei->{-current_url}, $nr, $nr);
@@ -350,7 +346,6 @@ sub do_post_augment {
 }
 
 my $MAX_PER_HOST = 4;
-sub MAX_PER_HOST { $MAX_PER_HOST }
 
 sub concurrency {
        my ($self, $opt) = @_;
@@ -407,9 +402,17 @@ sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
 sub do_query {
        my ($self, $lei) = @_;
        $lei->{1}->autoflush(1);
+       $lei->start_pager if -t $lei->{1};
+       $lei->{ovv}->ovv_begin($lei);
        my ($au_done, $zpipe);
        my $l2m = $lei->{l2m};
+       $lei->atfork_prepare_wq($self);
+       $self->wq_workers_start('lei_xsearch', $self->{jobs}, $lei->oldset);
+       delete $self->{-ipc_atfork_child_close};
        if ($l2m) {
+               $lei->atfork_prepare_wq($l2m);
+               $l2m->wq_workers_start('lei2mail', $l2m->{jobs}, $lei->oldset);
+               delete $l2m->{-ipc_atfork_child_close};
                pipe($lei->{startq}, $au_done) or die "pipe: $!";
                # 1031: F_SETPIPE_SZ
                fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
@@ -421,8 +424,10 @@ sub do_query {
                '.' => [ \&do_post_augment, $lei, $zpipe, $au_done ],
                '' => [ \&query_done, $lei ],
                'mset_progress' => [ \&mset_progress, $lei ],
+               'x_it' => [ $lei->can('x_it'), $lei ],
+               'child_error' => [ $lei->can('child_error'), $lei ],
        };
-       (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, !$lei->{oneshot});
+       (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops);
        my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);
        delete($lei->{pkt_op});