]> Sergey Matveev's repositories - public-inbox.git/commitdiff
ipc: wq: handle >MAX_ARG_STRLEN && <EMSGSIZE case
authorEric Wong <e@80x24.org>
Tue, 25 May 2021 22:20:01 +0000 (22:20 +0000)
committerEric Wong <e@80x24.org>
Tue, 25 May 2021 23:05:02 +0000 (23:05 +0000)
WQWorkers are limited roughly to MAX_ARG_STRLEN (the kernel
limit of argv + environ) to avoid excessive memory growth.
Occasionally, we need to send larger messages via workqueues
that are too small to hit EMSGSIZE on the sender.

This fixes "lei q" when using HTTP(S) externals, since that
code path sends large Eml objects from lei_xsearch workers
directly to lei2mail WQ workers.

lib/PublicInbox/IPC.pm
lib/PublicInbox/WQWorker.pm
t/ipc.t

index 24237773903373ca63f3b0783da537b678a841b8..497a6035aa747704f189f2c740d91cd26174c5fa 100644 (file)
@@ -17,6 +17,7 @@ use PublicInbox::Spawn;
 use PublicInbox::OnDestroy;
 use PublicInbox::WQWorker;
 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
+my $MY_MAX_ARG_STRLEN = 4096 * 33; # extra 4K for serialization
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
 our @EXPORT_OK = qw(ipc_freeze ipc_thaw);
 my $WQ_MAX_WORKERS = 4096;
@@ -213,7 +214,7 @@ sub ipc_sibling_atfork_child {
 
 sub recv_and_run {
        my ($self, $s2, $len, $full_stream) = @_;
-       my @fds = $recv_cmd->($s2, my $buf, $len);
+       my @fds = $recv_cmd->($s2, my $buf, $len // $MY_MAX_ARG_STRLEN);
        return if scalar(@fds) && !defined($fds[0]);
        my $n = length($buf) or return 0;
        my $nfd = 0;
@@ -268,27 +269,37 @@ sub wq_broadcast {
        }
 }
 
+sub stream_in_full ($$$) {
+       my ($s1, $fds, $buf) = @_;
+       socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
+               croak "socketpair: $!";
+       my $n = $send_cmd->($s1, [ fileno($r) ],
+                       ipc_freeze(['do_sock_stream', length($buf)]),
+                       MSG_EOR) // croak "sendmsg: $!";
+       undef $r;
+       $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
+       while ($n < length($buf)) {
+               my $x = syswrite($w, $buf, length($buf) - $n, $n) //
+                               croak "syswrite: $!";
+               croak "syswrite wrote 0 bytes" if $x == 0;
+               $n += $x;
+       }
+}
+
 sub wq_io_do { # always async
        my ($self, $sub, $ios, @args) = @_;
        if (my $s1 = $self->{-wq_s1}) { # run in worker
                my $fds = [ map { fileno($_) } @$ios ];
                my $buf = ipc_freeze([$sub, @args]);
-               my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR);
-               return if defined($n); # likely
-               croak "sendmsg: $! (check RLIMIT_NOFILE)" if $!{ETOOMANYREFS};
-               croak "sendmsg: $!" if !$!{EMSGSIZE};
-               socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
-                       croak "socketpair: $!";
-               $n = $send_cmd->($s1, [ fileno($r) ],
-                               ipc_freeze(['do_sock_stream', length($buf)]),
-                               MSG_EOR) // croak "sendmsg: $!";
-               undef $r;
-               $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
-               while ($n < length($buf)) {
-                       my $x = syswrite($w, $buf, length($buf) - $n, $n) //
-                                       croak "syswrite: $!";
-                       croak "syswrite wrote 0 bytes" if $x == 0;
-                       $n += $x;
+               if (length($buf) > $MY_MAX_ARG_STRLEN) {
+                       stream_in_full($s1, $fds, $buf);
+               } else {
+                       my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR);
+                       return if defined($n); # likely
+                       $!{ETOOMANYREFS} and
+                               croak "sendmsg: $! (check RLIMIT_NOFILE)";
+                       $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
+                       croak("sendmsg: $!");
                }
        } else {
                @$self{0..$#$ios} = @$ios;
index 3636321e901eab5b365922331ee819767d8777b8..f7aa61c555705d2c0c476f8526844ea28830b134 100644 (file)
@@ -23,7 +23,7 @@ sub event_step {
        my ($self) = @_;
        my $n;
        do {
-               $n = $self->{wq}->recv_and_run($self->{sock}, 4096 * 33);
+               $n = $self->{wq}->recv_and_run($self->{sock});
        } while ($n);
        return if !defined($n) && $! == EAGAIN; # likely
        warn "wq worker error: $!\n" if !defined($n) && $! != ECONNRESET;
diff --git a/t/ipc.t b/t/ipc.t
index ca88eb59d4ed8fe4dc16fcd2d24ea7c973e060c6..7983fdc02ef1bcffcd479e860b2c92dd0c7c8dba 100644 (file)
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -122,11 +122,16 @@ for my $t ('local', 'worker', 'worker again') {
        $ipc->wq_io_do('test_sha', [ $wa, $wb ], 'hello world');
        is(readline($rb), sha1_hex('hello world')."\n", "SHA small ($t)");
        {
-               my $bigger = $big x 10;
+               my $bigger = $big x 10; # to hit EMSGSIZE
                $ipc->wq_io_do('test_sha', [ $wa, $wb ], $bigger);
                my $exp = sha1_hex($bigger)."\n";
-               undef $bigger;
-               is(readline($rb), $exp, "SHA big ($t)");
+               is(readline($rb), $exp, "SHA big for EMSGSIZE ($t)");
+
+               # to hit the WQWorker recv_and_run length
+               substr($bigger, my $MY_MAX_ARG_STRLEN = 4096 * 33, -1) = '';
+               $ipc->wq_io_do('test_sha', [ $wa, $wb ], $bigger);
+               $exp = sha1_hex($bigger)."\n";
+               is(readline($rb), $exp, "SHA WQWorker limit ($t)");
        }
        my $ppid = $ipc->wq_workers_start('wq', 1);
        push(@ppids, $ppid);