]> Sergey Matveev's repositories - public-inbox.git/commitdiff
processpipe: allow synchronous close to set $?
authorEric Wong <e@80x24.org>
Sat, 2 Jan 2021 09:13:39 +0000 (19:13 -1400)
committerEric Wong <e@80x24.org>
Sat, 2 Jan 2021 22:38:49 +0000 (22:38 +0000)
To get rid of the ugly $PublicInbox::DS::in_loop localization
in MboxReader, we'll distinguish between ->CLOSE and ->DESTROY
with ProcessPipe.

If we end up closing via ->DESTROY, we'll assume the caller will
want to deal with $? asynchronously via the event loop (or not
even care about $?).

If we hit ->CLOSE directly, we'll assume the caller called
close() and wants to check $? synchronously.

Note: wantarray doesn't seem to propagate into tied methods,
otherwise I'd be relying on that.

lib/PublicInbox/MboxReader.pm
lib/PublicInbox/ProcessPipe.pm
lib/PublicInbox/Spawn.pm
t/mbox_reader.t
t/spawn.t

index 1894756de3841cda2d059221cdadd9c741cab5a5..59ce4fb624d57000fcf6a913c9072e927486f483 100644 (file)
@@ -5,7 +5,6 @@
 package PublicInbox::MboxReader;
 use strict;
 use v5.10.1;
-use PublicInbox::DS (); # localize $in_loop for error detection :<
 use Data::Dumper;
 $Data::Dumper::Useqq = 1; # should've been the default, for bad data
 
