]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/DS.pm
ds: handle deferred DS->close after timers
[public-inbox.git] / lib / PublicInbox / DS.pm
index d6ef0b8d4753f6907b9176160223989b10381b22..6cd527e265403aef4bd4061d6bf381a34c348858 100644 (file)
@@ -18,7 +18,7 @@ use strict;
 use bytes;
 use POSIX ();
 use IO::Handle qw();
-use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD SEEK_SET);
+use Fcntl qw(SEEK_SET :DEFAULT);
 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
 use parent qw(Exporter);
 our @EXPORT_OK = qw(now msg_more);
@@ -28,14 +28,16 @@ use 5.010_001;
 use PublicInbox::Syscall qw(:epoll);
 
 use fields ('sock',              # underlying socket
+            'rbuf',              # scalarref, usually undef
             'wbuf',              # arrayref of coderefs or GLOB refs
             'wbuf_off',  # offset into first element of wbuf to start writing at
             );
 
-use Errno  qw(EAGAIN EINVAL);
+use Errno  qw(EAGAIN EINVAL EEXIST);
 use Carp   qw(croak confess carp);
-use File::Temp qw(tempfile);
+require File::Spec;
 
+my $nextq = []; # queue for next_tick
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
      $Epoll,                     # Global epoll fd (or DSKQXS ref)
@@ -98,12 +100,6 @@ Returns a timer object which you can call C<< $timer->cancel >> on if you need t
 sub AddTimer {
     my ($class, $secs, $coderef) = @_;
 
-    if (!$secs) {
-        my $timer = bless([0, $coderef], 'PublicInbox::DS::Timer');
-        unshift(@Timers, $timer);
-        return $timer;
-    }
-
     my $fire_time = now() + $secs;
 
     my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer";
@@ -142,15 +138,17 @@ sub _InitPoller
     return if $DoneInit;
     $DoneInit = 1;
 
-    if (!PublicInbox::Syscall::epoll_defined())  {
-        $Epoll = eval {
-            require PublicInbox::DSKQXS;
-            PublicInbox::DSKQXS->import;
-            PublicInbox::DSKQXS->new;
-        };
-    } else {
+    if (PublicInbox::Syscall::epoll_defined())  {
         $Epoll = epoll_create();
         set_cloexec($Epoll) if (defined($Epoll) && $Epoll >= 0);
+    } else {
+        my $cls;
+        for (qw(DSKQXS DSPoll)) {
+            $cls = "PublicInbox::$_";
+            last if eval "require $cls";
+        }
+        $cls->import(qw(epoll_ctl epoll_wait));
+        $Epoll = $cls->new;
     }
     *EventLoop = *EpollEventLoop;
 }
@@ -171,9 +169,23 @@ sub FirstTimeEventLoop {
 
 sub now () { clock_gettime(CLOCK_MONOTONIC) }
 
+sub next_tick () {
+    my $q = $nextq;
+    $nextq = [];
+    for (@$q) {
+        if (ref($_) eq 'CODE') {
+            $_->();
+        } else {
+            $_->event_step;
+        }
+    }
+}
+
 # runs timers and returns milliseconds for next one, or next event loop
 sub RunTimers {
-    return $LoopTimeout unless @Timers;
+    next_tick();
+
+    return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers;
 
     my $now = now();
 
@@ -183,6 +195,9 @@ sub RunTimers {
         $to_run->[1]->($now) if $to_run->[1];
     }
 
+    # timers may enqueue into nextq:
+    return 0 if (@$nextq || @ToClose);
+
     return $LoopTimeout unless @Timers;
 
     # convert time to an even number of milliseconds, adding 1
@@ -315,6 +330,8 @@ sub new {
 ### I N S T A N C E   M E T H O D S
 #####################################################################
 
+sub requeue ($) { push @$nextq, $_[0] }
+
 =head2 C<< $obj->close >>
 
 Close the socket.
@@ -410,16 +427,27 @@ next_buf:
     1; # all done
 }
 
-sub do_read ($$$$) {
+sub rbuf_idle ($$) {
+    my ($self, $rbuf) = @_;
+    if ($$rbuf eq '') { # who knows how long till we can read again
+        delete $self->{rbuf};
+    } else {
+        $self->{rbuf} = $rbuf;
+    }
+}
+
+sub do_read ($$$;$) {
     my ($self, $rbuf, $len, $off) = @_;
-    my $r = sysread($self->{sock}, $$rbuf, $len, $off);
+    my $r = sysread($self->{sock}, $$rbuf, $len, $off // 0);
     return ($r == 0 ? $self->close : $r) if defined $r;
     # common for clients to break connections without warning,
     # would be too noisy to log here:
     if (ref($self) eq 'IO::Socket::SSL') {
         my $ev = PublicInbox::TLS::epollbit() or return $self->close;
+        rbuf_idle($self, $rbuf);
         watch($self, $ev | EPOLLONESHOT);
     } elsif ($! == EAGAIN) {
+        rbuf_idle($self, $rbuf);
         watch($self, EPOLLIN | EPOLLONESHOT);
     } else {
         $self->close;
@@ -438,12 +466,16 @@ sub drop {
 # PerlIO::mmap or PerlIO::scalar if needed
 sub tmpio ($$$) {
     my ($self, $bref, $off) = @_;
-    # open(my $fh, '+>>', undef) doesn't set O_APPEND
-    my ($fh, $path) = eval { tempfile('wbuf-XXXXXXX', TMPDIR => 1) };
-    $fh or return drop($self, "tempfile: $@");
-    open($fh, '+>>', $path) or return drop($self, "open: $!");
+    my $fh; # open(my $fh, '+>>', undef) doesn't set O_APPEND
+    do {
+        my $fn = File::Spec->tmpdir . '/wbuf-' . rand;
+        if (sysopen($fh, $fn, O_RDWR|O_CREAT|O_EXCL|O_APPEND, 0600)) { # likely
+            unlink($fn) or return drop($self, "unlink($fn) $!");
+        } elsif ($! != EEXIST) { # EMFILE/ENFILE/ENOSPC/ENOMEM
+            return drop($self, "open: $!");
+        }
+    } until (defined $fh);
     $fh->autoflush(1);
-    unlink($path) or return drop($self, "unlink: $!");
     my $len = bytes::length($$bref) - $off;
     $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!");
     $fh
@@ -499,7 +531,10 @@ sub write {
             return $self->close;
         }
         my $tmpio = tmpio($self, $bref, $written) or return 0;
-        $self->{wbuf} = [ $tmpio ];
+
+        # wbuf may be an empty array if we're being called inside
+        # ->flush_write via CODE bref:
+        push @{$self->{wbuf} ||= []}, $tmpio;
         watch($self, EPOLLOUT|EPOLLONESHOT);
         return 0;
     }
@@ -534,8 +569,6 @@ sub watch ($$) {
     0;
 }
 
-sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
-
 # return true if complete, false if incomplete (or failure)
 sub accept_tls_step ($) {
     my ($self) = @_;
@@ -572,7 +605,6 @@ sub shutdn ($) {
        $self->close;
     }
 }
-
 package PublicInbox::DS::Timer;
 # [$abs_float_firetime, $coderef];
 sub cancel {