my %Stack;
my $nextq; # queue for next_tick
my $wait_pids; # list of [ pid, callback, callback_arg ]
-my $EXPMAP; # fd -> idle_time
-our $EXPTIME = 180; # 3 minutes
-my ($reap_armed);
+my $reap_armed;
my $ToClose; # sockets to close when event loop is done
our (
%DescriptorMap, # fd (num) -> PublicInbox::DS object
# we may be iterating inside one of these on our stack
my @q = delete @Stack{keys %Stack};
for my $q (@q) { @$q = () }
- $EXPMAP = undef;
$wait_pids = $nextq = $ToClose = undef;
$ep_io = undef; # closes real $Epoll FD
$Epoll = undef; # may call DSKQXS::DESTROY
$ToClose = undef; # will be autovivified on push
@$close_now = map { fileno($_) } @$close_now;
- # order matters, destroy expiry times, first:
- delete @$EXPMAP{@$close_now};
-
# ->DESTROY methods may populate ToClose
delete @DescriptorMap{@$close_now};
}
}
}
-sub expire_old () {
- my $cur = $EXPMAP or return;
- $EXPMAP = undef;
- my $old = now() - $EXPTIME;
- while (my ($fd, $idle_at) = each %$cur) {
- if ($idle_at < $old) {
- my $ds_obj = $DescriptorMap{$fd};
- $EXPMAP->{$fd} = $idle_at if !$ds_obj->shutdn;
- } else {
- $EXPMAP->{$fd} = $idle_at;
- }
- }
- add_uniq_timer('expire', 60, \&expire_old) if $EXPMAP;
-}
-
-sub update_idle_time {
- my ($self) = @_;
- my $sock = $self->{sock} or return;
- $EXPMAP->{fileno($sock)} = now();
- add_uniq_timer('expire', 60, \&expire_old);
-}
-
-sub not_idle_long {
- my ($self, $now) = @_;
- my $sock = $self->{sock} or return;
- my $idle_at = $EXPMAP->{fileno($sock)} or return;
- ($idle_at + $EXPTIME) > $now;
-}
-
1;
=head1 AUTHORS (Danga::Socket)
my ($dmap, undef) = @_;
my $n = 0;
my $now = now();
-
- foreach my $s (values %$dmap) {
+ for my $s (values %$dmap) {
$s->can('busy') or next;
- if ($s->busy($now)) {
+ if ($s->busy) {
++$n;
- } else {
- # close as much as possible, early as possible
+ } else { # close as much as possible, early as possible
$s->close;
}
}
$self->SUPER::close; # PublicInbox::DS::close
}
-# for graceful shutdown in PublicInbox::Daemon:
-sub busy () {
+sub busy { # for graceful shutdown in PublicInbox::Daemon:
my ($self) = @_;
- ($self->{rbuf} || exists($self->{env}) || $self->{wbuf});
+ defined($self->{rbuf}) || exists($self->{env}) || defined($self->{wbuf})
}
# runs $cb on the next iteration of the event loop at earliest
my $valid_range = '[0-9]+|[0-9]+:[0-9]+|[0-9]+:\*';
$valid_range = qr/\A(?:$valid_range)(?:,(?:$valid_range))*\z/;
-# RFC 3501 5.4. Autologout Timer needs to be >= 30min
-$PublicInbox::DS::EXPTIME = 60 * 30;
-
sub greet ($) {
my ($self) = @_;
my $capa = capa($self);
} else {
greet($self);
}
- $self->update_idle_time;
$self;
}
$IDLERS = undef;
for my $i (values %$old) {
next if ($i->{wbuf} || !exists($i->{-idle_tag}));
- $i->update_idle_time or next;
$IDLERS->{fileno($i->{sock})} = $i;
$i->write(\"* OK Still here\r\n");
}
out($self, " deferred[$fd] aborted - %0.6f", $elapsed);
$self->close;
} elsif ($more) { # $self->{wbuf}:
- $self->update_idle_time;
-
# control passed to ibx_async_cat if $more == \undef
requeue_once($self) if !ref($more);
} else { # all done!
return unless $self->flush_write && $self->{sock} && !$self->{long_cb};
- $self->update_idle_time;
# only read more requests if we've drained the write buffer,
# otherwise we can be buffering infinitely w/o backpressure
return $self->close if $r < 0;
$self->rbuf_idle($rbuf);
- $self->update_idle_time;
# maybe there's more pipelined data, or we'll have
# to register it for socket-readiness notifications
undef;
}
-# for graceful shutdown in PublicInbox::Daemon:
-sub busy {
- my ($self, $now) = @_;
+sub busy { # for graceful shutdown in PublicInbox::Daemon:
+ my ($self) = @_;
if (defined($self->{-idle_tag})) {
$self->write(\"* BYE server shutting down\r\n");
return; # not busy anymore
}
- ($self->{rbuf} || $self->{wbuf} || $self->not_idle_long($now));
+ defined($self->{rbuf}) || defined($self->{wbuf}) ||
+ !$self->write(\"* BYE server shutting down\r\n");
}
sub close {
} else {
greet($self);
}
- $self->update_idle_time;
$self;
}
out($self, " deferred[$fd] aborted - %0.6f", $elapsed);
$self->close;
} elsif ($more) { # $self->{wbuf}:
- $self->update_idle_time;
-
# COMPRESS users all share the same DEFLATE context.
# Flush it here to ensure clients don't see
# each other's data
return unless $self->flush_write && $self->{sock} && !$self->{long_cb};
- $self->update_idle_time;
# only read more requests if we've drained the write buffer,
# otherwise we can be buffering infinitely w/o backpressure
out($self, "[$fd] %s - %0.6f$pending", $line, now() - $t0);
return $self->close if $r < 0;
$self->rbuf_idle($rbuf);
- $self->update_idle_time;
# maybe there's more pipelined data, or we'll have
# to register it for socket-readiness notifications
$self->requeue unless $pending;
}
-# for graceful shutdown in PublicInbox::Daemon:
-sub busy {
- my ($self, $now) = @_;
- ($self->{rbuf} || $self->{wbuf} || $self->not_idle_long($now));
+sub busy { # for graceful shutdown in PublicInbox::Daemon:
+ my ($self) = @_;
+ defined($self->{rbuf}) || defined($self->{wbuf})
}
1;