X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FDS.pm;h=bf8c4466218a37a43688aaaca2b063c65e98c86e;hb=a1a8cbab22adec879f97dccd9acfd0c5b2492ba9;hp=c89c7b8b40f4ddb4f9f89825e0c8a0c9f2e7edd6;hpb=e693f102e230796a13a648750b65b8ca38f04a16;p=public-inbox.git diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index c89c7b8b..bf8c4466 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -37,15 +37,12 @@ our @EXPORT_OK = qw(now msg_more dwaitpid add_timer add_uniq_timer); my %Stack; my $nextq; # queue for next_tick my $wait_pids; # list of [ pid, callback, callback_arg ] -my $later_q; # list of callbacks to run at some later interval -my $EXPMAP; # fd -> idle_time -our $EXPTIME = 180; # 3 minutes -my ($reap_armed, $exp_timer); +my $reap_armed; my $ToClose; # sockets to close when event loop is done our ( %DescriptorMap, # fd (num) -> PublicInbox::DS object $Epoll, # Global epoll fd (or DSKQXS ref) - $_io, # IO::Handle for Epoll + $ep_io, # IO::Handle for Epoll $PostLoopCallback, # subref to call at the end of each loop, if defined (global) @@ -77,15 +74,14 @@ sub Reset { # we may be iterating inside one of these on our stack my @q = delete @Stack{keys %Stack}; for my $q (@q) { @$q = () } - $EXPMAP = {}; - $wait_pids = $later_q = $nextq = $ToClose = undef; - $_io = undef; # closes real $Epoll FD + $wait_pids = $nextq = $ToClose = undef; + $ep_io = undef; # closes real $Epoll FD $Epoll = undef; # may call DSKQXS::DESTROY } while (@Timers || keys(%Stack) || $nextq || $wait_pids || - $later_q || $ToClose || keys(%DescriptorMap) || + $ToClose || keys(%DescriptorMap) || $PostLoopCallback || keys(%UniqTimer)); - $reap_armed = $exp_timer = undef; + $reap_armed = undef; $LoopTimeout = -1; # no timeout by default } @@ -128,41 +124,26 @@ sub add_uniq_timer { # ($name, $secs, $coderef, @args) = @_; $UniqTimer{$_[0]} //= _add_named_timer(@_); } -# keeping this around in case we support other FD types for now, -# epoll_create1(EPOLL_CLOEXEC) requires Linux 2.6.27+... -sub set_cloexec ($) { - my ($fd) = @_; - - open($_io, '+<&=', $fd) or return; - defined(my $fl = fcntl($_io, F_GETFD, 0)) or return; - fcntl($_io, F_SETFD, $fl | FD_CLOEXEC); -} - # caller sets return value to $Epoll -sub _InitPoller -{ - if (PublicInbox::Syscall::epoll_defined()) { - my $fd = epoll_create(); - set_cloexec($fd) if (defined($fd) && $fd >= 0); - $fd; - } else { - my $cls; - for (qw(DSKQXS DSPoll)) { - $cls = "PublicInbox::$_"; - last if eval "require $cls"; - } - $cls->import(qw(epoll_ctl epoll_wait)); - $cls->new; - } +sub _InitPoller () { + if (PublicInbox::Syscall::epoll_defined()) { + my $fd = epoll_create(); + die "epoll_create: $!" if $fd < 0; + open($ep_io, '+<&=', $fd) or return; + my $fl = fcntl($ep_io, F_GETFD, 0); + fcntl($ep_io, F_SETFD, $fl | FD_CLOEXEC); + $fd; + } else { + my $cls; + for (qw(DSKQXS DSPoll)) { + $cls = "PublicInbox::$_"; + last if eval "require $cls"; + } + $cls->import(qw(epoll_ctl epoll_wait)); + $cls->new; + } } -=head2 C<< CLASS->EventLoop() >> - -Start processing IO events. In most daemon programs this never exits. See -C below for how to exit the loop. - -=cut - sub now () { clock_gettime(CLOCK_MONOTONIC) } sub next_tick () { @@ -267,9 +248,6 @@ sub PostEventLoop () { $ToClose = undef; # will be autovivified on push @$close_now = map { fileno($_) } @$close_now; - # order matters, destroy expiry times, first: - delete @$EXPMAP{@$close_now}; - # ->DESTROY methods may populate ToClose delete @DescriptorMap{@$close_now}; } @@ -278,27 +256,41 @@ sub PostEventLoop () { $PostLoopCallback ? $PostLoopCallback->(\%DescriptorMap) : 1; } -sub EventLoop { - $Epoll //= _InitPoller(); - local $in_loop = 1; - my @events; - do { - my $timeout = RunTimers(); - - # get up to 1000 events - epoll_wait($Epoll, 1000, $timeout, \@events); - for my $fd (@events) { - # it's possible epoll_wait returned many events, including some at the end - # that ones in the front triggered unregister-interest actions. if we - # can't find the %sock entry, it's because we're no longer interested - # in that event. - - # guard stack-not-refcounted w/ Carp + @DB::args - my $obj = $DescriptorMap{$fd}; - $obj->event_step; - } - } while (PostEventLoop()); - _run_later(); +# Start processing IO events. In most daemon programs this never exits. See +# C for how to exit the loop. +sub event_loop (;$$) { + my ($sig, $oldset) = @_; + $Epoll //= _InitPoller(); + require PublicInbox::Sigfd if $sig; + my $sigfd = PublicInbox::Sigfd->new($sig, 1) if $sig; + local @SIG{keys %$sig} = values(%$sig) if $sig && !$sigfd; + local $SIG{PIPE} = 'IGNORE'; + if (!$sigfd && $sig) { + # wake up every second to accept signals if we don't + # have signalfd or IO::KQueue: + sig_setmask($oldset); + PublicInbox::DS->SetLoopTimeout(1000); + } + $_[0] = $sigfd = $sig = undef; # $_[0] == sig + local $in_loop = 1; + my @events; + do { + my $timeout = RunTimers(); + + # get up to 1000 events + epoll_wait($Epoll, 1000, $timeout, \@events); + for my $fd (@events) { + # it's possible epoll_wait returned many events, + # including some at the end that ones in the front + # triggered unregister-interest actions. if we can't + # find the %sock entry, it's because we're no longer + # interested in that event. + + # guard stack-not-refcounted w/ Carp + @DB::args + my $obj = $DescriptorMap{$fd}; + $obj->event_step; + } + } while (PostEventLoop()); } =head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >> @@ -328,7 +320,7 @@ sub SetPostLoopCallback { =head2 C<< CLASS->new( $socket ) >> Create a new PublicInbox::DS subclass object for the given I which will -react to events on it during the C. +react to events on it during the C. This is normally (always?) called from your subclass via: @@ -496,16 +488,15 @@ sub drop { $self->close; } -# n.b.: use ->write/->read for this buffer to allow compatibility with -# PerlIO::mmap or PerlIO::scalar if needed sub tmpio ($$$) { my ($self, $bref, $off) = @_; my $fh = tmpfile('wbuf', $self->{sock}, O_APPEND) or return drop($self, "tmpfile $!"); $fh->autoflush(1); my $len = length($$bref) - $off; - print $fh substr($$bref, $off, $len) or + my $n = syswrite($fh, $$bref, $len, $off) // return drop($self, "write ($len): $!"); + $n == $len or return drop($self, "wrote $n < $len bytes"); [ $fh, 0 ] # [1] = offset, [2] = length, not set by us } @@ -658,50 +649,6 @@ sub dwaitpid ($;$$) { } } -sub _run_later () { - my $q = $later_q or return; - $later_q = undef; - $Stack{later_q} = $q; - $_->() for @$q; - delete $Stack{later_q}; -} - -sub later ($) { - push @$later_q, $_[0]; # autovivifies @$later_q - add_uniq_timer('later', 60, \&_run_later); -} - -sub expire_old () { - my $now = now(); - my $exp = $EXPTIME; - my $old = $now - $exp; - my %new; - while (my ($fd, $idle_at) = each %$EXPMAP) { - if ($idle_at < $old) { - my $ds_obj = $DescriptorMap{$fd}; - $new{$fd} = $idle_at if !$ds_obj->shutdn; - } else { - $new{$fd} = $idle_at; - } - } - $EXPMAP = \%new; - $exp_timer = scalar(keys %new) ? later(\&expire_old) : undef; -} - -sub update_idle_time { - my ($self) = @_; - my $sock = $self->{sock} or return; - $EXPMAP->{fileno($sock)} = now(); - $exp_timer //= later(\&expire_old); -} - -sub not_idle_long { - my ($self, $now) = @_; - my $sock = $self->{sock} or return; - my $idle_at = $EXPMAP->{fileno($sock)} or return; - ($idle_at + $EXPTIME) > $now; -} - 1; =head1 AUTHORS (Danga::Socket)