]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/DS.pm
ds: guard ToClose against DESTROY side-effects
[public-inbox.git] / lib / PublicInbox / DS.pm
index 8f17f7fb3d3b0db79ae5ebc93ba6aa9fb0e2daa5..d0aefec0f70bff09789ce645b8f4b83fd03d6a28 100644 (file)
@@ -3,15 +3,15 @@
 #
 # This license differs from the rest of public-inbox
 #
-# This is a fork of the (for now) unmaintained Danga::Socket 1.61.
-# Unused features will be removed, and updates will be made to take
-# advantage of newer kernels.
+# This is a fork of the unmaintained Danga::Socket (1.61) with
+# significant changes.  See Documentation/technical/ds.txt in our
+# source for details.
 #
-# API changes to diverge from Danga::Socket will happen to better
-# accomodate new features and improve scalability.  Do not expect
-# this to be a stable API like Danga::Socket.
-# Bugs encountered (and likely fixed) are reported to
-# bug-Danga-Socket@rt.cpan.org and visible at:
+# Do not expect this to be a stable API like Danga::Socket,
+# but it will evolve to suite our needs and to take advantage of
+# newer Linux and *BSD features.
+# Bugs encountered were reported to bug-Danga-Socket@rt.cpan.org,
+# fixed in Danga::Socket 1.62 and visible at:
 # https://rt.cpan.org/Public/Dist/Display.html?Name=Danga-Socket
 package PublicInbox::DS;
 use strict;
@@ -35,9 +35,8 @@ use fields ('sock',              # underlying socket
             'wbuf_off',  # offset into first element of wbuf to start writing at
             );
 
-use Errno  qw(EAGAIN EINVAL);
-use Carp   qw(croak confess carp);
-require File::Spec;
+use Errno qw(EAGAIN EINVAL);
+use Carp qw(confess carp);
 
 my $nextq; # queue for next_tick
 my $WaitPids; # list of [ pid, callback, callback_arg ]
@@ -45,11 +44,11 @@ my $later_queue; # callbacks
 my $EXPMAP; # fd -> [ idle_time, $self ]
 our $EXPTIME = 180; # 3 minutes
 my ($later_timer, $reap_timer, $exp_timer);
+my $ToClose; # sockets to close when event loop is done
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
      $Epoll,                     # Global epoll fd (or DSKQXS ref)
      $_io,                       # IO::Handle for Epoll
-     @ToClose,                   # sockets to close when event loop is done
 
      $PostLoopCallback,          # subref to call at the end of each loop, if defined (global)
 
@@ -76,8 +75,7 @@ sub Reset {
     $WaitPids = [];
     $later_queue = [];
     $EXPMAP = {};
-    $reap_timer = $later_timer = $exp_timer = undef;
-    @ToClose = ();
+    $ToClose = $reap_timer = $later_timer = $exp_timer = undef;
     $LoopTimeout = -1;  # no timeout by default
     @Timers = ();
 
@@ -102,20 +100,18 @@ sub SetLoopTimeout {
     return $LoopTimeout = $_[1] + 0;
 }
 
-=head2 C<< CLASS->AddTimer( $seconds, $coderef ) >>
+=head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef ) >>
 
 Add a timer to occur $seconds from now. $seconds may be fractional, but timers
 are not guaranteed to fire at the exact time you ask for.
 
-Returns a timer object which you can call C<< $timer->cancel >> on if you need to.
-
 =cut
