]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/IPC.pm
ipc: switch wq to use the event loop
[public-inbox.git] / lib / PublicInbox / IPC.pm
index ece0e8b809798b5571711624875a746b2b452fc9..479c4377b3e07ba12aa8bada31523ca9b4efdcd9 100644 (file)
@@ -14,6 +14,7 @@ use Carp qw(confess croak);
 use PublicInbox::DS qw(dwaitpid);
 use PublicInbox::Spawn;
 use PublicInbox::OnDestroy;
+use PublicInbox::WQWorker;
 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
 use Errno qw(EMSGSIZE);
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
@@ -119,13 +120,12 @@ sub ipc_worker_spawn {
                PublicInbox::DS::sig_setmask($sigset);
                # ensure we properly exit even if warn() dies:
                my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
-               my $on_destroy = $self->ipc_atfork_child;
                eval {
+                       my $on_destroy = $self->ipc_atfork_child;
                        local %SIG = %SIG;
                        ipc_worker_loop($self, $r_req, $w_res);
                };
                die "worker $ident PID:$$ died: $@\n" if $@;
-               undef $on_destroy;
                undef $end; # trigger exit
        }
        PublicInbox::DS::sig_setmask($sigset) unless $oldset;
@@ -152,6 +152,8 @@ sub wq_wait_old {
 # for base class, override in sub classes
 sub ipc_atfork_prepare {}
 
+sub wq_atexit_child {}
+
 sub ipc_atfork_child {
        my ($self) = @_;
        my $io = delete($self->{-ipc_atfork_child_close}) or return;
@@ -252,10 +254,11 @@ sub ipc_sibling_atfork_child {
        $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
-sub _recv_and_run {
+sub recv_and_run {
        my ($self, $s2, $len, $full_stream) = @_;
        my @fds = $recv_cmd->($s2, my $buf, $len);
-       my $n = length($buf // '') or return;
+       return if scalar(@fds) && !defined($fds[0]);
+       my $n = length($buf) or return 0;
        my $nfd = 0;
        for my $fd (@fds) {
                if (open(my $cmdfh, '+<&=', $fd)) {
@@ -282,14 +285,15 @@ sub _recv_and_run {
 
 sub wq_worker_loop ($) {
        my ($self) = @_;
-       my $len = $self->{wq_req_len} // (4096 * 33);
-       my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
-       1 while (_recv_and_run($self, $s2, $len));
+       my $wqw = PublicInbox::WQWorker->new($self);
+       PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} });
+       PublicInbox::DS->EventLoop;
+       PublicInbox::DS->Reset;
 }
 
 sub do_sock_stream { # via wq_do, for big requests
        my ($self, $len) = @_;
-       _recv_and_run($self, delete $self->{0}, $len, 1);
+       recv_and_run($self, delete $self->{0}, $len, 1);
 }
 
 sub wq_do { # always async
@@ -336,13 +340,12 @@ sub _wq_worker_start ($$$) {
                PublicInbox::DS::sig_setmask($oldset);
                # ensure we properly exit even if warn() dies:
                my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
-               my $on_destroy = $self->ipc_atfork_child;
                eval {
+                       my $on_destroy = $self->ipc_atfork_child;
                        local %SIG = %SIG;
                        wq_worker_loop($self);
                };
                warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
-               undef $on_destroy;
                undef $end; # trigger exit
        } else {
                $self->{-wq_workers}->{$pid} = \undef;