]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/DS.pm
ds: rely on autovivification for $later_queue
[public-inbox.git] / lib / PublicInbox / DS.pm
index cea25d90e37516c354e94410148f8299fa64cb15..673c24eb86a81315eccd8b41457385d51072aba1 100644 (file)
@@ -31,7 +31,7 @@ use PublicInbox::Tmpfile;
 
 use fields ('sock',              # underlying socket
             'rbuf',              # scalarref, usually undef
-            'wbuf',              # arrayref of coderefs or GLOB refs
+            'wbuf', # arrayref of coderefs or GLOB refs (autovivified)
             'wbuf_off',  # offset into first element of wbuf to start writing at
             );
 
@@ -39,16 +39,16 @@ use Errno qw(EAGAIN EINVAL);
 use Carp qw(confess carp);
 
 my $nextq; # queue for next_tick
-my $WaitPids; # list of [ pid, callback, callback_arg ]
-my $later_queue; # callbacks
+my $wait_pids; # list of [ pid, callback, callback_arg ]
+my $later_queue; # list of callbacks to run at some later interval
 my $EXPMAP; # fd -> [ idle_time, $self ]
 our $EXPTIME = 180; # 3 minutes
 my ($later_timer, $reap_timer, $exp_timer);
+my $ToClose; # sockets to close when event loop is done
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
      $Epoll,                     # Global epoll fd (or DSKQXS ref)
      $_io,                       # IO::Handle for Epoll
-     @ToClose,                   # sockets to close when event loop is done
 
      $PostLoopCallback,          # subref to call at the end of each loop, if defined (global)
 
