]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/DS.pm
ds|nntp: use CORE::close on socket
[public-inbox.git] / lib / PublicInbox / DS.pm
index 486af40eb9e1aaafc611f68db559e7a33f25a75a..2c886b4e4ff64674b6d367be0b528c5322417cdf 100644 (file)
@@ -33,7 +33,7 @@ use fields ('sock',              # underlying socket
             );
 
 use Errno  qw(EAGAIN EINVAL);
-use Carp   qw(croak confess);
+use Carp   qw(croak confess carp);
 use File::Temp qw(tempfile);
 
 our $HAVE_KQUEUE = eval { require IO::KQueue; IO::KQueue->import; 1 };
@@ -293,8 +293,8 @@ sub PostEventLoop {
     while (my $sock = shift @ToClose) {
         my $fd = fileno($sock);
 
-        # close the socket.  (not a PublicInbox::DS close)
-        $sock->close;
+        # close the socket. (not a PublicInbox::DS close)
+        CORE::close($sock);
 
         # and now we can finally remove the fd from the map.  see
         # comment above in ->close.
@@ -465,7 +465,11 @@ next_buf:
             }
         } else { #($ref eq 'CODE') {
             shift @$wbuf;
+            my $before = scalar(@$wbuf);
             $bref->($self);
+
+            # bref may be enqueueing more CODE to call (see accept_tls_step)
+            return 0 if (scalar(@$wbuf) > $before);
         }
     } # while @$wbuf
 
@@ -479,21 +483,37 @@ sub do_read ($$$$) {
     return ($r == 0 ? $self->close : $r) if defined $r;
     # common for clients to break connections without warning,
     # would be too noisy to log here:
-    $! == EAGAIN ? $self->watch_in1 : $self->close;
+    if (ref($self) eq 'IO::Socket::SSL') {
+        my $ev = PublicInbox::TLS::epollbit() or return $self->close;
+        watch($self, $ev | EPOLLONESHOT);
+    } elsif ($! == EAGAIN) {
+        watch($self, EPOLLIN | EPOLLONESHOT);
+    } else {
+        $self->close;
+    }
+}
+
+# drop the socket if we hit unrecoverable errors on our system which
+# require BOFH attention: ENOSPC, EFBIG, EIO, EMFILE, ENFILE...
+sub drop {
+    my $self = shift;
+    carp(@_);
+    $self->close;
 }
 
 # n.b.: use ->write/->read for this buffer to allow compatibility with
 # PerlIO::mmap or PerlIO::scalar if needed
-sub tmpbuf ($$) {
-    my ($bref, $off) = @_;
+sub tmpio ($$$) {
+    my ($self, $bref, $off) = @_;
     # open(my $fh, '+>>', undef) doesn't set O_APPEND
-    my ($fh, $path) = tempfile('wbuf-XXXXXXX', TMPDIR => 1);
-    open $fh, '+>>', $path or die "open: $!";
+    my ($fh, $path) = eval { tempfile('wbuf-XXXXXXX', TMPDIR => 1) };
+    $fh or return drop($self, "tempfile: $@");
+    open($fh, '+>>', $path) or return drop($self, "open: $!");
     $fh->autoflush(1);
-    unlink $path;
-    my $to_write = bytes::length($$bref) - $off;
-    $fh->write($$bref, $to_write, $off) or die "write ($to_write): $!";
-    $fh;
+    unlink($path) or return drop($self, "unlink: $!");
+    my $len = bytes::length($$bref) - $off;
+    $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!");
+    $fh
 }
 
 =head2 C<< $obj->write( $data ) >>
@@ -523,12 +543,10 @@ sub write {
         } else {
             my $last = $wbuf->[-1];
             if (ref($last) eq 'GLOB') { # append to tmp file buffer
-                unless ($last->print($$bref)) {
-                    warn "error buffering: $!";
-                    return $self->close;
-                }
+                $last->print($$bref) or return drop($self, "print: $!");
             } else {
-                push @$wbuf, tmpbuf($bref, 0);
+                my $tmpio = tmpio($self, $bref, 0) or return 0;
+                push @$wbuf, $tmpio;
             }
         }
         return 0;
@@ -546,7 +564,8 @@ sub write {
         } else {
             return $self->close;
         }
-        $self->{wbuf} = [ tmpbuf($bref, $written) ];
+        my $tmpio = tmpio($self, $bref, $written) or return 0;
+        $self->{wbuf} = [ $tmpio ];
         watch($self, EPOLLOUT|EPOLLONESHOT);
         return 0;
     }
@@ -558,14 +577,14 @@ sub msg_more ($$) {
     my $self = $_[0];
     my $sock = $self->{sock} or return 1;
 
-    if (MSG_MORE && !$self->{wbuf}) {
+    if (MSG_MORE && !$self->{wbuf} && ref($sock) ne 'IO::Socket::SSL') {
         my $n = send($sock, $_[1], MSG_MORE);
         if (defined $n) {
             my $nlen = bytes::length($_[1]) - $n;
             return 1 if $nlen == 0; # all done!
-
             # queue up the unwritten substring:
-            $self->{wbuf} = [ tmpbuf(\($_[1]), $n) ];
+            my $tmpio = tmpio($self, \($_[1]), $n) or return 0;
+            $self->{wbuf} = [ $tmpio ];
             watch($self, EPOLLOUT|EPOLLONESHOT);
             return 0;
         }
@@ -589,6 +608,19 @@ sub watch ($$) {
 
 sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
 
+# return true if complete, false if incomplete (or failure)
+sub accept_tls_step ($) {
+    my ($self) = @_;
+    my $sock = $self->{sock} or return;
+    return 1 if $sock->accept_SSL;
+    return $self->close if $! != EAGAIN;
+    if (my $ev = PublicInbox::TLS::epollbit()) {
+        unshift @{$self->{wbuf} ||= []}, \&accept_tls_step;
+        return watch($self, $ev | EPOLLONESHOT);
+    }
+    drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err());
+}
+
 package PublicInbox::DS::Timer;
 # [$abs_float_firetime, $coderef];
 sub cancel {