]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/DS.pm
tls: epollbit: account for miscellaneous OpenSSL errors
[public-inbox.git] / lib / PublicInbox / DS.pm
index c76a5038354c2d846bef2ccb5a3f3754ba5de486..a02b3bb78641bd12254e718c3344a2bfef1801de 100644 (file)
 # Bugs encountered were reported to bug-Danga-Socket@rt.cpan.org,
 # fixed in Danga::Socket 1.62 and visible at:
 # https://rt.cpan.org/Public/Dist/Display.html?Name=Danga-Socket
+#
+# fields:
+# sock: underlying socket
+# rbuf: scalarref, usually undef
+# wbuf: arrayref of coderefs or tmpio (autovivified))
+#        (tmpio = [ GLOB, offset, [ length ] ])
 package PublicInbox::DS;
 use strict;
 use bytes;
@@ -22,19 +28,10 @@ use Fcntl qw(SEEK_SET :DEFAULT O_APPEND);
 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
 use parent qw(Exporter);
 our @EXPORT_OK = qw(now msg_more);
-use warnings;
 use 5.010_001;
 use Scalar::Util qw(blessed);
-
 use PublicInbox::Syscall qw(:epoll);
 use PublicInbox::Tmpfile;
-
-use fields ('sock',              # underlying socket
-            'rbuf',              # scalarref, usually undef
-            'wbuf', # arrayref of coderefs or tmpio (autovivified))
-                    # (tmpio = [ GLOB, offset, [ length ] ])
-            );
-
 use Errno qw(EAGAIN EINVAL);
 use Carp qw(confess carp);
 
@@ -43,7 +40,7 @@ my $wait_pids; # list of [ pid, callback, callback_arg ]
 my $later_queue; # list of callbacks to run at some later interval
 my $EXPMAP; # fd -> idle_time
 our $EXPTIME = 180; # 3 minutes
-my ($later_timer, $reap_timer, $exp_timer);
+my ($later_timer, $reap_armed, $exp_timer);
 my $ToClose; # sockets to close when event loop is done
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
@@ -71,9 +68,9 @@ Reset all state
 =cut
 sub Reset {
     %DescriptorMap = ();
-    $wait_pids = $later_queue = undef;
+    $in_loop = $wait_pids = $later_queue = $reap_armed = undef;
     $EXPMAP = {};
-    $nextq = $ToClose = $reap_timer = $later_timer = $exp_timer = undef;
+    $nextq = $ToClose = $later_timer = $exp_timer = undef;
     $LoopTimeout = -1;  # no timeout by default
     @Timers = ();
 
@@ -98,18 +95,18 @@ sub SetLoopTimeout {
     return $LoopTimeout = $_[1] + 0;
 }
 
-=head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef ) >>
+=head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef, $arg) >>
 
 Add a timer to occur $seconds from now. $seconds may be fractional, but timers
 are not guaranteed to fire at the exact time you ask for.
 
 =cut
