X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FDS.pm;h=661be1fd2c4553723e07587f26e04be4adb5ffb1;hb=823dbcacf7829fd0f42748ae7712a76fa4c64a76;hp=970061fd55f1c44c77d51084d7d9891aeb3f48f3;hpb=8775167475d8bfc25d532b777147a8b1ef1cd99b;p=public-inbox.git diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 970061fd..661be1fd 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -13,28 +13,25 @@ # Bugs encountered were reported to bug-Danga-Socket@rt.cpan.org, # fixed in Danga::Socket 1.62 and visible at: # https://rt.cpan.org/Public/Dist/Display.html?Name=Danga-Socket +# +# fields: +# sock: underlying socket +# rbuf: scalarref, usually undef +# wbuf: arrayref of coderefs or tmpio (autovivified)) +# (tmpio = [ GLOB, offset, [ length ] ]) package PublicInbox::DS; use strict; use bytes; use POSIX qw(WNOHANG); use IO::Handle qw(); -use Fcntl qw(SEEK_SET :DEFAULT); +use Fcntl qw(SEEK_SET :DEFAULT O_APPEND); use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); use parent qw(Exporter); our @EXPORT_OK = qw(now msg_more); -use warnings; use 5.010_001; use Scalar::Util qw(blessed); - use PublicInbox::Syscall qw(:epoll); use PublicInbox::Tmpfile; - -use fields ('sock', # underlying socket - 'rbuf', # scalarref, usually undef - 'wbuf', # arrayref of coderefs or GLOB refs (autovivified) - 'wbuf_off', # offset into first element of wbuf to start writing at - ); - use Errno qw(EAGAIN EINVAL); use Carp qw(confess carp); @@ -43,7 +40,7 @@ my $wait_pids; # list of [ pid, callback, callback_arg ] my $later_queue; # list of callbacks to run at some later interval my $EXPMAP; # fd -> idle_time our $EXPTIME = 180; # 3 minutes -my ($later_timer, $reap_timer, $exp_timer); +my ($later_timer, $reap_armed, $exp_timer); my $ToClose; # sockets to close when event loop is done our ( %DescriptorMap, # fd (num) -> PublicInbox::DS object @@ -71,9 +68,9 @@ Reset all state =cut sub Reset { %DescriptorMap = (); - $wait_pids = $later_queue = undef; + $in_loop = $wait_pids = $later_queue = $reap_armed = undef; $EXPMAP = {}; - $nextq = $ToClose = $reap_timer = $later_timer = $exp_timer = undef; + $nextq = $ToClose = $later_timer = $exp_timer = undef; $LoopTimeout = -1; # no timeout by default @Timers = (); @@ -98,18 +95,18 @@ sub SetLoopTimeout { return $LoopTimeout = $_[1] + 0; } -=head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef ) >> +=head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef, $arg) >> Add a timer to occur $seconds from now. $seconds may be fractional, but timers are not guaranteed to fire at the exact time you ask for. =cut -sub add_timer ($$) { - my ($secs, $coderef) = @_; +sub add_timer ($$;$) { + my ($secs, $coderef, $arg) = @_; my $fire_time = now() + $secs; - my $timer = [$fire_time, $coderef]; + my $timer = [$fire_time, $coderef, $arg]; if (!@Timers || $fire_time >= $Timers[-1][0]) { push @Timers, $timer; @@ -201,7 +198,7 @@ sub RunTimers { # Run expired timers while (@Timers && $Timers[0][0] <= $now) { my $to_run = shift(@Timers); - $to_run->[1]->($now) if $to_run->[1]; + $to_run->[1]->($to_run->[2]); } # timers may enqueue into nextq: @@ -228,24 +225,26 @@ sub RunTimers { # and other things. So we scan the $wait_pids list, which is hopefully # not too big. We keep $wait_pids small by not calling dwaitpid() # until we've hit EOF when reading the stdout of the child. + sub reap_pids { - my $tmp = $wait_pids or return; - $wait_pids = $reap_timer = undef; - foreach my $ary (@$tmp) { - my ($pid, $cb, $arg) = @$ary; - my $ret = waitpid($pid, WNOHANG); - if ($ret == 0) { - push @$wait_pids, $ary; # autovivifies @$wait_pids - } elsif ($cb) { - eval { $cb->($arg, $pid) }; - } - } - # we may not be done, yet, and could've missed/masked a SIGCHLD: - $reap_timer = add_timer(1, \&reap_pids) if $wait_pids; + $reap_armed = undef; + my $tmp = $wait_pids or return; + $wait_pids = undef; + foreach my $ary (@$tmp) { + my ($pid, $cb, $arg) = @$ary; + my $ret = waitpid($pid, WNOHANG); + if ($ret == 0) { + push @$wait_pids, $ary; # autovivifies @$wait_pids + } elsif ($cb) { + eval { $cb->($arg, $pid) }; + } + } + # we may not be done, yet, and could've missed/masked a SIGCHLD: + $reap_armed //= requeue(\&reap_pids) if $wait_pids; } # reentrant SIGCHLD handler (since reap_pids is not reentrant) -sub enqueue_reap ($) { push @$nextq, \&reap_pids }; # autovivifies +sub enqueue_reap { $reap_armed //= requeue(\&reap_pids) } sub in_loop () { $in_loop } @@ -328,8 +327,6 @@ This is normally (always?) called from your subclass via: =cut sub new { my ($self, $sock, $ev) = @_; - $self = fields::new($self) unless ref $self; - $self->{sock} = $sock; my $fd = fileno($sock); @@ -392,11 +389,13 @@ sub close { } # portable, non-thread-safe sendfile emulation (no pread, yet) -sub psendfile ($$$) { - my ($sock, $fh, $off) = @_; +sub send_tmpio ($$) { + my ($sock, $tmpio) = @_; - seek($fh, $$off, SEEK_SET) or return; - defined(my $to_write = read($fh, my $buf, 16384)) or return; + sysseek($tmpio->[0], $tmpio->[1], SEEK_SET) or return; + my $n = $tmpio->[2] // 65536; + $n = 65536 if $n > 65536; + defined(my $to_write = sysread($tmpio->[0], my $buf, $n)) or return; my $written = 0; while ($to_write > 0) { if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) { @@ -407,26 +406,26 @@ sub psendfile ($$$) { last; } } - $$off += $written; + $tmpio->[1] += $written; # offset + $tmpio->[2] -= $written if defined($tmpio->[2]); # length $written; } sub epbit ($$) { # (sock, default) - ref($_[0]) eq 'IO::Socket::SSL' ? PublicInbox::TLS::epollbit() : $_[1]; + $_[0]->can('stop_SSL') ? PublicInbox::TLS::epollbit() : $_[1]; } # returns 1 if done, 0 if incomplete sub flush_write ($) { my ($self) = @_; + my $sock = $self->{sock} or return; my $wbuf = $self->{wbuf} or return 1; - my $sock = $self->{sock}; next_buf: while (my $bref = $wbuf->[0]) { if (ref($bref) ne 'CODE') { - my $off = delete($self->{wbuf_off}) // 0; while ($sock) { - my $w = psendfile($sock, $bref, \$off); + my $w = send_tmpio($sock, $bref); # bref is tmpio if (defined $w) { if ($w == 0) { shift @$wbuf; @@ -434,13 +433,12 @@ next_buf: } } elsif ($! == EAGAIN) { epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT); - $self->{wbuf_off} = $off; return 0; } else { return $self->close; } } - } else { #($ref eq 'CODE') { + } else { #(ref($bref) eq 'CODE') { shift @$wbuf; my $before = scalar(@$wbuf); $bref->($self); @@ -490,12 +488,12 @@ sub drop { # PerlIO::mmap or PerlIO::scalar if needed sub tmpio ($$$) { my ($self, $bref, $off) = @_; - my $fh = tmpfile('wbuf', $self->{sock}, 1) or + my $fh = tmpfile('wbuf', $self->{sock}, O_APPEND) or return drop($self, "tmpfile $!"); $fh->autoflush(1); my $len = bytes::length($$bref) - $off; $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!"); - $fh + [ $fh, 0 ] # [1] = offset, [2] = length, not set by us } =head2 C<< $obj->write( $data ) >> @@ -524,9 +522,9 @@ sub write { if ($ref eq 'CODE') { push @$wbuf, $bref; } else { - my $last = $wbuf->[-1]; - if (ref($last) eq 'GLOB') { # append to tmp file buffer - $last->print($$bref) or return drop($self, "print: $!"); + my $tmpio = $wbuf->[-1]; + if ($tmpio && !defined($tmpio->[2])) { # append to tmp file buffer + $tmpio->[0]->print($$bref) or return drop($self, "print: $!"); } else { my $tmpio = tmpio($self, $bref, 0) or return 0; push @$wbuf, $tmpio; @@ -568,7 +566,7 @@ sub msg_more ($$) { my $wbuf = $self->{wbuf}; if (MSG_MORE && (!defined($wbuf) || !scalar(@$wbuf)) && - ref($sock) ne 'IO::Socket::SSL') { + !$sock->can('stop_SSL')) { my $n = send($sock, $_[1], MSG_MORE); if (defined $n) { my $nlen = bytes::length($_[1]) - $n; @@ -614,11 +612,11 @@ sub shutdn_tls_step ($) { } # don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC -# or fork w/o exec, so no inadvertant socket sharing +# or fork w/o exec, so no inadvertent socket sharing sub shutdn ($) { my ($self) = @_; my $sock = $self->{sock} or return; - if (ref($sock) eq 'IO::Socket::SSL') { + if ($sock->can('stop_SSL')) { shutdn_tls_step($self); } else { $self->close; @@ -631,7 +629,7 @@ sub dwaitpid ($$$) { push @$wait_pids, [ @_ ]; # [ $pid, $cb, $arg ] # We could've just missed our SIGCHLD, cover it, here: - requeue(\&reap_pids); + goto &enqueue_reap; # tail recursion } sub _run_later () {