-sub AddTimer {
-    my ($class, $secs, $coderef) = @_;
+sub add_timer ($$) {
+    my ($secs, $coderef) = @_;
 
     my $fire_time = now() + $secs;
 
-    my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer";
+    my $timer = [$fire_time, $coderef];
 
     if (!@Timers || $fire_time >= $Timers[-1][0]) {
         push @Timers, $timer;
@@ -200,7 +196,7 @@ sub next_tick () {
 sub RunTimers {
     next_tick();
 
-    return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers;
+    return ((@$nextq || $ToClose) ? 0 : $LoopTimeout) unless @Timers;
 
     my $now = now();
 
@@ -211,7 +207,7 @@ sub RunTimers {
     }
 
     # timers may enqueue into nextq:
-    return 0 if (@$nextq || @ToClose);
+    return 0 if (@$nextq || $ToClose);
 
     return $LoopTimeout unless @Timers;
 
@@ -248,13 +244,34 @@ sub reap_pids {
     }
     if (@$WaitPids) {
         # we may not be donea, and we may miss our
-        $reap_timer = AddTimer(undef, 1, \&reap_pids);
+        $reap_timer = add_timer(1, \&reap_pids);
     }
 }
 
 # reentrant SIGCHLD handler (since reap_pids is not reentrant)
 sub enqueue_reap ($) { push @$nextq, \&reap_pids };
 
+sub in_loop () { $in_loop }
+
+# Internal function: run the post-event callback, send read events
+# for pushed-back data, and close pending connections.  returns 1
+# if event loop should continue, or 0 to shut it all down.
+sub PostEventLoop () {
+       # now we can close sockets that wanted to close during our event
+       # processing.  (we didn't want to close them during the loop, as we
+       # didn't want fd numbers being reused and confused during the event
+       # loop)
+       if (my $close_now = $ToClose) {
+               $ToClose = undef; # will be autovivified on push
+               # ->DESTROY methods may populate ToClose
+               delete($DescriptorMap{fileno($_)}) for @$close_now;
+               # let refcounting drop everything in $close_now at once
+       }
+
+       # by default we keep running, unless a postloop callback cancels it
+       $PostLoopCallback ? $PostLoopCallback->(\%DescriptorMap) : 1;
+}
+
 sub EpollEventLoop {
     local $in_loop = 1;
     do {
@@ -293,28 +310,6 @@ sub SetPostLoopCallback {
     $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef;
 }
 
-# Internal function: run the post-event callback, send read events
-# for pushed-back data, and close pending connections.  returns 1
-# if event loop should continue, or 0 to shut it all down.
-sub PostEventLoop {
-    # now we can close sockets that wanted to close during our event processing.
-    # (we didn't want to close them during the loop, as we didn't want fd numbers
-    #  being reused and confused during the event loop)
-    delete($DescriptorMap{fileno($_)}) for @ToClose;
-    @ToClose = (); # let refcounting drop everything all at once
-
-    # by default we keep running, unless a postloop callback (either per-object
-    # or global) cancels it
-    my $keep_running = 1;
-
-    # now we're at the very end, call callback if defined
-    if (defined $PostLoopCallback) {
-        $keep_running &&= $PostLoopCallback->(\%DescriptorMap);
-    }
-
-    return $keep_running;
-}
-
 #####################################################################
 ### PublicInbox::DS-the-object code
 #####################################################################
@@ -338,9 +333,6 @@ sub new {
     $self->{sock} = $sock;
     my $fd = fileno($sock);
 
-    Carp::cluck("undef sock and/or fd in PublicInbox::DS->new.  sock=" . ($sock || "") . ", fd=" . ($fd || ""))
-        unless $sock && $fd;
-
     _InitPoller();
 
     if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
@@ -350,11 +342,10 @@ sub new {
         }
         die "couldn't add epoll watch for $fd: $!\n";
     }
-    Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
-        if $DescriptorMap{$fd};
+    confess("DescriptorMap{$fd} defined ($DescriptorMap{$fd})")
+        if defined($DescriptorMap{$fd});
 
     $DescriptorMap{$fd} = $self;
-    return $self;
 }
 
 
@@ -395,7 +386,7 @@ sub close {
 
     # defer closing the actual socket until the event loop is done
     # processing this round of events.  (otherwise we might reuse fds)
-    push @ToClose, $sock;
+    push @$ToClose, $sock; # autovivifies $ToClose
 
     return 0;
 }
@@ -658,7 +649,7 @@ sub _run_later () {
 sub later ($) {
     my ($cb) = @_;
     push @$later_queue, $cb;
-    $later_timer //= AddTimer(undef, 60, \&_run_later);
+    $later_timer //= add_timer(60, \&_run_later);
 }
 
 sub expire_old () {
@@ -695,12 +686,6 @@ sub not_idle_long {
     $exp_at > $now;
 }
 
-package PublicInbox::DS::Timer;
-# [$abs_float_firetime, $coderef];
-sub cancel {
-    $_[0][1] = undef;
-}
-
 1;
 
 =head1 AUTHORS (Danga::Socket)