use bytes;
use POSIX ();
use IO::Handle qw();
-use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD);
+use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD SEEK_SET);
use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
use parent qw(Exporter);
-our @EXPORT_OK = qw(now msg_more);
+our @EXPORT_OK = qw(now msg_more write_in_full);
use warnings;
use PublicInbox::Syscall qw(:epoll);
use fields ('sock', # underlying socket
- 'wbuf', # arrayref of scalars, scalarrefs, or coderefs to write
+ 'wbuf', # arrayref of coderefs or GLOB refs
'wbuf_off', # offset into first element of wbuf to start writing at
'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.)
);
use Errno qw(EAGAIN EINVAL);
use Carp qw(croak confess);
+use File::Temp qw(tempfile);
use constant POLLIN => 1;
use constant POLLOUT => 4;
return 0;
}
+# portable, non-thread-safe sendfile emulation (no pread, yet)
+sub psendfile ($$$) {
+ my ($sock, $fh, $off) = @_;
+
+ sysseek($fh, $$off, SEEK_SET) or return;
+ defined(my $to_write = sysread($fh, my $buf, 16384)) or return;
+ my $written = 0;
+ while ($to_write > 0) {
+ if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) {
+ $written += $w;
+ $to_write -= $w;
+ } else {
+ return if $written == 0;
+ last;
+ }
+ }
+ $$off += $written;
+ $written;
+}
+
# returns 1 if done, 0 if incomplete
sub flush_write ($) {
my ($self) = @_;
my $wbuf = $self->{wbuf} or return 1;
my $sock = $self->{sock} or return 1;
+next_buf:
while (my $bref = $wbuf->[0]) {
- my $ref = ref($bref);
- if ($ref eq 'SCALAR') {
- my $len = bytes::length($$bref);
- my $off = $self->{wbuf_off} || 0;
- my $to_write = $len - $off;
- my $written = syswrite($sock, $$bref, $to_write, $off);
- if (defined $written) {
- if ($written == $to_write) {
- shift @$wbuf;
+ if (ref($bref) ne 'CODE') {
+ my $off = delete($self->{wbuf_off}) // 0;
+ while (1) {
+ my $w = psendfile($sock, $bref, \$off);
+ if (defined $w) {
+ if ($w == 0) {
+ shift @$wbuf;
+ goto next_buf;
+ }
+ } elsif ($! == EAGAIN) {
+ $self->{wbuf_off} = $off;
+ watch_write($self, 1);
+ return 0;
} else {
- $self->{wbuf_off} = $off + $written;
+ return $self->close;
}
- next; # keep going until EAGAIN
- } elsif ($! == EAGAIN) {
- $self->watch_write(1);
- } else {
- $self->close;
}
- return 0;
} else { #($ref eq 'CODE') {
shift @$wbuf;
$bref->();
1; # all done
}
+sub write_in_full ($$$$) {
+ my ($fh, $bref, $len, $off) = @_;
+ my $rv = 0;
+ while ($len > 0) {
+ my $w = syswrite($fh, $$bref, $len, $off);
+ return ($rv ? $rv : $w) unless $w; # undef or 0
+ $rv += $w;
+ $len -= $w;
+ $off += $w;
+ }
+ $rv
+}
+
+sub tmpbuf ($$) {
+ my ($bref, $off) = @_;
+ # open(my $fh, '+>>', undef) doesn't set O_APPEND
+ my ($fh, $path) = tempfile('wbuf-XXXXXXX', TMPDIR => 1);
+ open $fh, '+>>', $path or die "open: $!";
+ unlink $path;
+ my $to_write = bytes::length($$bref) - $off;
+ my $w = write_in_full($fh, $bref, $to_write, $off);
+ die "write_in_full ($to_write): $!" unless defined $w;
+ $w == $to_write ? $fh : die("short write $w < $to_write");
+}
+
=head2 C<< $obj->write( $data ) >>
Write the specified data to the underlying handle. I<data> may be scalar,
my $ref = ref $data;
my $bref = $ref ? $data : \$data;
if (my $wbuf = $self->{wbuf}) { # already buffering, can't write more...
- push @$wbuf, $bref;
+ if ($ref eq 'CODE') {
+ push @$wbuf, $bref;
+ } else {
+ my $last = $wbuf->[-1];
+ if (ref($last) eq 'GLOB') { # append to tmp file buffer
+ write_in_full($last, $bref, bytes::length($$bref), 0);
+ } else {
+ push @$wbuf, tmpbuf($bref, 0);
+ }
+ }
return 0;
} elsif ($ref eq 'CODE') {
$bref->();
if (defined $written) {
return 1 if $written == $to_write;
- $self->{wbuf_off} = $written;
- $self->{wbuf} = [ $bref ];
- return flush_write($self); # try until EAGAIN
} elsif ($! == EAGAIN) {
- $self->{wbuf} = [ $bref ];
- $self->watch_write(1);
+ $written = 0;
} else {
- $self->close;
+ return $self->close;
}
+ $self->{wbuf} = [ tmpbuf($bref, $written) ];
+ watch_write($self, 1);
return 0;
}
}
my $nlen = bytes::length($_[1]) - $n;
return 1 if $nlen == 0; # all done!
- # PublicInbox::DS::write queues the unwritten substring:
- return $self->write(substr($_[1], $n, $nlen));
+ # queue up the unwritten substring:
+ $self->{wbuf} = [ tmpbuf(\($_[1]), $n) ];
+ watch_write($self, 1);
+ return 0;
}
}
$self->write(\($_[1]));
use HTTP::Date qw(time2str);
use IO::Handle;
require PublicInbox::EvCleanup;
-PublicInbox::DS->import('msg_more');
+PublicInbox::DS->import(qw(msg_more write_in_full));
use constant {
CHUNK_START => -1, # [a-f0-9]+\r\n
CHUNK_END => -2, # \r\n
while ($len > 0) {
if ($$rbuf ne '') {
- my $w = write_in_full($input, $rbuf, $len);
+ my $w = write_in_full($input, $rbuf, $len, 0);
return write_err($self, $len) unless $w;
$len -= $w;
die "BUG: $len < 0 (w=$w)" if $len < 0;
quit($self, 500);
}
-sub write_in_full {
- my ($fh, $rbuf, $len) = @_;
- my $rv = 0;
- my $off = 0;
- while ($len > 0) {
- my $w = syswrite($fh, $$rbuf, $len, $off);
- return ($rv ? $rv : $w) unless $w; # undef or 0
- $rv += $w;
- $off += $w;
- $len -= $w;
- }
- $rv
-}
-
sub read_input_chunked { # unlikely...
my ($self) = @_;
my $input = $self->{env}->{'psgi.input'};
# drain the current chunk
until ($len <= 0) {
if ($$rbuf ne '') {
- my $w = write_in_full($input, $rbuf, $len);
+ my $w = write_in_full($input, $rbuf, $len, 0);
return write_err($self, "$len chunk") if !$w;
$len -= $w;
if ($len == 0) {