]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/DS.pm
treewide: "require" + "use" cleanup and docs
[public-inbox.git] / lib / PublicInbox / DS.pm
index 17c640f4c1aac251460e09309dd320fb1bffd1fa..09dc399250a30382d73592b8015e5ae19a9fabb3 100644 (file)
@@ -35,13 +35,15 @@ use fields ('sock',              # underlying socket
             'wbuf_off',  # offset into first element of wbuf to start writing at
             );
 
-use Errno  qw(EAGAIN EINVAL);
-use Carp   qw(croak confess carp);
-require File::Spec;
-
-my $nextq = []; # queue for next_tick
-my $WaitPids = [];               # list of [ pid, callback, callback_arg ]
-my $reap_timer;
+use Errno qw(EAGAIN EINVAL);
+use Carp qw(confess carp);
+
+my $nextq; # queue for next_tick
+my $WaitPids; # list of [ pid, callback, callback_arg ]
+my $later_queue; # callbacks
+my $EXPMAP; # fd -> [ idle_time, $self ]
+our $EXPTIME = 180; # 3 minutes
+my ($later_timer, $reap_timer, $exp_timer);
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
      $Epoll,                     # Global epoll fd (or DSKQXS ref)
@@ -69,8 +71,11 @@ Reset all state
 =cut
 sub Reset {
     %DescriptorMap = ();
+    $nextq = [];
     $WaitPids = [];
-    $reap_timer = undef;
+    $later_queue = [];
+    $EXPMAP = {};
+    $reap_timer = $later_timer = $exp_timer = undef;
     @ToClose = ();
     $LoopTimeout = -1;  # no timeout by default
     @Timers = ();
@@ -251,7 +256,7 @@ sub enqueue_reap ($) { push @$nextq, \&reap_pids };
 
 sub EpollEventLoop {
     local $in_loop = 1;
-    while (1) {
+    do {
         my @events;
         my $i;
         my $timeout = RunTimers();
@@ -265,8 +270,8 @@ sub EpollEventLoop {
             # in that event.
             $DescriptorMap{$events[$i]->[0]}->event_step;
         }
-        return unless PostEventLoop();
-    }
+    } while (PostEventLoop());
+    _run_later();
 }
 
 =head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >>
@@ -568,15 +573,18 @@ use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
 sub msg_more ($$) {
     my $self = $_[0];
     my $sock = $self->{sock} or return 1;
+    my $wbuf = $self->{wbuf};
 
-    if (MSG_MORE && !$self->{wbuf} && ref($sock) ne 'IO::Socket::SSL') {
+    if (MSG_MORE && (!defined($wbuf) || !scalar(@$wbuf)) &&
+               ref($sock) ne 'IO::Socket::SSL') {
         my $n = send($sock, $_[1], MSG_MORE);
         if (defined $n) {
             my $nlen = bytes::length($_[1]) - $n;
             return 1 if $nlen == 0; # all done!
             # queue up the unwritten substring:
             my $tmpio = tmpio($self, \($_[1]), $n) or return 0;
-            $self->{wbuf} = [ $tmpio ];
+            $self->{wbuf} //= $wbuf //= [];
+            push @$wbuf, $tmpio;
             epwait($sock, EPOLLOUT|EPOLLONESHOT);
             return 0;
         }
@@ -639,6 +647,53 @@ sub dwaitpid ($$$) {
     }
 }
 
+sub _run_later () {
+    my $run = $later_queue;
+    $later_timer = undef;
+    $later_queue = [];
+    $_->() for @$run;
+}
+
+sub later ($) {
+    my ($cb) = @_;
+    push @$later_queue, $cb;
+    $later_timer //= AddTimer(undef, 60, \&_run_later);
+}
+
+sub expire_old () {
+    my $now = now();
+    my $exp = $EXPTIME;
+    my $old = $now - $exp;
+    my %new;
+    while (my ($fd, $v) = each %$EXPMAP) {
+        my ($idle_time, $ds_obj) = @$v;
+        if ($idle_time < $old) {
+            if (!$ds_obj->shutdn) {
+                $new{$fd} = $v;
+            }
+        } else {
+            $new{$fd} = $v;
+        }
+    }
+    $EXPMAP = \%new;
+    $exp_timer = scalar(keys %new) ? later(\&expire_old) : undef;
+}
+
+sub update_idle_time {
+    my ($self) = @_;
+    my $sock = $self->{sock} or return;
+    $EXPMAP->{fileno($sock)} = [ now(), $self ];
+    $exp_timer //= later(\&expire_old);
+}
+
+sub not_idle_long {
+    my ($self, $now) = @_;
+    my $sock = $self->{sock} or return;
+    my $ary = $EXPMAP->{fileno($sock)} or return;
+    my $exp_at = $ary->[0] + $EXPTIME;
+    $exp_at > $now;
+}
+
 package PublicInbox::DS::Timer;
 # [$abs_float_firetime, $coderef];
 sub cancel {