package PublicInbox::MboxReader;
use strict;
use v5.10.1;
-use PublicInbox::DS (); # localize $in_loop for error detection :<
use Data::Dumper;
$Data::Dumper::Useqq = 1; # should've been the default, for bad data
sub _mbox_from {
my ($mbfh, $from_re, $eml_cb, @arg) = @_;
- local $PublicInbox::DS::in_loop; # disable dwaitpid
my $buf = '';
my @raw;
while (defined(my $r = read($mbfh, $buf, 65536, length($buf)))) {
sub _mbox_cl ($$$;@) {
my ($mbfh, $uxs_from, $eml_cb, @arg) = @_;
- local $PublicInbox::DS::in_loop; # disable dwaitpid
my $buf = '';
while (defined(my $r = read($mbfh, $buf, 65536, length($buf)))) {
if ($r == 0) { # detect "curl --fail"
use strict;
use v5.10.1;
use PublicInbox::DS qw(dwaitpid);
+use Carp qw(carp);
sub TIEHANDLE {
my ($class, $pid, $fh, $cb, $arg) = @_;
- bless { pid => $pid, fh => $fh, cb => $cb, arg => $arg }, $class;
+ bless { pid => $pid, fh => $fh, ppid => $$, cb => $cb, arg => $arg },
+ $class;
}
sub READ { read($_[0]->{fh}, $_[1], $_[2], $_[3] || 0) }
print { $self->{fh} } @_;
}
-sub adjust_ret { # dwaitpid callback
- my ($retref, $pid) = @_;
- $$retref = '' if $?
-}
+sub FILENO { fileno($_[0]->{fh}) }
-sub CLOSE {
- my $fh = delete($_[0]->{fh});
- my $ret = defined $fh ? close($fh) : '';
- my ($pid, $cb, $arg) = delete @{$_[0]}{qw(pid cb arg)};
- if (defined $pid) {
- unless ($cb) {
- $cb = \&adjust_ret;
- $arg = \$ret;
+sub _close ($;$) {
+ my ($self, $wait) = @_;
+ my $fh = delete $self->{fh};
+ my $ret = defined($fh) ? close($fh) : '';
+ my ($pid, $cb, $arg) = delete @$self{qw(pid cb arg)};
+ return $ret unless defined($pid) && $self->{ppid} == $$;
+ if ($wait) { # caller cares about the exit status:
+ my $wp = waitpid($pid, 0);
+ if ($wp == $pid) {
+ $ret = '' if $?;
+ if ($cb) {
+ eval { $cb->($arg, $pid) };
+ carp "E: cb(arg, $pid): $@" if $@;
+ }
+ } else {
+ carp "waitpid($pid, 0) = $wp, \$!=$!, \$?=$?";
}
+ } else { # caller just undef-ed it, let event loop deal with it
dwaitpid $pid, $cb, $arg;
}
$ret;
}
-sub FILENO { fileno($_[0]->{fh}) }
+# if caller uses close(), assume they want to check $? immediately so
+# we'll waitpid() synchronously. n.b. wantarray doesn't seem to
+# propagate `undef' down to tied methods, otherwise I'd rely on that.
+sub CLOSE { _close($_[0], 1) }
+# if relying on DESTROY, assume the caller doesn't care about $? and
+# we can let the event loop call waitpid() whenever it gets SIGCHLD
sub DESTROY {
- CLOSE(@_);
+ _close($_[0]);
undef;
}
-sub pid { $_[0]->{pid} }
-
1;
}
sub popen_rd {
- my ($cmd, $env, $opts) = @_;
+ my ($cmd, $env, $opt) = @_;
pipe(my ($r, $w)) or die "pipe: $!\n";
- $opts ||= {};
- $opts->{1} = fileno($w);
- my $pid = spawn($cmd, $env, $opts);
+ $opt ||= {};
+ $opt->{1} = fileno($w);
+ my $pid = spawn($cmd, $env, $opt);
return ($r, $pid) if wantarray;
my $ret = gensym;
- tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r;
+ tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r, @$opt{qw(cb arg)};
$ret;
}
s/\n/\r\n/sg for (values %raw);
for my $fmt (@mbox) { $check_fmt->($fmt) }
+SKIP: {
+ use PublicInbox::Spawn qw(popen_rd);
+ use Time::HiRes qw(alarm);
+ my $fh = popen_rd([ $^X, '-E', <<'' ]);
+say "From x@y Fri Oct 2 00:00:00 1993";
+print "a: b\n\n", "x" x 70000, "\n\n";
+say "From x@y Fri Oct 2 00:00:00 2010";
+print "Final: bit\n\n", "Incomplete\n\n";
+exit 1
+
+ my @x;
+ eval { $reader->mboxrd($fh, sub { push @x, shift->as_string }) };
+ like($@, qr/error closing mbox/, 'detects error reading from pipe');
+ is(scalar(@x), 1, 'only saw one message');
+ is(scalar(grep(/Final/, @x)), 0, 'no incomplete bit');
+}
+
done_testing;
isnt($?, 0, '$? set properly: '.$?);
}
+{ # ->CLOSE vs ->DESTROY waitpid caller distinction
+ my @c;
+ my $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } });
+ ok(close($fh), '->CLOSE fired and successful');
+ ok(scalar(@c), 'callback fired by ->CLOSE');
+ ok(grep(!m[/PublicInbox/DS\.pm\z], @c), 'callback not invoked by DS');
+
+ @c = ();
+ $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } });
+ undef $fh; # ->DESTROY
+ ok(scalar(@c), 'callback fired by ->DESTROY');
+ ok(grep(!m[/PublicInbox/ProcessPipe\.pm\z], @c),
+ 'callback not invoked by ProcessPipe');
+}
+
+{ # children don't wait on siblings
+ use POSIX qw(_exit);
+ pipe(my ($r, $w)) or BAIL_OUT $!;
+ my $cb = sub { warn "x=$$\n" };
+ my $fh = popen_rd(['cat'], undef, { 0 => $r, cb => $cb });
+ my $pp = tied *$fh;
+ my $pid = fork // BAIL_OUT $!;
+ local $SIG{__WARN__} = sub { _exit(1) };
+ if ($pid == 0) {
+ local $SIG{__DIE__} = sub { _exit(2) };
+ undef $fh;
+ _exit(0);
+ }
+ waitpid($pid, 0);
+ is($?, 0, 'forked process exited');
+ my @w;
+ local $SIG{__WARN__} = sub { push @w, @_ };
+ close $w;
+ close $fh;
+ is($?, 0, 'cat exited');
+ is_deeply(\@w, [ "x=$$\n" ], 'callback fired from owner');
+}
+
SKIP: {
eval {
require BSD::Resource;