'wbuf_off', # offset into first element of wbuf to start writing at
);
-use Errno qw(EAGAIN EINVAL);
-use Carp qw(croak confess carp);
-require File::Spec;
-
-my $nextq = []; # queue for next_tick
-my $WaitPids = []; # list of [ pid, callback, callback_arg ]
-my $reap_timer;
+use Errno qw(EAGAIN EINVAL);
+use Carp qw(confess carp);
+
+my $nextq; # queue for next_tick
+my $WaitPids; # list of [ pid, callback, callback_arg ]
+my $later_queue; # callbacks
+my $EXPMAP; # fd -> [ idle_time, $self ]
+our $EXPTIME = 180; # 3 minutes
+my ($later_timer, $reap_timer, $exp_timer);
our (
%DescriptorMap, # fd (num) -> PublicInbox::DS object
$Epoll, # Global epoll fd (or DSKQXS ref)
=cut
sub Reset {
%DescriptorMap = ();
+ $nextq = [];
$WaitPids = [];
- $reap_timer = undef;
+ $later_queue = [];
+ $EXPMAP = {};
+ $reap_timer = $later_timer = $exp_timer = undef;
@ToClose = ();
$LoopTimeout = -1; # no timeout by default
@Timers = ();
sub EpollEventLoop {
local $in_loop = 1;
- while (1) {
+ do {
my @events;
my $i;
my $timeout = RunTimers();
# in that event.
$DescriptorMap{$events[$i]->[0]}->event_step;
}
- return unless PostEventLoop();
- }
+ } while (PostEventLoop());
+ _run_later();
}
=head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >>
sub msg_more ($$) {
my $self = $_[0];
my $sock = $self->{sock} or return 1;
+ my $wbuf = $self->{wbuf};
- if (MSG_MORE && !$self->{wbuf} && ref($sock) ne 'IO::Socket::SSL') {
+ if (MSG_MORE && (!defined($wbuf) || !scalar(@$wbuf)) &&
+ ref($sock) ne 'IO::Socket::SSL') {
my $n = send($sock, $_[1], MSG_MORE);
if (defined $n) {
my $nlen = bytes::length($_[1]) - $n;
return 1 if $nlen == 0; # all done!
# queue up the unwritten substring:
my $tmpio = tmpio($self, \($_[1]), $n) or return 0;
- $self->{wbuf} = [ $tmpio ];
+ $self->{wbuf} //= $wbuf //= [];
+ push @$wbuf, $tmpio;
epwait($sock, EPOLLOUT|EPOLLONESHOT);
return 0;
}
}
}
+sub _run_later () {
+ my $run = $later_queue;
+ $later_timer = undef;
+ $later_queue = [];
+ $_->() for @$run;
+}
+
+sub later ($) {
+ my ($cb) = @_;
+ push @$later_queue, $cb;
+ $later_timer //= AddTimer(undef, 60, \&_run_later);
+}
+
+sub expire_old () {
+ my $now = now();
+ my $exp = $EXPTIME;
+ my $old = $now - $exp;
+ my %new;
+ while (my ($fd, $v) = each %$EXPMAP) {
+ my ($idle_time, $ds_obj) = @$v;
+ if ($idle_time < $old) {
+ if (!$ds_obj->shutdn) {
+ $new{$fd} = $v;
+ }
+ } else {
+ $new{$fd} = $v;
+ }
+ }
+ $EXPMAP = \%new;
+ $exp_timer = scalar(keys %new) ? later(\&expire_old) : undef;
+}
+
+sub update_idle_time {
+ my ($self) = @_;
+ my $sock = $self->{sock} or return;
+ $EXPMAP->{fileno($sock)} = [ now(), $self ];
+ $exp_timer //= later(\&expire_old);
+}
+
+sub not_idle_long {
+ my ($self, $now) = @_;
+ my $sock = $self->{sock} or return;
+ my $ary = $EXPMAP->{fileno($sock)} or return;
+ my $exp_at = $ary->[0] + $EXPTIME;
+ $exp_at > $now;
+}
+
package PublicInbox::DS::Timer;
# [$abs_float_firetime, $coderef];
sub cancel {