use fields ('sock', # underlying socket
'rbuf', # scalarref, usually undef
- 'wbuf', # arrayref of coderefs or GLOB refs
+ 'wbuf', # arrayref of coderefs or GLOB refs (autovivified)
'wbuf_off', # offset into first element of wbuf to start writing at
);
my $EXPMAP; # fd -> [ idle_time, $self ]
our $EXPTIME = 180; # 3 minutes
my ($later_timer, $reap_timer, $exp_timer);
+my $ToClose; # sockets to close when event loop is done
our (
%DescriptorMap, # fd (num) -> PublicInbox::DS object
$Epoll, # Global epoll fd (or DSKQXS ref)
$_io, # IO::Handle for Epoll
- @ToClose, # sockets to close when event loop is done
$PostLoopCallback, # subref to call at the end of each loop, if defined (global)
$WaitPids = [];
$later_queue = [];
$EXPMAP = {};
- $reap_timer = $later_timer = $exp_timer = undef;
- @ToClose = ();
+ $ToClose = $reap_timer = $later_timer = $exp_timer = undef;
$LoopTimeout = -1; # no timeout by default
@Timers = ();
return $LoopTimeout = $_[1] + 0;
}
-=head2 C<< CLASS->AddTimer( $seconds, $coderef ) >>
+=head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef ) >>
Add a timer to occur $seconds from now. $seconds may be fractional, but timers
are not guaranteed to fire at the exact time you ask for.
-Returns a timer object which you can call C<< $timer->cancel >> on if you need to.
-
=cut
-sub AddTimer {
- my ($class, $secs, $coderef) = @_;
+sub add_timer ($$) {
+ my ($secs, $coderef) = @_;
my $fire_time = now() + $secs;
- my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer";
+ my $timer = [$fire_time, $coderef];
if (!@Timers || $fire_time >= $Timers[-1][0]) {
push @Timers, $timer;
sub RunTimers {
next_tick();
- return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers;
+ return ((@$nextq || $ToClose) ? 0 : $LoopTimeout) unless @Timers;
my $now = now();
}
# timers may enqueue into nextq:
- return 0 if (@$nextq || @ToClose);
+ return 0 if (@$nextq || $ToClose);
return $LoopTimeout unless @Timers;
}
if (@$WaitPids) {
# we may not be donea, and we may miss our
- $reap_timer = AddTimer(undef, 1, \&reap_pids);
+ $reap_timer = add_timer(1, \&reap_pids);
}
}
# reentrant SIGCHLD handler (since reap_pids is not reentrant)
sub enqueue_reap ($) { push @$nextq, \&reap_pids };
+sub in_loop () { $in_loop }
+
+# Internal function: run the post-event callback, send read events
+# for pushed-back data, and close pending connections. returns 1
+# if event loop should continue, or 0 to shut it all down.
+sub PostEventLoop () {
+ # now we can close sockets that wanted to close during our event
+ # processing. (we didn't want to close them during the loop, as we
+ # didn't want fd numbers being reused and confused during the event
+ # loop)
+ if (my $close_now = $ToClose) {
+ $ToClose = undef; # will be autovivified on push
+ # ->DESTROY methods may populate ToClose
+ delete($DescriptorMap{fileno($_)}) for @$close_now;
+ # let refcounting drop everything in $close_now at once
+ }
+
+ # by default we keep running, unless a postloop callback cancels it
+ $PostLoopCallback ? $PostLoopCallback->(\%DescriptorMap) : 1;
+}
+
sub EpollEventLoop {
local $in_loop = 1;
do {
$PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef;
}
-# Internal function: run the post-event callback, send read events
-# for pushed-back data, and close pending connections. returns 1
-# if event loop should continue, or 0 to shut it all down.
-sub PostEventLoop {
- # now we can close sockets that wanted to close during our event processing.
- # (we didn't want to close them during the loop, as we didn't want fd numbers
- # being reused and confused during the event loop)
- delete($DescriptorMap{fileno($_)}) for @ToClose;
- @ToClose = (); # let refcounting drop everything all at once
-
- # by default we keep running, unless a postloop callback (either per-object
- # or global) cancels it
- my $keep_running = 1;
-
- # now we're at the very end, call callback if defined
- if (defined $PostLoopCallback) {
- $keep_running &&= $PostLoopCallback->(\%DescriptorMap);
- }
-
- return $keep_running;
-}
-
#####################################################################
### PublicInbox::DS-the-object code
#####################################################################
# defer closing the actual socket until the event loop is done
# processing this round of events. (otherwise we might reuse fds)
- push @ToClose, $sock;
+ push @$ToClose, $sock; # autovivifies $ToClose
return 0;
}
# wbuf may be an empty array if we're being called inside
# ->flush_write via CODE bref:
- push @{$self->{wbuf} ||= []}, $tmpio;
+ push @{$self->{wbuf}}, $tmpio; # autovivifies
return 0;
}
}
return 1 if $nlen == 0; # all done!
# queue up the unwritten substring:
my $tmpio = tmpio($self, \($_[1]), $n) or return 0;
- $self->{wbuf} //= $wbuf //= [];
- push @$wbuf, $tmpio;
+ push @{$self->{wbuf}}, $tmpio; # autovivifies
epwait($sock, EPOLLOUT|EPOLLONESHOT);
return 0;
}
return 1 if $sock->accept_SSL;
return $self->close if $! != EAGAIN;
epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
- unshift @{$self->{wbuf} ||= []}, \&accept_tls_step;
+ unshift(@{$self->{wbuf}}, \&accept_tls_step); # autovivifies
0;
}
return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1);
return $self->close if $! != EAGAIN;
epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
- unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step;
+ unshift(@{$self->{wbuf}}, \&shutdn_tls_step); # autovivifies
0;
}
sub later ($) {
my ($cb) = @_;
push @$later_queue, $cb;
- $later_timer //= AddTimer(undef, 60, \&_run_later);
+ $later_timer //= add_timer(60, \&_run_later);
}
sub expire_old () {
$exp_at > $now;
}
-package PublicInbox::DS::Timer;
-# [$abs_float_firetime, $coderef];
-sub cancel {
- $_[0][1] = undef;
-}
-
1;
=head1 AUTHORS (Danga::Socket)