]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LEI.pm
lei: generalize auxiliary WQ handling
[public-inbox.git] / lib / PublicInbox / LEI.pm
index 3527cf09afb2a7932e4b0030a1624a1b70ad27da..ed01e8def7bf2754fd5130a04d900e111079a602 100644 (file)
@@ -218,9 +218,6 @@ our %CMD = ( # sorted in order of importance/use:
        qw(stdin| in-format|F=s input|i=s@ oid=s@ mid=s@),
        qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt(), @c_opt,
        pass_through('-kw:foo for delete') ],
-'forget' => [ '[--stdin|--oid=OID|--by-mid=MID]',
-       "exclude message(s) on stdin from `q' search results",
-       qw(stdin| oid=s exact by-mid|mid:s), @c_opt ],
 
 'purge-mailsource' => [ 'LOCATION|--all',
        'remove imported messages from IMAP, Maildirs, and MH',
@@ -424,7 +421,7 @@ my %CONFIG_KEYS = (
        'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m wq1); # internal workers
+my @WQ_KEYS = qw(lxs l2m wq1 ikw); # internal workers
 
 sub _drop_wq {
        my ($self) = @_;
@@ -444,8 +441,10 @@ sub x_it ($$) {
        # make sure client sees stdout before exit
        $self->{1}->autoflush(1) if $self->{1};
        dump_and_clear_log();
-       if (my $s = $self->{pkt_op_p} // $self->{sock}) {
-               send($s, "x_it $code", MSG_EOR);
+       if ($self->{pkt_op_p}) { # to top lei-daemon
+               $self->{pkt_op_p}->pkt_do('x_it', $code);
+       } elsif ($self->{sock}) { # to lei(1) client
+               send($self->{sock}, "x_it $code", MSG_EOR);
        } # else ignore if client disconnected
 }
 
@@ -481,9 +480,10 @@ sub sigint_reap {
 
 sub fail ($$;$) {
        my ($self, $buf, $exit_code) = @_;
+       $self->{failed}++;
        err($self, $buf) if defined $buf;
        # calls fail_handler:
-       send($self->{pkt_op_p}, '!', MSG_EOR) if $self->{pkt_op_p};
+       $self->{pkt_op_p}->pkt_do('!') if $self->{pkt_op_p};
        x_it($self, ($exit_code // 1) << 8);
        undef;
 }
@@ -502,18 +502,17 @@ sub puts ($;@) { out(shift, map { "$_\n" } @_) }
 sub child_error { # passes non-fatal curl exit codes to user
        my ($self, $child_error, $msg) = @_; # child_error is $?
        $self->err($msg) if $msg;
-       if (my $s = $self->{pkt_op_p} // $self->{sock}) {
-               # send to the parent lei-daemon or to lei(1) client
-               send($s, "child_error $child_error", MSG_EOR);
-       } elsif (!$PublicInbox::DS::in_loop) {
-               $self->{child_error} = $child_error;
+       if ($self->{pkt_op_p}) { # to top lei-daemon
+               $self->{pkt_op_p}->pkt_do('child_error', $child_error);
+       } elsif ($self->{sock}) { # to lei(1) client
+               send($self->{sock}, "child_error $child_error", MSG_EOR);
        } # else noop if client disconnected
 }
 
 sub note_sigpipe { # triggers sigpipe_handler
        my ($self, $fd) = @_;
        close(delete($self->{$fd})); # explicit close silences Perl warning
-       send($self->{pkt_op_p}, '|', MSG_EOR) if $self->{pkt_op_p};
+       $self->{pkt_op_p}->pkt_do('|') if $self->{pkt_op_p};
        x_it($self, 13);
 }
 
@@ -553,8 +552,8 @@ sub _delete_pkt_op { # OnDestroy callback to prevent leaks on die
        if (my $op = delete $self->{pkt_op_c}) { # in case of die
                $op->close; # PublicInbox::PktOp::close
        }
-       my $unclosed_after_die = delete($self->{pkt_op_p}) or return;
-       close $unclosed_after_die;
+       my $pkt_op_p = delete($self->{pkt_op_p}) or return;
+       close $pkt_op_p->{op_p};
 }
 
 sub pkt_op_pair {
@@ -581,11 +580,22 @@ sub workers_start {
        $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
        delete $lei->{pkt_op_p};
        my $op_c = delete $lei->{pkt_op_c};
+       # {-lei_sock} persists script/lei process until ops->{''} EOF callback
+       $op_c->{-lei_sock} = $lei->{sock};
        @$end = ();
        $lei->event_step_init;
        ($op_c, $ops);
 }
 
+# call this when we're ready to wait on events and yield to other clients
+sub wait_wq_events {
+       my ($lei, $op_c, $ops) = @_;
+       for my $wq (grep(defined, @$lei{qw(ikw)})) { # auxiliary WQs
+               $wq->wq_close(1);
+       }
+       $op_c->{ops} = $ops;
+}
+
 sub _help {
        require PublicInbox::LeiHelp;
        PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC);
@@ -1045,7 +1055,7 @@ sub accept_dispatch { # Listener {post_accept} callback
 sub dclose {
        my ($self) = @_;
        delete $self->{-progress};
-       _drop_wq($self);
+       _drop_wq($self) if $self->{failed};
        close(delete $self->{1}) if $self->{1}; # may reap_compress
        $self->close if $self->{-event_init_done}; # PublicInbox::DS::close
 }