]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/IPC.pm
lei q: parallelize Maildir and mbox writing
[public-inbox.git] / lib / PublicInbox / IPC.pm
index 88f81e477ec07d58c97f80f8ccb091d88f7f6d2d..8fec2e62a4373f4b81507403ba64245e623808b5 100644 (file)
@@ -37,37 +37,16 @@ if ($enc && $dec) { # should be custom ops
        } // warn("Storable (part of Perl) missing: $@\n");
 }
 
-my $recv_cmd1; # PublicInbox::CmdIPC1::recv_cmd1;
 my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
 my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do {
        require PublicInbox::CmdIPC4;
        $recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4');
        PublicInbox::CmdIPC4->can('send_cmd4');
-} // do {
-       # IO::FDPass only allows sending a single FD at-a-time, which
-       # means we can't guarantee all packets end up on the same worker,
-       # so we cap WQ_MAX_WORKERS
-       require PublicInbox::CmdIPC1;
-       $recv_cmd1 = PublicInbox::CmdIPC1->can('recv_cmd1');
-       $WQ_MAX_WORKERS = 1 if $recv_cmd1;
-       wq_set_recv_fds(3);
-       PublicInbox::CmdIPC1->can('send_cmd1');
 };
 
-# needed to tell recv_cmd1 how many times to loop IO::FDPass::recv
-sub wq_set_recv_fds {
-       return unless $recv_cmd1;
-       my $nfds = pop;
-       my $sub = sub {
-               my ($sock, $fds, undef, $flags) = @_;
-               $recv_cmd1->($sock, $fds, $_[2], $flags, $nfds);
-       };
-       my $self = pop;
-       if (ref $self) {
-               $self->{-wq_recv_cmd} = $sub;
-       } else {
-               $recv_cmd = $sub;
-       }
+sub wq_set_recv_modes {
+       my ($self, @modes) = @_;
+       $self->{-wq_recv_modes} = \@modes;
 }
 
 sub _get_rec ($) {
@@ -125,11 +104,11 @@ sub ipc_worker_spawn {
        pipe(my ($r_req, $w_req)) or die "pipe: $!";
        pipe(my ($r_res, $w_res)) or die "pipe: $!";
        my $sigset = $oldset // PublicInbox::DS::block_signals();
-       my $parent = $$;
        $self->ipc_atfork_prepare;
        defined(my $pid = fork) or die "fork: $!";
        if ($pid == 0) {
                eval { PublicInbox::DS->Reset };
+               delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
                $w_req = $r_res = undef;
                $w_res->autoflush(1);
                $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
@@ -151,7 +130,8 @@ sub ipc_worker_spawn {
 
 sub ipc_worker_reap { # dwaitpid callback
        my ($self, $pid) = @_;
-       warn "PID:$pid died with \$?=$?\n" if $?;
+       # SIGTERM (15) is our default exit signal
+       warn "PID:$pid died with \$?=$?\n" if $? && ($? & 127) != 15;
 }
 
 # for base class, override in sub classes
@@ -257,48 +237,31 @@ sub ipc_sibling_atfork_child {
        $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
-sub _close_recvd ($) {
-       my ($self) = @_;
-       close($_) for (grep { defined } (delete @$self{0..2}));
-}
-
 sub wq_worker_loop ($) {
        my ($self) = @_;
-       my $buf;
        my $len = $self->{wq_req_len} // (4096 * 33);
-       my ($sub, $args);
        my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
-       local $SIG{PIPE} = sub {
-               my $cur_sub = $sub;
-               _close_recvd($self);
-               die(bless(\$cur_sub, __PACKAGE__.'::PIPE')) if $cur_sub;
-       };
-       my $rcv = $self->{-wq_recv_cmd} // $recv_cmd;
        while (1) {
-               my (@fds) = $rcv->($s2, $buf, $len) or return; # EOF
-               my $i = 0;
-               my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]};
+               my @fds = $recv_cmd->($s2, my $buf, $len) or return; # EOF
+               my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]};
+               my $nfd = 0;
                for my $fd (@fds) {
                        my $mode = shift(@m);
                        if (open(my $cmdfh, $mode, $fd)) {
-                               $self->{$i++} = $cmdfh;
+                               $self->{$nfd++} = $cmdfh;
                                $cmdfh->autoflush(1);
                        } else {
-                               die "$$ open($mode$fd) (FD:$i): $!";
+                               die "$$ open($mode$fd) (FD:$nfd): $!";
                        }
                }
                # Sereal dies on truncated data, Storable returns undef
-               $args = thaw($buf) //
+               my $args = thaw($buf) //
                        die "thaw error on buffer of size:".length($buf);
-               eval {
-                       $sub = shift @$args;
-                       eval { $self->$sub(@$args) };
-                       undef $sub; # quiet SIG{PIPE} handler
-                       die $@ if $@;
-               };
-               warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE';
-               # need to close explicitly to avoid warnings after SIGPIPE
-               _close_recvd($self);
+               my $sub = shift @$args;
+               eval { $self->$sub(@$args) };
+               warn "$$ wq_worker: $@" if $@ &&
+                                       ref($@) ne 'PublicInbox::SIGPIPE';
+               delete @$self{0..($nfd-1)};
        }
 }
 
@@ -310,8 +273,8 @@ sub wq_do { # always async
        } else {
                @$self{0..$#$ios} = @$ios;
                eval { $self->$sub(@args) };
-               warn "wq_do: $@" if $@;
-               delete @$self{0..$#$ios};
+               warn "wq_do: $@" if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
+               delete @$self{0..$#$ios}; # don't close
        }
 }
 
@@ -320,8 +283,7 @@ sub _wq_worker_start ($$) {
        my $pid = fork // die "fork: $!";
        if ($pid == 0) {
                eval { PublicInbox::DS->Reset };
-               close(delete $self->{-wq_s1});
-               delete $self->{qw(-wq_workers -wq_ppid)};
+               delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
                $SIG{$_} = 'IGNORE' for (qw(PIPE TTOU TTIN));
                $SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT CHLD));
                local $0 = $self->{-wq_ident};
@@ -343,16 +305,15 @@ sub wq_workers_start {
        my ($self, $ident, $nr_workers, $oldset) = @_;
        ($enc && $send_cmd && $recv_cmd && defined($SEQPACKET)) or return;
        return if $self->{-wq_s1}; # idempotent
-       my ($s1, $s2);
-       socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!";
+       $self->{-wq_s1} = $self->{-wq_s2} = undef;
+       socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, $SEQPACKET, 0) or
+               die "socketpair: $!";
        $self->ipc_atfork_prepare;
        $nr_workers //= 4;
        $nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS;
        my $sigset = $oldset // PublicInbox::DS::block_signals();
        $self->{-wq_workers} = {};
        $self->{-wq_ident} = $ident;
-       $self->{-wq_s1} = $s1;
-       $self->{-wq_s2} = $s2;
        _wq_worker_start($self, $sigset) for (1..$nr_workers);
        PublicInbox::DS::sig_setmask($sigset) unless $oldset;
        $self->{-wq_ppid} = $$;
@@ -414,16 +375,29 @@ sub wq_close {
        my $ppid = delete $self->{-wq_ppid} or return;
        my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
        return if $ppid != $$; # can't reap siblings or parents
+       return (keys %$workers) if wantarray; # caller will reap
        for my $pid (keys %$workers) {
                dwaitpid($pid, \&ipc_worker_reap, $self);
        }
 }
 
+sub wq_kill {
+       my ($self, $sig) = @_;
+       my $workers = $self->{-wq_workers} or return;
+       kill($sig // 'TERM', keys %$workers);
+}
+
 sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS }
 
 sub DESTROY {
-       wq_close($_[0]);
-       ipc_worker_stop($_[0]);
+       my ($self) = @_;
+       my $ppid = $self->{-wq_ppid};
+       wq_kill($self) if $ppid && $ppid == $$;
+       wq_close($self);
+       ipc_worker_stop($self);
 }
 
+# Sereal doesn't have dclone
+sub deep_clone { thaw(freeze($_[-1])) }
+
 1;