X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FDS.pm;h=d0caa5e73102f13ea27bf455e242b19cbb72220e;hb=4ccff6f9122da89c18ae3f38a130a06de5434826;hp=b7753e1a663148320e798aa606bfe0af1e9d4f7d;hpb=8fd41797b24736dfdccfacc5acc473234a29758a;p=public-inbox.git diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index b7753e1a..d0caa5e7 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -13,6 +13,12 @@ # Bugs encountered were reported to bug-Danga-Socket@rt.cpan.org, # fixed in Danga::Socket 1.62 and visible at: # https://rt.cpan.org/Public/Dist/Display.html?Name=Danga-Socket +# +# fields: +# sock: underlying socket +# rbuf: scalarref, usually undef +# wbuf: arrayref of coderefs or tmpio (autovivified)) +# (tmpio = [ GLOB, offset, [ length ] ]) package PublicInbox::DS; use strict; use bytes; @@ -22,19 +28,10 @@ use Fcntl qw(SEEK_SET :DEFAULT O_APPEND); use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); use parent qw(Exporter); our @EXPORT_OK = qw(now msg_more); -use warnings; use 5.010_001; use Scalar::Util qw(blessed); - use PublicInbox::Syscall qw(:epoll); use PublicInbox::Tmpfile; - -use fields ('sock', # underlying socket - 'rbuf', # scalarref, usually undef - 'wbuf', # arrayref of coderefs or tmpio (autovivified)) - # (tmpio = [ GLOB, offset, [ length ] ]) - ); - use Errno qw(EAGAIN EINVAL); use Carp qw(confess carp); @@ -43,7 +40,7 @@ my $wait_pids; # list of [ pid, callback, callback_arg ] my $later_queue; # list of callbacks to run at some later interval my $EXPMAP; # fd -> idle_time our $EXPTIME = 180; # 3 minutes -my ($later_timer, $reap_timer, $exp_timer); +my ($later_timer, $reap_armed, $exp_timer); my $ToClose; # sockets to close when event loop is done our ( %DescriptorMap, # fd (num) -> PublicInbox::DS object @@ -71,9 +68,9 @@ Reset all state =cut sub Reset { %DescriptorMap = (); - $wait_pids = $later_queue = undef; + $in_loop = $wait_pids = $later_queue = $reap_armed = undef; $EXPMAP = {}; - $nextq = $ToClose = $reap_timer = $later_timer = $exp_timer = undef; + $nextq = $ToClose = $later_timer = $exp_timer = undef; $LoopTimeout = -1; # no timeout by default @Timers = (); @@ -98,18 +95,18 @@ sub SetLoopTimeout { return $LoopTimeout = $_[1] + 0; } -=head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef ) >> +=head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef, $arg) >> Add a timer to occur $seconds from now. $seconds may be fractional, but timers are not guaranteed to fire at the exact time you ask for. =cut -sub add_timer ($$) { - my ($secs, $coderef) = @_; +sub add_timer ($$;$) { + my ($secs, $coderef, $arg) = @_; my $fire_time = now() + $secs; - my $timer = [$fire_time, $coderef]; + my $timer = [$fire_time, $coderef, $arg]; if (!@Timers || $fire_time >= $Timers[-1][0]) { push @Timers, $timer; @@ -201,7 +198,7 @@ sub RunTimers { # Run expired timers while (@Timers && $Timers[0][0] <= $now) { my $to_run = shift(@Timers); - $to_run->[1]->($now) if $to_run->[1]; + $to_run->[1]->($to_run->[2]); } # timers may enqueue into nextq: @@ -228,24 +225,26 @@ sub RunTimers { # and other things. So we scan the $wait_pids list, which is hopefully # not too big. We keep $wait_pids small by not calling dwaitpid() # until we've hit EOF when reading the stdout of the child. + sub reap_pids { - my $tmp = $wait_pids or return; - $wait_pids = $reap_timer = undef; - foreach my $ary (@$tmp) { - my ($pid, $cb, $arg) = @$ary; - my $ret = waitpid($pid, WNOHANG); - if ($ret == 0) { - push @$wait_pids, $ary; # autovivifies @$wait_pids - } elsif ($cb) { - eval { $cb->($arg, $pid) }; - } - } - # we may not be done, yet, and could've missed/masked a SIGCHLD: - $reap_timer = add_timer(1, \&reap_pids) if $wait_pids; + $reap_armed = undef; + my $tmp = $wait_pids or return; + $wait_pids = undef; + foreach my $ary (@$tmp) { + my ($pid, $cb, $arg) = @$ary; + my $ret = waitpid($pid, WNOHANG); + if ($ret == 0) { + push @$wait_pids, $ary; # autovivifies @$wait_pids + } elsif ($cb) { + eval { $cb->($arg, $pid) }; + } + } + # we may not be done, yet, and could've missed/masked a SIGCHLD: + $reap_armed //= requeue(\&reap_pids) if $wait_pids; } # reentrant SIGCHLD handler (since reap_pids is not reentrant) -sub enqueue_reap ($) { push @$nextq, \&reap_pids }; # autovivifies +sub enqueue_reap () { $reap_armed //= requeue(\&reap_pids) } sub in_loop () { $in_loop } @@ -328,13 +327,12 @@ This is normally (always?) called from your subclass via: =cut sub new { my ($self, $sock, $ev) = @_; - $self = fields::new($self) unless ref $self; - $self->{sock} = $sock; my $fd = fileno($sock); _InitPoller(); +retry: if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) { if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) { $ev &= ~EPOLLEXCLUSIVE; @@ -421,8 +419,8 @@ sub epbit ($$) { # (sock, default) # returns 1 if done, 0 if incomplete sub flush_write ($) { my ($self) = @_; + my $sock = $self->{sock} or return; my $wbuf = $self->{wbuf} or return 1; - my $sock = $self->{sock}; next_buf: while (my $bref = $wbuf->[0]) { @@ -632,7 +630,7 @@ sub dwaitpid ($$$) { push @$wait_pids, [ @_ ]; # [ $pid, $cb, $arg ] # We could've just missed our SIGCHLD, cover it, here: - requeue(\&reap_pids); + enqueue_reap(); } sub _run_later () {