# Bugs encountered were reported to bug-Danga-Socket@rt.cpan.org,
# fixed in Danga::Socket 1.62 and visible at:
# https://rt.cpan.org/Public/Dist/Display.html?Name=Danga-Socket
+#
+# fields:
+# sock: underlying socket
+# rbuf: scalarref, usually undef
+# wbuf: arrayref of coderefs or tmpio (autovivified))
+# (tmpio = [ GLOB, offset, [ length ] ])
package PublicInbox::DS;
use strict;
use bytes;
use POSIX qw(WNOHANG);
use IO::Handle qw();
-use Fcntl qw(SEEK_SET :DEFAULT);
+use Fcntl qw(SEEK_SET :DEFAULT O_APPEND);
use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
use parent qw(Exporter);
our @EXPORT_OK = qw(now msg_more);
-use warnings;
use 5.010_001;
use Scalar::Util qw(blessed);
-
use PublicInbox::Syscall qw(:epoll);
use PublicInbox::Tmpfile;
-
-use fields ('sock', # underlying socket
- 'rbuf', # scalarref, usually undef
- 'wbuf', # arrayref of coderefs or GLOB refs (autovivified)
- 'wbuf_off', # offset into first element of wbuf to start writing at
- );
-
use Errno qw(EAGAIN EINVAL);
use Carp qw(confess carp);
=cut
sub Reset {
%DescriptorMap = ();
- $wait_pids = $later_queue = undef;
+ $in_loop = $wait_pids = $later_queue = undef;
$EXPMAP = {};
$nextq = $ToClose = $reap_timer = $later_timer = $exp_timer = undef;
$LoopTimeout = -1; # no timeout by default
return $LoopTimeout = $_[1] + 0;
}
-=head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef ) >>
+=head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef, $arg) >>
Add a timer to occur $seconds from now. $seconds may be fractional, but timers
are not guaranteed to fire at the exact time you ask for.
=cut
-sub add_timer ($$) {
- my ($secs, $coderef) = @_;
+sub add_timer ($$;$) {
+ my ($secs, $coderef, $arg) = @_;
my $fire_time = now() + $secs;
- my $timer = [$fire_time, $coderef];
+ my $timer = [$fire_time, $coderef, $arg];
if (!@Timers || $fire_time >= $Timers[-1][0]) {
push @Timers, $timer;
# Run expired timers
while (@Timers && $Timers[0][0] <= $now) {
my $to_run = shift(@Timers);
- $to_run->[1]->($now) if $to_run->[1];
+ $to_run->[1]->($to_run->[2]);
}
# timers may enqueue into nextq:
=cut
sub new {
my ($self, $sock, $ev) = @_;
- $self = fields::new($self) unless ref $self;
-
$self->{sock} = $sock;
my $fd = fileno($sock);
}
# portable, non-thread-safe sendfile emulation (no pread, yet)
-sub psendfile ($$$) {
- my ($sock, $fh, $off) = @_;
+sub send_tmpio ($$) {
+ my ($sock, $tmpio) = @_;
- seek($fh, $$off, SEEK_SET) or return;
- defined(my $to_write = read($fh, my $buf, 16384)) or return;
+ sysseek($tmpio->[0], $tmpio->[1], SEEK_SET) or return;
+ my $n = $tmpio->[2] // 65536;
+ $n = 65536 if $n > 65536;
+ defined(my $to_write = sysread($tmpio->[0], my $buf, $n)) or return;
my $written = 0;
while ($to_write > 0) {
if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) {
last;
}
}
- $$off += $written;
+ $tmpio->[1] += $written; # offset
+ $tmpio->[2] -= $written if defined($tmpio->[2]); # length
$written;
}
sub epbit ($$) { # (sock, default)
- ref($_[0]) eq 'IO::Socket::SSL' ? PublicInbox::TLS::epollbit() : $_[1];
+ $_[0]->can('stop_SSL') ? PublicInbox::TLS::epollbit() : $_[1];
}
# returns 1 if done, 0 if incomplete
sub flush_write ($) {
my ($self) = @_;
+ my $sock = $self->{sock} or return;
my $wbuf = $self->{wbuf} or return 1;
- my $sock = $self->{sock};
next_buf:
while (my $bref = $wbuf->[0]) {
if (ref($bref) ne 'CODE') {
- my $off = delete($self->{wbuf_off}) // 0;
while ($sock) {
- my $w = psendfile($sock, $bref, \$off);
+ my $w = send_tmpio($sock, $bref); # bref is tmpio
if (defined $w) {
if ($w == 0) {
shift @$wbuf;
}
} elsif ($! == EAGAIN) {
epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT);
- $self->{wbuf_off} = $off;
return 0;
} else {
return $self->close;
}
}
- } else { #($ref eq 'CODE') {
+ } else { #(ref($bref) eq 'CODE') {
shift @$wbuf;
my $before = scalar(@$wbuf);
$bref->($self);
# PerlIO::mmap or PerlIO::scalar if needed
sub tmpio ($$$) {
my ($self, $bref, $off) = @_;
- my $fh = tmpfile('wbuf', $self->{sock}, 1) or
+ my $fh = tmpfile('wbuf', $self->{sock}, O_APPEND) or
return drop($self, "tmpfile $!");
$fh->autoflush(1);
my $len = bytes::length($$bref) - $off;
$fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!");
- $fh
+ [ $fh, 0 ] # [1] = offset, [2] = length, not set by us
}
=head2 C<< $obj->write( $data ) >>
if ($ref eq 'CODE') {
push @$wbuf, $bref;
} else {
- my $last = $wbuf->[-1];
- if (ref($last) eq 'GLOB') { # append to tmp file buffer
- $last->print($$bref) or return drop($self, "print: $!");
+ my $tmpio = $wbuf->[-1];
+ if ($tmpio && !defined($tmpio->[2])) { # append to tmp file buffer
+ $tmpio->[0]->print($$bref) or return drop($self, "print: $!");
} else {
my $tmpio = tmpio($self, $bref, 0) or return 0;
push @$wbuf, $tmpio;
my $wbuf = $self->{wbuf};
if (MSG_MORE && (!defined($wbuf) || !scalar(@$wbuf)) &&
- ref($sock) ne 'IO::Socket::SSL') {
+ !$sock->can('stop_SSL')) {
my $n = send($sock, $_[1], MSG_MORE);
if (defined $n) {
my $nlen = bytes::length($_[1]) - $n;
}
# don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC
-# or fork w/o exec, so no inadvertant socket sharing
+# or fork w/o exec, so no inadvertent socket sharing
sub shutdn ($) {
my ($self) = @_;
my $sock = $self->{sock} or return;
- if (ref($sock) eq 'IO::Socket::SSL') {
+ if ($sock->can('stop_SSL')) {
shutdn_tls_step($self);
} else {
$self->close;