fcntl($out_w, 1031, 4096);
fcntl($in_r, 1031, 4096) if $batch eq '--batch-check';
}
+ $out_w->blocking(0);
$self->{$out} = $out_w;
$self->{$in} = $in_r;
}
for (my $i = 0; $i < @$inflight; $i += 3) {
$buf .= "$inflight->[$i]\n";
}
+ $self->{out}->blocking(1); # brand new pipe, should never block
print { $self->{out} } $buf or $self->fail("write error: $!");
+ $self->{out}->blocking(0);
my $req = shift @$inflight;
unshift(@$inflight, \$req); # \$ref to indicate retried
$self->{inflight_c} = [];
}
+sub write_all {
+ my ($self, $out, $buf, $read_step, $inflight) = @_;
+ $read_step->($self, $inflight) while @$inflight >= MAX_INFLIGHT;
+ do {
+ my $w = syswrite($out, $buf);
+ if (defined $w) {
+ return if $w == length($buf);
+ warn "chop: $w";
+ substr($buf, 0, $w, ''); # sv_chop
+ } elsif ($! != EAGAIN) {
+ $self->fail("write: $!");
+ } else { warn "E: $!" }
+ $read_step->($self, $inflight);
+ } while (1);
+}
+
sub check_async ($$$$) {
my ($self, $oid, $cb, $arg) = @_;
my $inflight_c = $self->{inflight_c} // check_async_begin($self);
- while (scalar(@$inflight_c) >= MAX_INFLIGHT) {
- check_async_step($self, $inflight_c);
- }
- print { $self->{out_c} } $oid, "\n" or $self->fail("write error: $!");
+ write_all($self, $self->{out_c}, $oid."\n",
+ \&check_async_step, $inflight_c);
push(@$inflight_c, $oid, $cb, $arg);
}
sub cat_async ($$$;$) {
my ($self, $oid, $cb, $arg) = @_;
my $inflight = $self->{inflight} // cat_async_begin($self);
- while (scalar(@$inflight) >= MAX_INFLIGHT) {
- cat_async_step($self, $inflight);
- }
- print { $self->{out} } $oid, "\n" or $self->fail("write error: $!");
+ write_all($self, $self->{out}, $oid."\n", \&cat_async_step, $inflight);
push(@$inflight, $oid, $cb, $arg);
}