-sub add_timer ($$) {
-    my ($secs, $coderef) = @_;
+sub add_timer ($$;$) {
+    my ($secs, $coderef, $arg) = @_;
 
     my $fire_time = now() + $secs;
 
-    my $timer = [$fire_time, $coderef];
+    my $timer = [$fire_time, $coderef, $arg];
 
     if (!@Timers || $fire_time >= $Timers[-1][0]) {
         push @Timers, $timer;
@@ -201,7 +198,7 @@ sub RunTimers {
     # Run expired timers
     while (@Timers && $Timers[0][0] <= $now) {
         my $to_run = shift(@Timers);
-        $to_run->[1]->($now) if $to_run->[1];
+        $to_run->[1]->($to_run->[2]);
     }
 
     # timers may enqueue into nextq:
@@ -228,24 +225,26 @@ sub RunTimers {
 # and other things.  So we scan the $wait_pids list, which is hopefully
 # not too big.  We keep $wait_pids small by not calling dwaitpid()
 # until we've hit EOF when reading the stdout of the child.
+
 sub reap_pids {
-    my $tmp = $wait_pids or return;
-    $wait_pids = $reap_timer = undef;
-    foreach my $ary (@$tmp) {
-        my ($pid, $cb, $arg) = @$ary;
-        my $ret = waitpid($pid, WNOHANG);
-        if ($ret == 0) {
-            push @$wait_pids, $ary; # autovivifies @$wait_pids
-        } elsif ($cb) {
-            eval { $cb->($arg, $pid) };
-        }
-    }
-    # we may not be done, yet, and could've missed/masked a SIGCHLD:
-    $reap_timer = add_timer(1, \&reap_pids) if $wait_pids;
+       $reap_armed = undef;
+       my $tmp = $wait_pids or return;
+       $wait_pids = undef;
+       foreach my $ary (@$tmp) {
+               my ($pid, $cb, $arg) = @$ary;
+               my $ret = waitpid($pid, WNOHANG);
+               if ($ret == 0) {
+                       push @$wait_pids, $ary; # autovivifies @$wait_pids
+               } elsif ($cb) {
+                       eval { $cb->($arg, $pid) };
+               }
+       }
+       # we may not be done, yet, and could've missed/masked a SIGCHLD:
+       $reap_armed //= requeue(\&reap_pids) if $wait_pids;
 }
 
 # reentrant SIGCHLD handler (since reap_pids is not reentrant)
-sub enqueue_reap ($) { push @$nextq, \&reap_pids }; # autovivifies
+sub enqueue_reap () { $reap_armed //= requeue(\&reap_pids) }
 
 sub in_loop () { $in_loop }
 
@@ -328,13 +327,12 @@ This is normally (always?) called from your subclass via:
 =cut
 sub new {
     my ($self, $sock, $ev) = @_;
-    $self = fields::new($self) unless ref $self;
-
     $self->{sock} = $sock;
     my $fd = fileno($sock);
 
     _InitPoller();
 
+retry:
     if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
         if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) {
             $ev &= ~EPOLLEXCLUSIVE;
@@ -395,10 +393,10 @@ sub close {
 sub send_tmpio ($$) {
     my ($sock, $tmpio) = @_;
 
-    seek($tmpio->[0], $tmpio->[1], SEEK_SET) or return;
+    sysseek($tmpio->[0], $tmpio->[1], SEEK_SET) or return;
     my $n = $tmpio->[2] // 65536;
     $n = 65536 if $n > 65536;
-    defined(my $to_write = read($tmpio->[0], my $buf, $n)) or return;
+    defined(my $to_write = sysread($tmpio->[0], my $buf, $n)) or return;
     my $written = 0;
     while ($to_write > 0) {
         if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) {
@@ -415,14 +413,14 @@ sub send_tmpio ($$) {
 }
 
 sub epbit ($$) { # (sock, default)
-    ref($_[0]) eq 'IO::Socket::SSL' ? PublicInbox::TLS::epollbit() : $_[1];
+       $_[0]->can('stop_SSL') ? PublicInbox::TLS::epollbit() : $_[1];
 }
 
 # returns 1 if done, 0 if incomplete
 sub flush_write ($) {
     my ($self) = @_;
+    my $sock = $self->{sock} or return;
     my $wbuf = $self->{wbuf} or return 1;
-    my $sock = $self->{sock};
 
 next_buf:
     while (my $bref = $wbuf->[0]) {
@@ -435,7 +433,8 @@ next_buf:
                         goto next_buf;
                     }
                 } elsif ($! == EAGAIN) {
-                    epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT);
+                    my $ev = epbit($sock, EPOLLOUT) or return $self->close;
+                    epwait($sock, $ev | EPOLLONESHOT);
                     return 0;
                 } else {
                     return $self->close;
@@ -471,7 +470,8 @@ sub do_read ($$$;$) {
     # common for clients to break connections without warning,
     # would be too noisy to log here:
     if ($! == EAGAIN) {
-        epwait($sock, epbit($sock, EPOLLIN) | EPOLLONESHOT);
+        my $ev = epbit($sock, EPOLLIN) or return $self->close;
+        epwait($sock, $ev | EPOLLONESHOT);
         rbuf_idle($self, $rbuf);
         0;
     } else {
@@ -545,7 +545,8 @@ sub write {
             return 1 if $written == $to_write;
             requeue($self); # runs: event_step -> flush_write
         } elsif ($! == EAGAIN) {
-            epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT);
+            my $ev = epbit($sock, EPOLLOUT) or return $self->close;
+            epwait($sock, $ev | EPOLLONESHOT);
             $written = 0;
         } else {
             return $self->close;
@@ -569,7 +570,7 @@ sub msg_more ($$) {
     my $wbuf = $self->{wbuf};
 
     if (MSG_MORE && (!defined($wbuf) || !scalar(@$wbuf)) &&
-               ref($sock) ne 'IO::Socket::SSL') {
+               !$sock->can('stop_SSL')) {
         my $n = send($sock, $_[1], MSG_MORE);
         if (defined $n) {
             my $nlen = bytes::length($_[1]) - $n;
@@ -598,7 +599,8 @@ sub accept_tls_step ($) {
     my $sock = $self->{sock} or return;
     return 1 if $sock->accept_SSL;
     return $self->close if $! != EAGAIN;
-    epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
+    my $ev = PublicInbox::TLS::epollbit() or return $self->close;
+    epwait($sock, $ev | EPOLLONESHOT);
     unshift(@{$self->{wbuf}}, \&accept_tls_step); # autovivifies
     0;
 }
@@ -609,17 +611,18 @@ sub shutdn_tls_step ($) {
     my $sock = $self->{sock} or return;
     return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1);
     return $self->close if $! != EAGAIN;
-    epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
+    my $ev = PublicInbox::TLS::epollbit() or return $self->close;
+    epwait($sock, $ev | EPOLLONESHOT);
     unshift(@{$self->{wbuf}}, \&shutdn_tls_step); # autovivifies
     0;
 }
 
 # don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC
-# or fork w/o exec, so no inadvertant socket sharing
+# or fork w/o exec, so no inadvertent socket sharing
 sub shutdn ($) {
     my ($self) = @_;
     my $sock = $self->{sock} or return;
-    if (ref($sock) eq 'IO::Socket::SSL') {
+    if ($sock->can('stop_SSL')) {
         shutdn_tls_step($self);
     } else {
        $self->close;
@@ -632,7 +635,7 @@ sub dwaitpid ($$$) {
        push @$wait_pids, [ @_ ]; # [ $pid, $cb, $arg ]
 
        # We could've just missed our SIGCHLD, cover it, here:
-       requeue(\&reap_pids);
+       enqueue_reap();
 }
 
 sub _run_later () {