@@ -71,12 +71,9 @@ Reset all state
 =cut
 sub Reset {
     %DescriptorMap = ();
-    $nextq = [];
-    $WaitPids = [];
-    $later_queue = [];
+    $wait_pids = $later_queue = undef;
     $EXPMAP = {};
-    $reap_timer = $later_timer = $exp_timer = undef;
-    @ToClose = ();
+    $nextq = $ToClose = $reap_timer = $later_timer = $exp_timer = undef;
     $LoopTimeout = -1;  # no timeout by default
     @Timers = ();
 
@@ -180,8 +177,8 @@ sub FirstTimeEventLoop {
 sub now () { clock_gettime(CLOCK_MONOTONIC) }
 
 sub next_tick () {
-    my $q = $nextq;
-    $nextq = [];
+    my $q = $nextq or return;
+    $nextq = undef;
     for (@$q) {
         # we avoid "ref" on blessed refs to workaround a Perl 5.16.3 leak:
         # https://rt.perl.org/Public/Bug/Display.html?id=114340
@@ -197,7 +194,7 @@ sub next_tick () {
 sub RunTimers {
     next_tick();
 
-    return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers;
+    return (($nextq || $ToClose) ? 0 : $LoopTimeout) unless @Timers;
 
     my $now = now();
 
@@ -208,7 +205,7 @@ sub RunTimers {
     }
 
     # timers may enqueue into nextq:
-    return 0 if (@$nextq || @ToClose);
+    return 0 if ($nextq || $ToClose);
 
     return $LoopTimeout unless @Timers;
 
@@ -228,32 +225,49 @@ sub RunTimers {
 }
 
 # We can't use waitpid(-1) safely here since it can hit ``, system(),
-# and other things.  So we scan the $WaitPids list, which is hopefully
-# not too big.
+# and other things.  So we scan the $wait_pids list, which is hopefully
+# not too big.  We keep $wait_pids small by not calling dwaitpid()
+# until we've hit EOF when reading the stdout of the child.
 sub reap_pids {
-    my $tmp = $WaitPids;
-    $WaitPids = [];
-    $reap_timer = undef;
+    my $tmp = $wait_pids or return;
+    $wait_pids = $reap_timer = undef;
     foreach my $ary (@$tmp) {
         my ($pid, $cb, $arg) = @$ary;
         my $ret = waitpid($pid, WNOHANG);
         if ($ret == 0) {
-            push @$WaitPids, $ary;
+            push @$wait_pids, $ary; # autovivifies @$wait_pids
         } elsif ($cb) {
             eval { $cb->($arg, $pid) };
         }
     }
-    if (@$WaitPids) {
-        # we may not be donea, and we may miss our
-        $reap_timer = add_timer(1, \&reap_pids);
-    }
+    # we may not be done, yet, and could've missed/masked a SIGCHLD:
+    $reap_timer = add_timer(1, \&reap_pids) if $wait_pids;
 }
 
 # reentrant SIGCHLD handler (since reap_pids is not reentrant)
-sub enqueue_reap ($) { push @$nextq, \&reap_pids };
+sub enqueue_reap ($) { push @$nextq, \&reap_pids }; # autovivifies
 
 sub in_loop () { $in_loop }
 
+# Internal function: run the post-event callback, send read events
+# for pushed-back data, and close pending connections.  returns 1
+# if event loop should continue, or 0 to shut it all down.
+sub PostEventLoop () {
+       # now we can close sockets that wanted to close during our event
+       # processing.  (we didn't want to close them during the loop, as we
+       # didn't want fd numbers being reused and confused during the event
+       # loop)
+       if (my $close_now = $ToClose) {
+               $ToClose = undef; # will be autovivified on push
+               # ->DESTROY methods may populate ToClose
+               delete($DescriptorMap{fileno($_)}) for @$close_now;
+               # let refcounting drop everything in $close_now at once
+       }
+
+       # by default we keep running, unless a postloop callback cancels it
+       $PostLoopCallback ? $PostLoopCallback->(\%DescriptorMap) : 1;
+}
+
 sub EpollEventLoop {
     local $in_loop = 1;
     do {
@@ -292,28 +306,6 @@ sub SetPostLoopCallback {
     $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef;
 }
 
-# Internal function: run the post-event callback, send read events
-# for pushed-back data, and close pending connections.  returns 1
-# if event loop should continue, or 0 to shut it all down.
-sub PostEventLoop {
-    # now we can close sockets that wanted to close during our event processing.
-    # (we didn't want to close them during the loop, as we didn't want fd numbers
-    #  being reused and confused during the event loop)
-    delete($DescriptorMap{fileno($_)}) for @ToClose;
-    @ToClose = (); # let refcounting drop everything all at once
-
-    # by default we keep running, unless a postloop callback (either per-object
-    # or global) cancels it
-    my $keep_running = 1;
-
-    # now we're at the very end, call callback if defined
-    if (defined $PostLoopCallback) {
-        $keep_running &&= $PostLoopCallback->(\%DescriptorMap);
-    }
-
-    return $keep_running;
-}
-
 #####################################################################
 ### PublicInbox::DS-the-object code
 #####################################################################
@@ -357,7 +349,7 @@ sub new {
 ### I N S T A N C E   M E T H O D S
 #####################################################################
 
-sub requeue ($) { push @$nextq, $_[0] }
+sub requeue ($) { push @$nextq, $_[0] } # autovivifies
 
 =head2 C<< $obj->close >>
 
@@ -390,7 +382,7 @@ sub close {
 
     # defer closing the actual socket until the event loop is done
     # processing this round of events.  (otherwise we might reuse fds)
-    push @ToClose, $sock;
+    push @$ToClose, $sock; # autovivifies $ToClose
 
     return 0;
 }
@@ -559,7 +551,7 @@ sub write {
 
         # wbuf may be an empty array if we're being called inside
         # ->flush_write via CODE bref:
-        push @{$self->{wbuf} ||= []}, $tmpio;
+        push @{$self->{wbuf}}, $tmpio; # autovivifies
         return 0;
     }
 }
@@ -579,8 +571,7 @@ sub msg_more ($$) {
             return 1 if $nlen == 0; # all done!
             # queue up the unwritten substring:
             my $tmpio = tmpio($self, \($_[1]), $n) or return 0;
-            $self->{wbuf} //= $wbuf //= [];
-            push @$wbuf, $tmpio;
+            push @{$self->{wbuf}}, $tmpio; # autovivifies
             epwait($sock, EPOLLOUT|EPOLLONESHOT);
             return 0;
         }
@@ -603,7 +594,7 @@ sub accept_tls_step ($) {
     return 1 if $sock->accept_SSL;
     return $self->close if $! != EAGAIN;
     epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
-    unshift @{$self->{wbuf} ||= []}, \&accept_tls_step;
+    unshift(@{$self->{wbuf}}, \&accept_tls_step); # autovivifies
     0;
 }
 
@@ -614,7 +605,7 @@ sub shutdn_tls_step ($) {
     return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1);
     return $self->close if $! != EAGAIN;
     epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
-    unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step;
+    unshift(@{$self->{wbuf}}, \&shutdn_tls_step); # autovivifies
     0;
 }
 
@@ -632,28 +623,22 @@ sub shutdn ($) {
 
 # must be called with eval, PublicInbox::DS may not be loaded (see t/qspawn.t)
 sub dwaitpid ($$$) {
-    my ($pid, $cb, $arg) = @_;
-    if ($in_loop) {
-        push @$WaitPids, [ $pid, $cb, $arg ];
+       die "Not in EventLoop\n" unless $in_loop;
+       push @$wait_pids, [ @_ ]; # [ $pid, $cb, $arg ]
 
-        # We could've just missed our SIGCHLD, cover it, here:
-        requeue(\&reap_pids);
-    } else {
-        die "Not in EventLoop\n";
-    }
+       # We could've just missed our SIGCHLD, cover it, here:
+       requeue(\&reap_pids);
 }
 
 sub _run_later () {
-    my $run = $later_queue;
-    $later_timer = undef;
-    $later_queue = [];
-    $_->() for @$run;
+       my $run = $later_queue or return;
+       $later_timer = $later_queue = undef;
+       $_->() for @$run;
 }
 
 sub later ($) {
-    my ($cb) = @_;
-    push @$later_queue, $cb;
-    $later_timer //= add_timer(60, \&_run_later);
+       push @$later_queue, $_[0]; # autovivifies @$later_queue
+       $later_timer //= add_timer(60, \&_run_later);
 }
 
 sub expire_old () {