]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/DS.pm
doc: technical/ds.txt: describe PublicInbox::DS divergences
[public-inbox.git] / lib / PublicInbox / DS.pm
index 30a9641ac6b08c3a051ec93af44300ddee959ac0..058b13589fc9585e868e2fcbd6f9a2552d2aafb4 100644 (file)
@@ -3,15 +3,15 @@
 #
 # This license differs from the rest of public-inbox
 #
-# This is a fork of the (for now) unmaintained Danga::Socket 1.61.
-# Unused features will be removed, and updates will be made to take
-# advantage of newer kernels.
+# This is a fork of the unmaintained Danga::Socket (1.61) with
+# significant changes.  See Documentation/technical/ds.txt in our
+# source for details.
 #
-# API changes to diverge from Danga::Socket will happen to better
-# accomodate new features and improve scalability.  Do not expect
-# this to be a stable API like Danga::Socket.
-# Bugs encountered (and likely fixed) are reported to
-# bug-Danga-Socket@rt.cpan.org and visible at:
+# Do not expect this to be a stable API like Danga::Socket,
+# but it will evolve to suite our needs and to take advantage of
+# newer Linux and *BSD features.
+# Bugs encountered were reported to bug-Danga-Socket@rt.cpan.org,
+# fixed in Danga::Socket 1.62 and visible at:
 # https://rt.cpan.org/Public/Dist/Display.html?Name=Danga-Socket
 package PublicInbox::DS;
 use strict;
@@ -24,6 +24,7 @@ use parent qw(Exporter);
 our @EXPORT_OK = qw(now msg_more);
 use warnings;
 use 5.010_001;
+use Scalar::Util qw(blessed);
 
 use PublicInbox::Syscall qw(:epoll);
 use PublicInbox::Tmpfile;
@@ -34,13 +35,15 @@ use fields ('sock',              # underlying socket
             'wbuf_off',  # offset into first element of wbuf to start writing at
             );
 
-use Errno  qw(EAGAIN EINVAL);
-use Carp   qw(croak confess carp);
-require File::Spec;
+use Errno qw(EAGAIN EINVAL);
+use Carp qw(confess carp);
 
-my $nextq = []; # queue for next_tick
-my $WaitPids = [];               # list of [ pid, callback, callback_arg ]
-my $reap_timer;
+my $nextq; # queue for next_tick
+my $WaitPids; # list of [ pid, callback, callback_arg ]
+my $later_queue; # callbacks
+my $EXPMAP; # fd -> [ idle_time, $self ]
+our $EXPTIME = 180; # 3 minutes
+my ($later_timer, $reap_timer, $exp_timer);
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
      $Epoll,                     # Global epoll fd (or DSKQXS ref)
@@ -52,6 +55,7 @@ our (
      $LoopTimeout,               # timeout of event loop in milliseconds
      $DoneInit,                  # if we've done the one-time module init yet
      @Timers,                    # timers
+     $in_loop,
      );
 
 Reset();
