--- /dev/null
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# event cleanups (currently for Danga::Socket)
+package PublicInbox::EvCleanup;
+use strict;
+use warnings;
+
+my $asapq = { queue => [], timer => undef };
+my $laterq = { queue => [], timer => undef };
+
+sub _run_all ($) {
+ my ($q) = @_;
+
+ my $run = $q->{queue};
+ $q->{queue} = [];
+ $q->{timer} = undef;
+ $_->() foreach @$run;
+}
+
+sub _run_asap () { _run_all($asapq) }
+sub _run_later () { _run_all($laterq) }
+
+sub asap ($) {
+ my ($cb) = @_;
+ push @{$asapq->{queue}}, $cb;
+ $asapq->{timer} ||= Danga::Socket->AddTimer(0, *_run_asap);
+}
+
+sub later ($) {
+ my ($cb) = @_;
+ push @{$laterq->{queue}}, $cb;
+ $laterq->{timer} ||= Danga::Socket->AddTimer(60, *_run_later);
+}
+
+END {
+ _run_asap();
+ _run_later();
+}
+
+1;
# FIXME: duplicated code with NNTP.pm
my $WEAKEN = {}; # string(inbox) -> inbox
-my $WEAKTIMER;
+my $weakt;
sub weaken_task () {
- $WEAKTIMER = undef;
+ $weakt = undef;
$_->weaken_all for values %$WEAKEN;
$WEAKEN = {};
}
+my $pipelineq = [];
+my $pipet;
+sub process_pipelineq () {
+ my $q = $pipelineq;
+ $pipet = undef;
+ $pipelineq = [];
+ rbuf_process($_) foreach @$q;
+}
+
# Use the same configuration parameter as git since this is primarily
# a slow-client sponge for git-http-backend
# TODO: support per-respository http.maxRequestBuffer somehow...
if (my $obj = $env->{'pi-httpd.inbox'}) {
# grace period for reaping resources
$WEAKEN->{"$obj"} = $obj;
- $WEAKTIMER ||= Danga::Socket->AddTimer(60, *weaken_task);
+ $weakt ||= PublicInbox::EvCleanup::later(*weaken_task);
}
$self->{env} = undef;
};
$self->write($_[1]);
}
-my $pipelineq = [];
-my $next_tick;
-sub process_pipelineq () {
- $next_tick = undef;
- my $q = $pipelineq;
- $pipelineq = [];
- rbuf_process($_) foreach @$q;
-}
-
# overrides existing Danga::Socket method
sub event_write {
my ($self) = @_;
$self->watch_read(1);
} else { # avoid recursion for pipelined requests
push @$pipelineq, $self;
- $next_tick ||= Danga::Socket->AddTimer(0, *process_pipelineq);
+ $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
}
}
use warnings;
use base qw(Danga::Socket);
use fields qw(cb cleanup);
+require PublicInbox::EvCleanup;
sub new {
my ($class, $io, $cb, $cleanup) = @_;
$self->SUPER::close(@_);
# we defer this to the next timer loop since close is deferred
- Danga::Socket->AddTimer(0, $cleanup) if $cleanup;
+ PublicInbox::EvCleanup::asap($cleanup) if $cleanup;
}
# do not let ourselves be closed during graceful termination
use PublicInbox::Msgmap;
use PublicInbox::Git;
use PublicInbox::MID qw(mid2path);
+require PublicInbox::EvCleanup;
use Email::Simple;
use POSIX qw(strftime);
use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
my %DISABLED; # = map { $_ => 1 } qw(xover list_overview_fmt newnews xhdr);
my $EXPMAP; # fd -> [ idle_time, $self ]
-my $EXPTIMER;
+my $expt;
our $EXPTIME = 180; # 3 minutes
my $WEAKEN = {}; # string(nntpd) -> nntpd
-my $WEAKTIMER;
+my $weakt;
+my $nextt;
-my $next_tick;
my $nextq = [];
sub next_tick () {
- $next_tick = undef;
+ $nextt = undef;
my $q = $nextq;
$nextq = [];
foreach my $nntp (@$q) {
# reduce FD pressure by closing some "git cat-file --batch" processes
# and unused FDs for msgmap and Xapian indices
sub weaken_groups () {
- $WEAKTIMER = undef;
+ $weakt = undef;
foreach my $nntpd (values %$WEAKEN) {
$_->weaken_all foreach (@{$nntpd->{grouplist}});
}
my $now = now();
my $exp = $EXPTIME;
my $old = $now - $exp;
- my $next = $now + $exp;
my $nr = 0;
my %new;
while (my ($fd, $v) = each %$EXPMAP) {
if ($idle_time < $old) {
$nntp->close; # idempotent
} else {
- my $nexp = $idle_time + $exp;
- $next = $nexp if ($nexp < $next);
++$nr;
$new{$fd} = $v;
}
}
$EXPMAP = \%new;
if ($nr) {
- $next -= $now;
- $next = 0 if $next < 0;
- $EXPTIMER = Danga::Socket->AddTimer($next, *expire_old);
+ $expt = PublicInbox::EvCleanup::later(*expire_old);
weaken_groups();
} else {
- $EXPTIMER = undef;
+ $expt = undef;
# noop to kick outselves out of the loop ASAP so descriptors
# really get closed
- Danga::Socket->AddTimer(0, sub {});
+ PublicInbox::EvCleanup::asap(sub {});
# grace period for reaping resources
- $WEAKTIMER ||= Danga::Socket->AddTimer(30, *weaken_groups);
+ $weakt ||= PublicInbox::EvCleanup::later(*weaken_groups);
}
}
$self->watch_read(1);
update_idle_time($self);
$WEAKEN->{"$nntpd"} = $nntpd;
- $EXPTIMER ||= Danga::Socket->AddTimer($EXPTIME, *expire_old);
+ $expt ||= PublicInbox::EvCleanup::later(*expire_old);
$self;
}
update_idle_time($self);
push @$nextq, $self;
- $next_tick ||= Danga::Socket->AddTimer(0, *next_tick);
+ $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
} else { # all done!
$self->{long_res} = undef;
$self->watch_read(1);
# in case we really did dispatch a read event and started
# another long response.
push @$nextq, $self;
- $next_tick ||= Danga::Socket->AddTimer(0, *next_tick);
+ $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
}
$rv;
}