]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LEI.pm
lei q: emit progress and counting via PktOp
[public-inbox.git] / lib / PublicInbox / LEI.pm
index 08554932ab8b06957c81380feb9aee1ad6c18192..6c2515dc0eb74cf37530445e27acb44134d44702 100644 (file)
@@ -104,7 +104,7 @@ our %CMD = ( # sorted in order of importance/use:
 'q' => [ 'SEARCH_TERMS...', 'search for messages matching terms', qw(
        save-as=s output|mfolder|o=s format|f=s dedupe|d=s thread|t augment|a
        sort|s=s reverse|r offset=i remote! local! external! pretty
-       mua-cmd|mua=s no-torsocks torsocks=s verbose|v
+       mua-cmd|mua=s no-torsocks torsocks=s verbose|v quiet|q
        received-after=s received-before=s sent-after=s sent-since=s),
        PublicInbox::LeiQuery::curl_opt(), opt_dash('limit|n=i', '[0-9]+') ],
 
@@ -305,7 +305,8 @@ sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) }
 
 sub fail ($$;$) {
        my ($self, $buf, $exit_code) = @_;
-       err($self, $buf);
+       err($self, $buf) if defined $buf;
+       send($self->{pkt_op}, '!', MSG_EOR) if $self->{pkt_op}; # fail_handler
        x_it($self, ($exit_code // 1) << 8);
        undef;
 }
@@ -365,18 +366,17 @@ sub io_restore ($$) {
        }
 }
 
-# triggers sigpipe_handler
-sub note_sigpipe {
+sub note_sigpipe { # triggers sigpipe_handler
        my ($self, $fd) = @_;
        close(delete($self->{$fd})); # explicit close silences Perl warning
-       syswrite($self->{op_pipe}, '!') if $self->{op_pipe};
+       send($self->{pkt_op}, '|', MSG_EOR) if $self->{pkt_op};
        x_it($self, 13);
 }
 
 sub atfork_child_wq {
        my ($self, $wq) = @_;
        io_restore($self, $wq);
-       -p $self->{op_pipe} or die 'BUG: {op_pipe} expected';
+       -S $self->{pkt_op} or die 'BUG: {pkt_op} expected';
        io_restore($self->{l2m}, $wq);
        %PATH2CFG = ();
        undef $errors_log;
@@ -408,7 +408,7 @@ sub atfork_parent_wq {
        $self->{env} = $env;
        delete @$lei{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m
        my @io = (delete(@$lei{qw(0 1 2)}),
-                       io_extract($lei, qw(sock op_pipe startq)));
+                       io_extract($lei, qw(sock pkt_op startq)));
        my $l2m = $lei->{l2m};
        if ($l2m && $l2m != $wq) { # $wq == lxs
                if (my $wq_s1 = $l2m->{-wq_s1}) {
@@ -824,7 +824,7 @@ sub accept_dispatch { # Listener {post_accept} callback
        $sock->autoflush(1);
        my $self = bless { sock => $sock }, __PACKAGE__;
        vec(my $rvec = '', fileno($sock), 1) = 1;
-       select($rvec, undef, undef, 1) or
+       select($rvec, undef, undef, 60) or
                return send($sock, 'timed out waiting to recv FDs', MSG_EOR);
        my @fds = $recv_cmd->($sock, my $buf, 4096 * 33); # >MAX_ARG_STRLEN
        if (scalar(@fds) == 4) {
@@ -834,7 +834,9 @@ sub accept_dispatch { # Listener {post_accept} callback
                        send($sock, "open(+<&=$fd) (FD=$i): $!", MSG_EOR);
                }
        } else {
-               return send($sock, "recv_cmd failed: $!", MSG_EOR);
+               my $msg = "recv_cmd failed: $!";
+               warn $msg;
+               return send($sock, $msg, MSG_EOR);
        }
        $self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY
        # $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
@@ -898,7 +900,6 @@ sub event_step {
 sub event_step_init {
        my ($self) = @_;
        if (my $sock = $self->{sock}) { # using DS->EventLoop
-               $sock->blocking(0);
                $self->SUPER::new($sock, EPOLLIN|EPOLLET);
        }
 }