use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD SEEK_SET);
use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
use parent qw(Exporter);
-our @EXPORT_OK = qw(now msg_more write_in_full);
+our @EXPORT_OK = qw(now msg_more);
use warnings;
use 5.010_001;
);
use Errno qw(EAGAIN EINVAL);
-use Carp qw(croak confess);
+use Carp qw(croak confess carp);
use File::Temp qw(tempfile);
our $HAVE_KQUEUE = eval { require IO::KQueue; IO::KQueue->import; 1 };
}
return unless PostEventLoop();
}
- exit 0;
}
### The kqueue-based event loop. Gets installed as EventLoop if IO::KQueue works
}
return unless PostEventLoop();
}
-
- exit(0);
}
=head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >>
sub psendfile ($$$) {
my ($sock, $fh, $off) = @_;
- sysseek($fh, $$off, SEEK_SET) or return;
- defined(my $to_write = sysread($fh, my $buf, 16384)) or return;
+ seek($fh, $$off, SEEK_SET) or return;
+ defined(my $to_write = read($fh, my $buf, 16384)) or return;
my $written = 0;
while ($to_write > 0) {
if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) {
}
} else { #($ref eq 'CODE') {
shift @$wbuf;
+ my $before = scalar(@$wbuf);
$bref->($self);
+
+ # bref may be enqueueing more CODE to call (see accept_tls_step)
+ return 0 if (scalar(@$wbuf) > $before);
}
} # while @$wbuf
1; # all done
}
-sub write_in_full ($$$$) {
- my ($fh, $bref, $len, $off) = @_;
- my $rv = 0;
- while ($len > 0) {
- my $w = syswrite($fh, $$bref, $len, $off);
- return ($rv ? $rv : $w) unless $w; # undef or 0
- $rv += $w;
- $len -= $w;
- $off += $w;
+sub do_read ($$$$) {
+ my ($self, $rbuf, $len, $off) = @_;
+ my $r = sysread($self->{sock}, $$rbuf, $len, $off);
+ return ($r == 0 ? $self->close : $r) if defined $r;
+ # common for clients to break connections without warning,
+ # would be too noisy to log here:
+ if (ref($self) eq 'IO::Socket::SSL') {
+ my $ev = PublicInbox::TLS::epollbit() or return $self->close;
+ watch($self, $ev | EPOLLONESHOT);
+ } elsif ($! == EAGAIN) {
+ watch($self, EPOLLIN | EPOLLONESHOT);
+ } else {
+ $self->close;
}
- $rv
}
-sub tmpbuf ($$) {
- my ($bref, $off) = @_;
+# drop the socket if we hit unrecoverable errors on our system which
+# require BOFH attention: ENOSPC, EFBIG, EIO, EMFILE, ENFILE...
+sub drop {
+ my $self = shift;
+ carp(@_);
+ $self->close;
+}
+
+# n.b.: use ->write/->read for this buffer to allow compatibility with
+# PerlIO::mmap or PerlIO::scalar if needed
+sub tmpio ($$$) {
+ my ($self, $bref, $off) = @_;
# open(my $fh, '+>>', undef) doesn't set O_APPEND
- my ($fh, $path) = tempfile('wbuf-XXXXXXX', TMPDIR => 1);
- open $fh, '+>>', $path or die "open: $!";
- unlink $path;
- my $to_write = bytes::length($$bref) - $off;
- my $w = write_in_full($fh, $bref, $to_write, $off);
- die "write_in_full ($to_write): $!" unless defined $w;
- $w == $to_write ? $fh : die("short write $w < $to_write");
+ my ($fh, $path) = eval { tempfile('wbuf-XXXXXXX', TMPDIR => 1) };
+ $fh or return drop($self, "tempfile: $@");
+ open($fh, '+>>', $path) or return drop($self, "open: $!");
+ $fh->autoflush(1);
+ unlink($path) or return drop($self, "unlink: $!");
+ my $len = bytes::length($$bref) - $off;
+ $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!");
+ $fh
}
=head2 C<< $obj->write( $data ) >>
} else {
my $last = $wbuf->[-1];
if (ref($last) eq 'GLOB') { # append to tmp file buffer
- write_in_full($last, $bref, bytes::length($$bref), 0);
+ $last->print($$bref) or return drop($self, "print: $!");
} else {
- push @$wbuf, tmpbuf($bref, 0);
+ my $tmpio = tmpio($self, $bref, 0) or return 0;
+ push @$wbuf, $tmpio;
}
}
return 0;
} else {
return $self->close;
}
- $self->{wbuf} = [ tmpbuf($bref, $written) ];
+ my $tmpio = tmpio($self, $bref, $written) or return 0;
+ $self->{wbuf} = [ $tmpio ];
watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
}
my $self = $_[0];
my $sock = $self->{sock} or return 1;
- if (MSG_MORE && !$self->{wbuf}) {
+ if (MSG_MORE && !$self->{wbuf} && ref($sock) ne 'IO::Socket::SSL') {
my $n = send($sock, $_[1], MSG_MORE);
if (defined $n) {
my $nlen = bytes::length($_[1]) - $n;
return 1 if $nlen == 0; # all done!
-
# queue up the unwritten substring:
- $self->{wbuf} = [ tmpbuf(\($_[1]), $n) ];
+ my $tmpio = tmpio($self, \($_[1]), $n) or return 0;
+ $self->{wbuf} = [ $tmpio ];
watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
}
$KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev));
$KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev));
}
+ 0;
}
sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
+# return true if complete, false if incomplete (or failure)
+sub accept_tls_step ($) {
+ my ($self) = @_;
+ my $sock = $self->{sock} or return;
+ return 1 if $sock->accept_SSL;
+ return $self->close if $! != EAGAIN;
+ if (my $ev = PublicInbox::TLS::epollbit()) {
+ unshift @{$self->{wbuf} ||= []}, \&accept_tls_step;
+ return watch($self, $ev | EPOLLONESHOT);
+ }
+ drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err());
+}
+
package PublicInbox::DS::Timer;
# [$abs_float_firetime, $coderef];
sub cancel {