]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/IPC.pm
lei q: parallelize Maildir and mbox writing
[public-inbox.git] / lib / PublicInbox / IPC.pm
index fbc91f6f96ea013252721c2c3127f70a6213d979..8fec2e62a4373f4b81507403ba64245e623808b5 100644 (file)
@@ -104,11 +104,11 @@ sub ipc_worker_spawn {
        pipe(my ($r_req, $w_req)) or die "pipe: $!";
        pipe(my ($r_res, $w_res)) or die "pipe: $!";
        my $sigset = $oldset // PublicInbox::DS::block_signals();
-       my $parent = $$;
        $self->ipc_atfork_prepare;
        defined(my $pid = fork) or die "fork: $!";
        if ($pid == 0) {
                eval { PublicInbox::DS->Reset };
+               delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
                $w_req = $r_res = undef;
                $w_res->autoflush(1);
                $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
@@ -283,8 +283,7 @@ sub _wq_worker_start ($$) {
        my $pid = fork // die "fork: $!";
        if ($pid == 0) {
                eval { PublicInbox::DS->Reset };
-               close(delete $self->{-wq_s1});
-               delete $self->{qw(-wq_workers -wq_ppid)};
+               delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
                $SIG{$_} = 'IGNORE' for (qw(PIPE TTOU TTIN));
                $SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT CHLD));
                local $0 = $self->{-wq_ident};
@@ -306,16 +305,15 @@ sub wq_workers_start {
        my ($self, $ident, $nr_workers, $oldset) = @_;
        ($enc && $send_cmd && $recv_cmd && defined($SEQPACKET)) or return;
        return if $self->{-wq_s1}; # idempotent
-       my ($s1, $s2);
-       socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!";
+       $self->{-wq_s1} = $self->{-wq_s2} = undef;
+       socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, $SEQPACKET, 0) or
+               die "socketpair: $!";
        $self->ipc_atfork_prepare;
        $nr_workers //= 4;
        $nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS;
        my $sigset = $oldset // PublicInbox::DS::block_signals();
        $self->{-wq_workers} = {};
        $self->{-wq_ident} = $ident;
-       $self->{-wq_s1} = $s1;
-       $self->{-wq_s2} = $s2;
        _wq_worker_start($self, $sigset) for (1..$nr_workers);
        PublicInbox::DS::sig_setmask($sigset) unless $oldset;
        $self->{-wq_ppid} = $$;
@@ -377,6 +375,7 @@ sub wq_close {
        my $ppid = delete $self->{-wq_ppid} or return;
        my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
        return if $ppid != $$; # can't reap siblings or parents
+       return (keys %$workers) if wantarray; # caller will reap
        for my $pid (keys %$workers) {
                dwaitpid($pid, \&ipc_worker_reap, $self);
        }
@@ -391,9 +390,14 @@ sub wq_kill {
 sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS }
 
 sub DESTROY {
-       wq_kill($_[0]);
-       wq_close($_[0]);
-       ipc_worker_stop($_[0]);
+       my ($self) = @_;
+       my $ppid = $self->{-wq_ppid};
+       wq_kill($self) if $ppid && $ppid == $$;
+       wq_close($self);
+       ipc_worker_stop($self);
 }
 
+# Sereal doesn't have dclone
+sub deep_clone { thaw(freeze($_[-1])) }
+
 1;