X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FIPC.pm;h=e6a1082c71ba02470b98a551bd737bb1b0ad3806;hb=4e54b398ad511e5177ae2cc2243eba9408f840a8;hp=b0a0bfb5460f67962ea09ce5824c8916a34a3a06;hpb=6cc0e6870cb4950c08646769f2a7e30729b7d409;p=public-inbox.git diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index b0a0bfb5..e6a1082c 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -12,6 +12,7 @@ use POSIX qw(WNOHANG); use Socket qw(AF_UNIX MSG_EOR); my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough? use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF(); +my $WQ_MAX_WORKERS = 4096; my ($enc, $dec); # ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+ # and eliminate method call overhead @@ -36,17 +37,39 @@ if ($enc && $dec) { # should be custom ops } // warn("Storable (part of Perl) missing: $@\n"); } +my $recv_cmd1; # PublicInbox::CmdIPC1::recv_cmd1; my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4'); my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do { require PublicInbox::CmdIPC4; $recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4'); PublicInbox::CmdIPC4->can('send_cmd4'); } // do { + # IO::FDPass only allows sending a single FD at-a-time, which + # means we can't guarantee all packets end up on the same worker, + # so we cap WQ_MAX_WORKERS require PublicInbox::CmdIPC1; - $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1'); + $recv_cmd1 = PublicInbox::CmdIPC1->can('recv_cmd1'); + $WQ_MAX_WORKERS = 1 if $recv_cmd1; + wq_set_recv_fds(3); PublicInbox::CmdIPC1->can('send_cmd1'); }; +# needed to tell recv_cmd1 how many times to loop IO::FDPass::recv +sub wq_set_recv_fds { + return unless $recv_cmd1; + my $nfds = pop; + my $sub = sub { + my ($sock, $fds, undef, $flags) = @_; + $recv_cmd1->($sock, $fds, $_[2], $flags, $nfds); + }; + my $self = pop; + if (ref $self) { + $self->{-wq_recv_cmd} = $sub; + } else { + $recv_cmd = $sub; + } +} + sub _get_rec ($) { my ($r) = @_; defined(my $len = <$r>) or return; @@ -237,8 +260,9 @@ sub wq_worker_loop ($) { local $SIG{PIPE} = sub { die(bless(\"$_[0]", __PACKAGE__.'::PIPE')) if $sub; }; + my $rcv = $self->{-wq_recv_cmd} // $recv_cmd; until ($self->{-wq_quit}) { - my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF + my (@fds) = $rcv->($s2, $buf, $len) or return; # EOF my $i = 0; my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]}; for my $fd (@fds) { @@ -305,6 +329,7 @@ sub wq_workers_start { socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!"; $self->ipc_atfork_parent; $nr_workers //= 4; + $nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS; my $sigset = $oldset // PublicInbox::DS::block_signals(); $self->{-wq_workers} = {}; $self->{-wq_ident} = $ident; @@ -318,6 +343,7 @@ sub wq_workers_start { sub wq_worker_incr { # SIGTTIN handler my ($self, $oldset) = @_; $self->{-wq_s2} or return; + return if wq_workers($self) >= $WQ_MAX_WORKERS; $self->ipc_atfork_parent; my $sigset = $oldset // PublicInbox::DS::block_signals(); _wq_worker_start($self, $sigset); @@ -331,7 +357,7 @@ sub wq_exit { # wakes up wq_worker_decr_wait sub wq_worker_decr { # SIGTTOU handler, kills first idle worker my ($self) = @_; - my $workers = $self->{-wq_workers} or return; + return unless wq_workers($self); my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2'; $self->wq_do('wq_exit', [ $s2, $s2, $s2 ]); $self->{-wq_exit_pending}++; @@ -377,6 +403,8 @@ sub wq_close { } } +sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS } + sub DESTROY { wq_close($_[0]); ipc_worker_stop($_[0]);