X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FDS.pm;h=d0caa5e73102f13ea27bf455e242b19cbb72220e;hb=4ccff6f9122da89c18ae3f38a130a06de5434826;hp=da68802dda94a82da154e156207442b4641bdd99;hpb=94096cab6cd5e00c8a36a4a2667bdb9acf43d01f;p=public-inbox.git diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index da68802d..d0caa5e7 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -40,7 +40,7 @@ my $wait_pids; # list of [ pid, callback, callback_arg ] my $later_queue; # list of callbacks to run at some later interval my $EXPMAP; # fd -> idle_time our $EXPTIME = 180; # 3 minutes -my ($later_timer, $reap_timer, $exp_timer); +my ($later_timer, $reap_armed, $exp_timer); my $ToClose; # sockets to close when event loop is done our ( %DescriptorMap, # fd (num) -> PublicInbox::DS object @@ -68,9 +68,9 @@ Reset all state =cut sub Reset { %DescriptorMap = (); - $wait_pids = $later_queue = undef; + $in_loop = $wait_pids = $later_queue = $reap_armed = undef; $EXPMAP = {}; - $nextq = $ToClose = $reap_timer = $later_timer = $exp_timer = undef; + $nextq = $ToClose = $later_timer = $exp_timer = undef; $LoopTimeout = -1; # no timeout by default @Timers = (); @@ -95,18 +95,18 @@ sub SetLoopTimeout { return $LoopTimeout = $_[1] + 0; } -=head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef ) >> +=head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef, $arg) >> Add a timer to occur $seconds from now. $seconds may be fractional, but timers are not guaranteed to fire at the exact time you ask for. =cut -sub add_timer ($$) { - my ($secs, $coderef) = @_; +sub add_timer ($$;$) { + my ($secs, $coderef, $arg) = @_; my $fire_time = now() + $secs; - my $timer = [$fire_time, $coderef]; + my $timer = [$fire_time, $coderef, $arg]; if (!@Timers || $fire_time >= $Timers[-1][0]) { push @Timers, $timer; @@ -198,7 +198,7 @@ sub RunTimers { # Run expired timers while (@Timers && $Timers[0][0] <= $now) { my $to_run = shift(@Timers); - $to_run->[1]->($now) if $to_run->[1]; + $to_run->[1]->($to_run->[2]); } # timers may enqueue into nextq: @@ -225,24 +225,26 @@ sub RunTimers { # and other things. So we scan the $wait_pids list, which is hopefully # not too big. We keep $wait_pids small by not calling dwaitpid() # until we've hit EOF when reading the stdout of the child. + sub reap_pids { - my $tmp = $wait_pids or return; - $wait_pids = $reap_timer = undef; - foreach my $ary (@$tmp) { - my ($pid, $cb, $arg) = @$ary; - my $ret = waitpid($pid, WNOHANG); - if ($ret == 0) { - push @$wait_pids, $ary; # autovivifies @$wait_pids - } elsif ($cb) { - eval { $cb->($arg, $pid) }; - } - } - # we may not be done, yet, and could've missed/masked a SIGCHLD: - $reap_timer = add_timer(1, \&reap_pids) if $wait_pids; + $reap_armed = undef; + my $tmp = $wait_pids or return; + $wait_pids = undef; + foreach my $ary (@$tmp) { + my ($pid, $cb, $arg) = @$ary; + my $ret = waitpid($pid, WNOHANG); + if ($ret == 0) { + push @$wait_pids, $ary; # autovivifies @$wait_pids + } elsif ($cb) { + eval { $cb->($arg, $pid) }; + } + } + # we may not be done, yet, and could've missed/masked a SIGCHLD: + $reap_armed //= requeue(\&reap_pids) if $wait_pids; } # reentrant SIGCHLD handler (since reap_pids is not reentrant) -sub enqueue_reap ($) { push @$nextq, \&reap_pids }; # autovivifies +sub enqueue_reap () { $reap_armed //= requeue(\&reap_pids) } sub in_loop () { $in_loop } @@ -330,6 +332,7 @@ sub new { _InitPoller(); +retry: if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) { if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) { $ev &= ~EPOLLEXCLUSIVE; @@ -627,7 +630,7 @@ sub dwaitpid ($$$) { push @$wait_pids, [ @_ ]; # [ $pid, $cb, $arg ] # We could've just missed our SIGCHLD, cover it, here: - requeue(\&reap_pids); + enqueue_reap(); } sub _run_later () {