]> Sergey Matveev's repositories - public-inbox.git/commitdiff
standardize timer-related event-loop code
authorEric Wong <e@80x24.org>
Tue, 24 May 2016 03:41:51 +0000 (03:41 +0000)
committerEric Wong <e@80x24.org>
Tue, 24 May 2016 04:12:03 +0000 (04:12 +0000)
Standardize the code we have in place to avoid creating too many
timer objects.  We do not need exact timers for things that don't
need to be run ASAP, so we can play things fast and loose to avoid
wasting power with unnecessary wakeups.

We only need two classes of timers:

* asap - run this on the next loop tick, after operating on
  @Danga::Socket::ToClose to close remaining sockets

* later - run at some point in the future.  It could be as
  soon as immediately (like "asap"), and as late as 60s into
  the future.

In the future, we support an "emergency" switch to fire "later"
timers immediately.

lib/PublicInbox/EvCleanup.pm [new file with mode: 0644]
lib/PublicInbox/HTTP.pm
lib/PublicInbox/HTTPD/Async.pm
lib/PublicInbox/NNTP.pm

diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm
new file mode 100644 (file)
index 0000000..5efb093
--- /dev/null
@@ -0,0 +1,41 @@
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# event cleanups (currently for Danga::Socket)
+package PublicInbox::EvCleanup;
+use strict;
+use warnings;
+
+my $asapq = { queue => [], timer => undef };
+my $laterq = { queue => [], timer => undef };
+
+sub _run_all ($) {
+       my ($q) = @_;
+
+       my $run = $q->{queue};
+       $q->{queue} = [];
+       $q->{timer} = undef;
+       $_->() foreach @$run;
+}
+
+sub _run_asap () { _run_all($asapq) }
+sub _run_later () { _run_all($laterq) }
+
+sub asap ($) {
+       my ($cb) = @_;
+       push @{$asapq->{queue}}, $cb;
+       $asapq->{timer} ||= Danga::Socket->AddTimer(0, *_run_asap);
+}
+
+sub later ($) {
+       my ($cb) = @_;
+       push @{$laterq->{queue}}, $cb;
+       $laterq->{timer} ||= Danga::Socket->AddTimer(60, *_run_later);
+}
+
+END {
+       _run_asap();
+       _run_later();
+}
+
+1;
index 104a2132ba6108b92b9a5764d46a2b820234a7ee..00c9a0444bf75e63689d60893d6b3e236c1ca264 100644 (file)
@@ -26,13 +26,22 @@ use constant {
 
 # FIXME: duplicated code with NNTP.pm
 my $WEAKEN = {}; # string(inbox) -> inbox
-my $WEAKTIMER;
+my $weakt;
 sub weaken_task () {
-       $WEAKTIMER = undef;
+       $weakt = undef;
        $_->weaken_all for values %$WEAKEN;
        $WEAKEN = {};
 }
 
+my $pipelineq = [];
+my $pipet;
+sub process_pipelineq () {
+       my $q = $pipelineq;
+       $pipet = undef;
+       $pipelineq = [];
+       rbuf_process($_) foreach @$q;
+}
+
 # Use the same configuration parameter as git since this is primarily
 # a slow-client sponge for git-http-backend
 # TODO: support per-respository http.maxRequestBuffer somehow...
@@ -234,7 +243,7 @@ sub response_write {
                if (my $obj = $env->{'pi-httpd.inbox'}) {
                        # grace period for reaping resources
                        $WEAKEN->{"$obj"} = $obj;
-                       $WEAKTIMER ||= Danga::Socket->AddTimer(60, *weaken_task);
+                       $weakt ||= PublicInbox::EvCleanup::later(*weaken_task);
                }
                $self->{env} = undef;
        };
@@ -281,15 +290,6 @@ sub more ($$) {
        $self->write($_[1]);
 }
 
-my $pipelineq = [];
-my $next_tick;
-sub process_pipelineq () {
-       $next_tick = undef;
-       my $q = $pipelineq;
-       $pipelineq = [];
-       rbuf_process($_) foreach @$q;
-}
-
 # overrides existing Danga::Socket method
 sub event_write {
        my ($self) = @_;
@@ -300,7 +300,7 @@ sub event_write {
                $self->watch_read(1);
        } else { # avoid recursion for pipelined requests
                push @$pipelineq, $self;
-               $next_tick ||= Danga::Socket->AddTimer(0, *process_pipelineq);
+               $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
        }
 }
 
index bd2eacbfdfee8a5be8f3e9b423907a9ac16b5851..47ba27d2e42979a2594aa23829ee0e02582be092 100644 (file)
@@ -10,6 +10,7 @@ use strict;
 use warnings;
 use base qw(Danga::Socket);
 use fields qw(cb cleanup);
+require PublicInbox::EvCleanup;
 
 sub new {
        my ($class, $io, $cb, $cleanup) = @_;
@@ -61,7 +62,7 @@ sub close {
        $self->SUPER::close(@_);
 
        # we defer this to the next timer loop since close is deferred
-       Danga::Socket->AddTimer(0, $cleanup) if $cleanup;
+       PublicInbox::EvCleanup::asap($cleanup) if $cleanup;
 }
 
 # do not let ourselves be closed during graceful termination
index ac536f71fb703d882fb9466760652df46e51f9b8..f3de4b1cd5722b603f5aa2f1d016ee8e7fb8572c 100644 (file)
@@ -11,6 +11,7 @@ use PublicInbox::Search;
 use PublicInbox::Msgmap;
 use PublicInbox::Git;
 use PublicInbox::MID qw(mid2path);
+require PublicInbox::EvCleanup;
 use Email::Simple;
 use POSIX qw(strftime);
 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
@@ -38,15 +39,15 @@ my $LIST_HEADERS = join("\r\n", @OVERVIEW,
 my %DISABLED; # = map { $_ => 1 } qw(xover list_overview_fmt newnews xhdr);
 
 my $EXPMAP; # fd -> [ idle_time, $self ]
-my $EXPTIMER;
+my $expt;
 our $EXPTIME = 180; # 3 minutes
 my $WEAKEN = {}; # string(nntpd) -> nntpd
-my $WEAKTIMER;
+my $weakt;
+my $nextt;
 
-my $next_tick;
 my $nextq = [];
 sub next_tick () {
-       $next_tick = undef;
+       $nextt = undef;
        my $q = $nextq;
        $nextq = [];
        foreach my $nntp (@$q) {
@@ -70,7 +71,7 @@ sub update_idle_time ($) {
 # reduce FD pressure by closing some "git cat-file --batch" processes
 # and unused FDs for msgmap and Xapian indices
 sub weaken_groups () {
-       $WEAKTIMER = undef;
+       $weakt = undef;
        foreach my $nntpd (values %$WEAKEN) {
                $_->weaken_all foreach (@{$nntpd->{grouplist}});
        }
@@ -81,7 +82,6 @@ sub expire_old () {
        my $now = now();
        my $exp = $EXPTIME;
        my $old = $now - $exp;
-       my $next = $now + $exp;
        my $nr = 0;
        my %new;
        while (my ($fd, $v) = each %$EXPMAP) {
@@ -89,26 +89,22 @@ sub expire_old () {
                if ($idle_time < $old) {
                        $nntp->close; # idempotent
                } else {
-                       my $nexp = $idle_time + $exp;
-                       $next = $nexp if ($nexp < $next);
                        ++$nr;
                        $new{$fd} = $v;
                }
        }
        $EXPMAP = \%new;
        if ($nr) {
-               $next -= $now;
-               $next = 0 if $next < 0;
-               $EXPTIMER = Danga::Socket->AddTimer($next, *expire_old);
+               $expt = PublicInbox::EvCleanup::later(*expire_old);
                weaken_groups();
        } else {
-               $EXPTIMER = undef;
+               $expt = undef;
                # noop to kick outselves out of the loop ASAP so descriptors
                # really get closed
-               Danga::Socket->AddTimer(0, sub {});
+               PublicInbox::EvCleanup::asap(sub {});
 
                # grace period for reaping resources
-               $WEAKTIMER ||= Danga::Socket->AddTimer(30, *weaken_groups);
+               $weakt ||= PublicInbox::EvCleanup::later(*weaken_groups);
        }
 }
 
@@ -122,7 +118,7 @@ sub new ($$$) {
        $self->watch_read(1);
        update_idle_time($self);
        $WEAKEN->{"$nntpd"} = $nntpd;
-       $EXPTIMER ||= Danga::Socket->AddTimer($EXPTIME, *expire_old);
+       $expt ||= PublicInbox::EvCleanup::later(*expire_old);
        $self;
 }
 
@@ -633,7 +629,7 @@ sub long_response ($$$$) {
                        update_idle_time($self);
 
                        push @$nextq, $self;
-                       $next_tick ||= Danga::Socket->AddTimer(0, *next_tick);
+                       $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
                } else { # all done!
                        $self->{long_res} = undef;
                        $self->watch_read(1);
@@ -996,7 +992,7 @@ sub watch_read {
                # in case we really did dispatch a read event and started
                # another long response.
                push @$nextq, $self;
-               $next_tick ||= Danga::Socket->AddTimer(0, *next_tick);
+               $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
        }
        $rv;
 }