@@ -14,7 +13,6 @@ my $from_strict =
 
 sub _mbox_from {
        my ($mbfh, $from_re, $eml_cb, @arg) = @_;
-       local $PublicInbox::DS::in_loop; # disable dwaitpid
        my $buf = '';
        my @raw;
        while (defined(my $r = read($mbfh, $buf, 65536, length($buf)))) {
@@ -75,7 +73,6 @@ sub _extract_hdr {
 
 sub _mbox_cl ($$$;@) {
        my ($mbfh, $uxs_from, $eml_cb, @arg) = @_;
-       local $PublicInbox::DS::in_loop; # disable dwaitpid
        my $buf = '';
        while (defined(my $r = read($mbfh, $buf, 65536, length($buf)))) {
                if ($r == 0) { # detect "curl --fail"
index 336d5ac4fa97d18a1b34304486138fe6a5ecea3a..400a22f3bf65a0a6eda398cee101d34c9fcb373a 100644 (file)
@@ -6,10 +6,12 @@ package PublicInbox::ProcessPipe;
 use strict;
 use v5.10.1;
 use PublicInbox::DS qw(dwaitpid);
+use Carp qw(carp);
 
 sub TIEHANDLE {
        my ($class, $pid, $fh, $cb, $arg) = @_;
-       bless { pid => $pid, fh => $fh, cb => $cb, arg => $arg }, $class;
+       bless { pid => $pid, fh => $fh, ppid => $$, cb => $cb, arg => $arg },
+               $class;
 }
 
 sub READ { read($_[0]->{fh}, $_[1], $_[2], $_[3] || 0) }
@@ -26,32 +28,41 @@ sub PRINT {
        print { $self->{fh} } @_;
 }
 
-sub adjust_ret { # dwaitpid callback
-       my ($retref, $pid) = @_;
-       $$retref = '' if $?
-}
+sub FILENO { fileno($_[0]->{fh}) }
 
-sub CLOSE {
-       my $fh = delete($_[0]->{fh});
-       my $ret = defined $fh ? close($fh) : '';
-       my ($pid, $cb, $arg) = delete @{$_[0]}{qw(pid cb arg)};
-       if (defined $pid) {
-               unless ($cb) {
-                       $cb = \&adjust_ret;
-                       $arg = \$ret;
+sub _close ($;$) {
+       my ($self, $wait) = @_;
+       my $fh = delete $self->{fh};
+       my $ret = defined($fh) ? close($fh) : '';
+       my ($pid, $cb, $arg) = delete @$self{qw(pid cb arg)};
+       return $ret unless defined($pid) && $self->{ppid} == $$;
+       if ($wait) { # caller cares about the exit status:
+               my $wp = waitpid($pid, 0);
+               if ($wp == $pid) {
+                       $ret = '' if $?;
+                       if ($cb) {
+                               eval { $cb->($arg, $pid) };
+                               carp "E: cb(arg, $pid): $@" if $@;
+                       }
+               } else {
+                       carp "waitpid($pid, 0) = $wp, \$!=$!, \$?=$?";
                }
+       } else { # caller just undef-ed it, let event loop deal with it
                dwaitpid $pid, $cb, $arg;
        }
        $ret;
 }
 
-sub FILENO { fileno($_[0]->{fh}) }
+# if caller uses close(), assume they want to check $? immediately so
+# we'll waitpid() synchronously.  n.b. wantarray doesn't seem to
+# propagate `undef' down to tied methods, otherwise I'd rely on that.
+sub CLOSE { _close($_[0], 1) }
 
+# if relying on DESTROY, assume the caller doesn't care about $? and
+# we can let the event loop call waitpid() whenever it gets SIGCHLD
 sub DESTROY {
-       CLOSE(@_);
+       _close($_[0]);
        undef;
 }
 
-sub pid { $_[0]->{pid} }
-
 1;
index 1ee40503c33f0b014bb6814295d890e3335133e2..762a0549bba4e95b5d62693767ab499117ecde93 100644 (file)
@@ -295,14 +295,14 @@ sub spawn ($;$$) {
 }
 
 sub popen_rd {
-       my ($cmd, $env, $opts) = @_;
+       my ($cmd, $env, $opt) = @_;
        pipe(my ($r, $w)) or die "pipe: $!\n";
-       $opts ||= {};
-       $opts->{1} = fileno($w);
-       my $pid = spawn($cmd, $env, $opts);
+       $opt ||= {};
+       $opt->{1} = fileno($w);
+       my $pid = spawn($cmd, $env, $opt);
        return ($r, $pid) if wantarray;
        my $ret = gensym;
-       tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r;
+       tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r, @$opt{qw(cb arg)};
        $ret;
 }
 
index 53458ec2fb2095763c5f2c0e6301d3d16eb037b4..4ea2ae293a8ec8e4381bf9fd0ba91d8715c66644 100644 (file)
@@ -72,4 +72,21 @@ for my $fmt (@mbox) { $check_fmt->($fmt) }
 s/\n/\r\n/sg for (values %raw);
 for my $fmt (@mbox) { $check_fmt->($fmt) }
 
+SKIP: {
+       use PublicInbox::Spawn qw(popen_rd);
+       use Time::HiRes qw(alarm);
+       my $fh = popen_rd([ $^X, '-E', <<'' ]);
+say "From x@y Fri Oct  2 00:00:00 1993";
+print "a: b\n\n", "x" x 70000, "\n\n";
+say "From x@y Fri Oct  2 00:00:00 2010";
+print "Final: bit\n\n", "Incomplete\n\n";
+exit 1
+
+       my @x;
+       eval { $reader->mboxrd($fh, sub { push @x, shift->as_string }) };
+       like($@, qr/error closing mbox/, 'detects error reading from pipe');
+       is(scalar(@x), 1, 'only saw one message');
+       is(scalar(grep(/Final/, @x)), 0, 'no incomplete bit');
+}
+
 done_testing;
index 552bba3302e8c969610b9759e6addaa07068dae1..d97e13a6aa0fd60968c138c991fe3a6cfe8e8cac 100644 (file)
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -98,6 +98,44 @@ EOF
        isnt($?, 0, '$? set properly: '.$?);
 }
 
+{ # ->CLOSE vs ->DESTROY waitpid caller distinction
+       my @c;
+       my $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } });
+       ok(close($fh), '->CLOSE fired and successful');
+       ok(scalar(@c), 'callback fired by ->CLOSE');
+       ok(grep(!m[/PublicInbox/DS\.pm\z], @c), 'callback not invoked by DS');
+
+       @c = ();
+       $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } });
+       undef $fh; # ->DESTROY
+       ok(scalar(@c), 'callback fired by ->DESTROY');
+       ok(grep(!m[/PublicInbox/ProcessPipe\.pm\z], @c),
+               'callback not invoked by ProcessPipe');
+}
+
+{ # children don't wait on siblings
+       use POSIX qw(_exit);
+       pipe(my ($r, $w)) or BAIL_OUT $!;
+       my $cb = sub { warn "x=$$\n" };
+       my $fh = popen_rd(['cat'], undef, { 0 => $r, cb => $cb });
+       my $pp = tied *$fh;
+       my $pid = fork // BAIL_OUT $!;
+       local $SIG{__WARN__} = sub { _exit(1) };
+       if ($pid == 0) {
+               local $SIG{__DIE__} = sub { _exit(2) };
+               undef $fh;
+               _exit(0);
+       }
+       waitpid($pid, 0);
+       is($?, 0, 'forked process exited');
+       my @w;
+       local $SIG{__WARN__} = sub { push @w, @_ };
+       close $w;
+       close $fh;
+       is($?, 0, 'cat exited');
+       is_deeply(\@w, [ "x=$$\n" ], 'callback fired from owner');
+}
+
 SKIP: {
        eval {
                require BSD::Resource;