use strict;
use v5.10.1;
use parent qw(Exporter);
-use bytes;
+use bytes qw(length substr); # FIXME(?): needed for PublicInbox::NNTP
use POSIX qw(WNOHANG sigprocmask SIG_SETMASK);
use Fcntl qw(SEEK_SET :DEFAULT O_APPEND);
use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
use PublicInbox::Tmpfile;
use Errno qw(EAGAIN EINVAL);
use Carp qw(carp croak);
-our @EXPORT_OK = qw(now msg_more dwaitpid add_timer);
+our @EXPORT_OK = qw(now msg_more dwaitpid add_timer add_uniq_timer);
my %Stack;
my $nextq; # queue for next_tick
my $later_q; # list of callbacks to run at some later interval
my $EXPMAP; # fd -> idle_time
our $EXPTIME = 180; # 3 minutes
-my ($later_timer, $reap_armed, $exp_timer);
+my ($reap_armed);
my $ToClose; # sockets to close when event loop is done
our (
%DescriptorMap, # fd (num) -> PublicInbox::DS object
$LoopTimeout, # timeout of event loop in milliseconds
@Timers, # timers
+ %UniqTimer,
$in_loop,
);
$in_loop = undef; # first in case DESTROY callbacks use this
%DescriptorMap = ();
@Timers = ();
+ %UniqTimer = ();
$PostLoopCallback = undef;
# we may be iterating inside one of these on our stack
my @q = delete @Stack{keys %Stack};
for my $q (@q) { @$q = () }
- $EXPMAP = {};
+ $EXPMAP = undef;
$wait_pids = $later_q = $nextq = $ToClose = undef;
$_io = undef; # closes real $Epoll FD
$Epoll = undef; # may call DSKQXS::DESTROY
} while (@Timers || keys(%Stack) || $nextq || $wait_pids ||
$later_q || $ToClose || keys(%DescriptorMap) ||
- $PostLoopCallback);
+ $PostLoopCallback || keys(%UniqTimer));
- $reap_armed = $later_timer = $exp_timer = undef;
+ $reap_armed = undef;
$LoopTimeout = -1; # no timeout by default
}
=cut
sub SetLoopTimeout { $LoopTimeout = $_[1] + 0 }
-=head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef, $arg) >>
-
-Add a timer to occur $seconds from now. $seconds may be fractional, but timers
-are not guaranteed to fire at the exact time you ask for.
-
-=cut
-sub add_timer ($$;@) {
- my ($secs, $coderef, @args) = @_;
-
- my $fire_time = now() + $secs;
+sub _add_named_timer {
+ my ($name, $secs, $coderef, @args) = @_;
+ my $fire_time = now() + $secs;
+ my $timer = [$fire_time, $name, $coderef, @args];
- my $timer = [$fire_time, $coderef, @args];
+ if (!@Timers || $fire_time >= $Timers[-1][0]) {
+ push @Timers, $timer;
+ return $timer;
+ }
- if (!@Timers || $fire_time >= $Timers[-1][0]) {
- push @Timers, $timer;
- return $timer;
- }
+ # Now, where do we insert? (NOTE: this appears slow, algorithm-wise,
+ # but it was compared against calendar queues, heaps, naive push/sort,
+ # and a bunch of other versions, and found to be fastest with a large
+ # variety of datasets.)
+ for (my $i = 0; $i < @Timers; $i++) {
+ if ($Timers[$i][0] > $fire_time) {
+ splice(@Timers, $i, 0, $timer);
+ return $timer;
+ }
+ }
+ die "Shouldn't get here.";
+}
- # Now, where do we insert? (NOTE: this appears slow, algorithm-wise,
- # but it was compared against calendar queues, heaps, naive push/sort,
- # and a bunch of other versions, and found to be fastest with a large
- # variety of datasets.)
- for (my $i = 0; $i < @Timers; $i++) {
- if ($Timers[$i][0] > $fire_time) {
- splice(@Timers, $i, 0, $timer);
- return $timer;
- }
- }
+sub add_timer { _add_named_timer(undef, @_) }
- die "Shouldn't get here.";
+sub add_uniq_timer { # ($name, $secs, $coderef, @args) = @_;
+ $UniqTimer{$_[0]} //= _add_named_timer(@_);
}
# keeping this around in case we support other FD types for now,
# runs timers and returns milliseconds for next one, or next event loop
sub RunTimers {
- next_tick();
+ next_tick();
- return (($nextq || $ToClose) ? 0 : $LoopTimeout) unless @Timers;
+ return (($nextq || $ToClose) ? 0 : $LoopTimeout) unless @Timers;
- my $now = now();
+ my $now = now();
- # Run expired timers
- while (@Timers && $Timers[0][0] <= $now) {
- my $to_run = shift(@Timers);
- $to_run->[1]->(@$to_run[2..$#$to_run]);
- }
+ # Run expired timers
+ while (@Timers && $Timers[0][0] <= $now) {
+ my $to_run = shift(@Timers);
+ delete $UniqTimer{$to_run->[1] // ''};
+ $to_run->[2]->(@$to_run[3..$#$to_run]);
+ }
- # timers may enqueue into nextq:
- return 0 if ($nextq || $ToClose);
+ # timers may enqueue into nextq:
+ return 0 if ($nextq || $ToClose);
- return $LoopTimeout unless @Timers;
+ return $LoopTimeout unless @Timers;
- # convert time to an even number of milliseconds, adding 1
- # extra, otherwise floating point fun can occur and we'll
- # call RunTimers like 20-30 times, each returning a timeout
- # of 0.0000212 seconds
- my $timeout = int(($Timers[0][0] - $now) * 1000) + 1;
+ # convert time to an even number of milliseconds, adding 1
+ # extra, otherwise floating point fun can occur and we'll
+ # call RunTimers like 20-30 times, each returning a timeout
+ # of 0.0000212 seconds
+ my $timeout = int(($Timers[0][0] - $now) * 1000) + 1;
- # -1 is an infinite timeout, so prefer a real timeout
- ($LoopTimeout < 0 || $LoopTimeout >= $timeout) ? $timeout : $LoopTimeout;
+ # -1 is an infinite timeout, so prefer a real timeout
+ ($LoopTimeout < 0 || $LoopTimeout >= $timeout) ? $timeout : $LoopTimeout
}
sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" }
# n.b.: use ->write/->read for this buffer to allow compatibility with
# PerlIO::mmap or PerlIO::scalar if needed
sub tmpio ($$$) {
- my ($self, $bref, $off) = @_;
- my $fh = tmpfile('wbuf', $self->{sock}, O_APPEND) or
- return drop($self, "tmpfile $!");
- $fh->autoflush(1);
- my $len = bytes::length($$bref) - $off;
- $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!");
- [ $fh, 0 ] # [1] = offset, [2] = length, not set by us
+ my ($self, $bref, $off) = @_;
+ my $fh = tmpfile('wbuf', $self->{sock}, O_APPEND) or
+ return drop($self, "tmpfile $!");
+ $fh->autoflush(1);
+ my $len = length($$bref) - $off;
+ print $fh substr($$bref, $off, $len) or
+ return drop($self, "write ($len): $!");
+ [ $fh, 0 ] # [1] = offset, [2] = length, not set by us
}
=head2 C<< $obj->write( $data ) >>
$bref->($self);
return 1;
} else {
- my $to_write = bytes::length($$bref);
+ my $to_write = length($$bref);
my $written = syswrite($sock, $$bref, $to_write);
if (defined $written) {
!$sock->can('stop_SSL')) {
my $n = send($sock, $_[1], MSG_MORE);
if (defined $n) {
- my $nlen = bytes::length($_[1]) - $n;
+ my $nlen = length($_[1]) - $n;
return 1 if $nlen == 0; # all done!
# queue up the unwritten substring:
my $tmpio = tmpio($self, \($_[1]), $n) or return 0;
sub _run_later () {
my $q = $later_q or return;
- $later_timer = $later_q = undef;
+ $later_q = undef;
$Stack{later_q} = $q;
$_->() for @$q;
delete $Stack{later_q};
sub later ($) {
push @$later_q, $_[0]; # autovivifies @$later_q
- $later_timer //= add_timer(60, \&_run_later);
+ add_uniq_timer('later', 60, \&_run_later);
}
sub expire_old () {
- my $now = now();
- my $exp = $EXPTIME;
- my $old = $now - $exp;
- my %new;
- while (my ($fd, $idle_at) = each %$EXPMAP) {
+ my $cur = $EXPMAP or return;
+ $EXPMAP = undef;
+ my $old = now() - $EXPTIME;
+ while (my ($fd, $idle_at) = each %$cur) {
if ($idle_at < $old) {
my $ds_obj = $DescriptorMap{$fd};
- $new{$fd} = $idle_at if !$ds_obj->shutdn;
+ $EXPMAP->{$fd} = $idle_at if !$ds_obj->shutdn;
} else {
- $new{$fd} = $idle_at;
+ $EXPMAP->{$fd} = $idle_at;
}
}
- $EXPMAP = \%new;
- $exp_timer = scalar(keys %new) ? later(\&expire_old) : undef;
+ add_uniq_timer('expire', 60, \&expire_old) if $EXPMAP;
}
sub update_idle_time {
my ($self) = @_;
my $sock = $self->{sock} or return;
$EXPMAP->{fileno($sock)} = now();
- $exp_timer //= later(\&expire_old);
+ add_uniq_timer('expire', 60, \&expire_old);
}
sub not_idle_long {