]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/DS.pm
ds: share lazy rbuf handling between HTTP and NNTP
[public-inbox.git] / lib / PublicInbox / DS.pm
index d6ef0b8d4753f6907b9176160223989b10381b22..2824084334e4bc026b1a327ba483feb5238b26c8 100644 (file)
@@ -18,7 +18,7 @@ use strict;
 use bytes;
 use POSIX ();
 use IO::Handle qw();
-use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD SEEK_SET);
+use Fcntl qw(SEEK_SET :DEFAULT);
 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
 use parent qw(Exporter);
 our @EXPORT_OK = qw(now msg_more);
@@ -28,13 +28,14 @@ use 5.010_001;
 use PublicInbox::Syscall qw(:epoll);
 
 use fields ('sock',              # underlying socket
+            'rbuf',              # scalarref, usually undef
             'wbuf',              # arrayref of coderefs or GLOB refs
             'wbuf_off',  # offset into first element of wbuf to start writing at
             );
 
-use Errno  qw(EAGAIN EINVAL);
+use Errno  qw(EAGAIN EINVAL EEXIST);
 use Carp   qw(croak confess carp);
-use File::Temp qw(tempfile);
+require File::Spec;
 
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
@@ -142,15 +143,17 @@ sub _InitPoller
     return if $DoneInit;
     $DoneInit = 1;
 
-    if (!PublicInbox::Syscall::epoll_defined())  {
-        $Epoll = eval {
-            require PublicInbox::DSKQXS;
-            PublicInbox::DSKQXS->import;
-            PublicInbox::DSKQXS->new;
-        };
-    } else {
+    if (PublicInbox::Syscall::epoll_defined())  {
         $Epoll = epoll_create();
         set_cloexec($Epoll) if (defined($Epoll) && $Epoll >= 0);
+    } else {
+        my $cls;
+        for (qw(DSKQXS DSPoll)) {
+            $cls = "PublicInbox::$_";
+            last if eval "require $cls";
+        }
+        $cls->import(qw(epoll_ctl epoll_wait));
+        $Epoll = $cls->new;
     }
     *EventLoop = *EpollEventLoop;
 }
@@ -410,16 +413,27 @@ next_buf:
     1; # all done
 }
 
-sub do_read ($$$$) {
+sub rbuf_idle ($$) {
+    my ($self, $rbuf) = @_;
+    if ($$rbuf eq '') { # who knows how long till we can read again
+        delete $self->{rbuf};
+    } else {
+        $self->{rbuf} = $rbuf;
+    }
+}
+
+sub do_read ($$$;$) {
     my ($self, $rbuf, $len, $off) = @_;
-    my $r = sysread($self->{sock}, $$rbuf, $len, $off);
+    my $r = sysread($self->{sock}, $$rbuf, $len, $off // 0);
     return ($r == 0 ? $self->close : $r) if defined $r;
     # common for clients to break connections without warning,
     # would be too noisy to log here:
     if (ref($self) eq 'IO::Socket::SSL') {
         my $ev = PublicInbox::TLS::epollbit() or return $self->close;
+        rbuf_idle($self, $rbuf);
         watch($self, $ev | EPOLLONESHOT);
     } elsif ($! == EAGAIN) {
+        rbuf_idle($self, $rbuf);
         watch($self, EPOLLIN | EPOLLONESHOT);
     } else {
         $self->close;
@@ -438,12 +452,16 @@ sub drop {
 # PerlIO::mmap or PerlIO::scalar if needed
 sub tmpio ($$$) {
     my ($self, $bref, $off) = @_;
-    # open(my $fh, '+>>', undef) doesn't set O_APPEND
-    my ($fh, $path) = eval { tempfile('wbuf-XXXXXXX', TMPDIR => 1) };
-    $fh or return drop($self, "tempfile: $@");
-    open($fh, '+>>', $path) or return drop($self, "open: $!");
+    my $fh; # open(my $fh, '+>>', undef) doesn't set O_APPEND
+    do {
+        my $fn = File::Spec->tmpdir . '/wbuf-' . rand;
+        if (sysopen($fh, $fn, O_RDWR|O_CREAT|O_EXCL|O_APPEND, 0600)) { # likely
+            unlink($fn) or return drop($self, "unlink($fn) $!");
+        } elsif ($! != EEXIST) { # EMFILE/ENFILE/ENOSPC/ENOMEM
+            return drop($self, "open: $!");
+        }
+    } until (defined $fh);
     $fh->autoflush(1);
-    unlink($path) or return drop($self, "unlink: $!");
     my $len = bytes::length($$bref) - $off;
     $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!");
     $fh
@@ -499,7 +517,10 @@ sub write {
             return $self->close;
         }
         my $tmpio = tmpio($self, $bref, $written) or return 0;
-        $self->{wbuf} = [ $tmpio ];
+
+        # wbuf may be an empty array if we're being called inside
+        # ->flush_write via CODE bref:
+        push @{$self->{wbuf} ||= []}, $tmpio;
         watch($self, EPOLLOUT|EPOLLONESHOT);
         return 0;
     }