@@ -67,8 +71,11 @@ Reset all state
 =cut
 sub Reset {
     %DescriptorMap = ();
+    $nextq = [];
     $WaitPids = [];
-    $reap_timer = undef;
+    $later_queue = [];
+    $EXPMAP = {};
+    $reap_timer = $later_timer = $exp_timer = undef;
     @ToClose = ();
     $LoopTimeout = -1;  # no timeout by default
     @Timers = ();
@@ -178,10 +185,12 @@ sub next_tick () {
     my $q = $nextq;
     $nextq = [];
     for (@$q) {
-        if (ref($_) eq 'CODE') {
-            $_->();
-        } else {
+        # we avoid "ref" on blessed refs to workaround a Perl 5.16.3 leak:
+        # https://rt.perl.org/Public/Bug/Display.html?id=114340
+        if (blessed($_)) {
             $_->event_step;
+        } else {
+            $_->();
         }
     }
 }
@@ -246,8 +255,8 @@ sub reap_pids {
 sub enqueue_reap ($) { push @$nextq, \&reap_pids };
 
 sub EpollEventLoop {
-    local $SIG{CHLD} = \&enqueue_reap;
-    while (1) {
+    local $in_loop = 1;
+    do {
         my @events;
         my $i;
         my $timeout = RunTimers();
@@ -261,8 +270,8 @@ sub EpollEventLoop {
             # in that event.
             $DescriptorMap{$events[$i]->[0]}->event_step;
         }
-        return unless PostEventLoop();
-    }
+    } while (PostEventLoop());
+    _run_later();
 }
 
 =head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >>
@@ -564,15 +573,18 @@ use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
 sub msg_more ($$) {
     my $self = $_[0];
     my $sock = $self->{sock} or return 1;
+    my $wbuf = $self->{wbuf};
 
-    if (MSG_MORE && !$self->{wbuf} && ref($sock) ne 'IO::Socket::SSL') {
+    if (MSG_MORE && (!defined($wbuf) || !scalar(@$wbuf)) &&
+               ref($sock) ne 'IO::Socket::SSL') {
         my $n = send($sock, $_[1], MSG_MORE);
         if (defined $n) {
             my $nlen = bytes::length($_[1]) - $n;
             return 1 if $nlen == 0; # all done!
             # queue up the unwritten substring:
             my $tmpio = tmpio($self, \($_[1]), $n) or return 0;
-            $self->{wbuf} = [ $tmpio ];
+            $self->{wbuf} //= $wbuf //= [];
+            push @$wbuf, $tmpio;
             epwait($sock, EPOLLOUT|EPOLLONESHOT);
             return 0;
         }
@@ -625,8 +637,7 @@ sub shutdn ($) {
 # must be called with eval, PublicInbox::DS may not be loaded (see t/qspawn.t)
 sub dwaitpid ($$$) {
     my ($pid, $cb, $arg) = @_;
-    my $chld = $SIG{CHLD};
-    if (defined($chld) && $chld eq \&enqueue_reap) {
+    if ($in_loop) {
         push @$WaitPids, [ $pid, $cb, $arg ];
 
         # We could've just missed our SIGCHLD, cover it, here:
@@ -636,6 +647,53 @@ sub dwaitpid ($$$) {
     }
 }
 
+sub _run_later () {
+    my $run = $later_queue;
+    $later_timer = undef;
+    $later_queue = [];
+    $_->() for @$run;
+}
+
+sub later ($) {
+    my ($cb) = @_;
+    push @$later_queue, $cb;
+    $later_timer //= AddTimer(undef, 60, \&_run_later);
+}
+
+sub expire_old () {
+    my $now = now();
+    my $exp = $EXPTIME;
+    my $old = $now - $exp;
+    my %new;
+    while (my ($fd, $v) = each %$EXPMAP) {
+        my ($idle_time, $ds_obj) = @$v;
+        if ($idle_time < $old) {
+            if (!$ds_obj->shutdn) {
+                $new{$fd} = $v;
+            }
+        } else {
+            $new{$fd} = $v;
+        }
+    }
+    $EXPMAP = \%new;
+    $exp_timer = scalar(keys %new) ? later(\&expire_old) : undef;
+}
+
+sub update_idle_time {
+    my ($self) = @_;
+    my $sock = $self->{sock} or return;
+    $EXPMAP->{fileno($sock)} = [ now(), $self ];
+    $exp_timer //= later(\&expire_old);
+}
+
+sub not_idle_long {
+    my ($self, $now) = @_;
+    my $sock = $self->{sock} or return;
+    my $ary = $EXPMAP->{fileno($sock)} or return;
+    my $exp_at = $ary->[0] + $EXPTIME;
+    $exp_at > $now;
+}
+
 package PublicInbox::DS::Timer;
 # [$abs_float_firetime, $coderef];
 sub cancel {