X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FDS.pm;h=0a763d0ef643ad0d5c5deb8e027b9df67d4438ae;hb=HEAD;hp=ef483aacf2724a4bf73a50e64223c1b40374f3f2;hpb=23af251dd607c4e75ab1e68063f2c885c48cc035;p=public-inbox.git diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index ef483aac..0a763d0e 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -30,13 +30,13 @@ use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); use Scalar::Util qw(blessed); use PublicInbox::Syscall qw(:epoll); use PublicInbox::Tmpfile; -use Errno qw(EAGAIN EINVAL); +use Errno qw(EAGAIN EINVAL ECHILD EINTR); use Carp qw(carp croak); -our @EXPORT_OK = qw(now msg_more dwaitpid add_timer add_uniq_timer); +our @EXPORT_OK = qw(now msg_more awaitpid add_timer add_uniq_timer); my %Stack; my $nextq; # queue for next_tick -my $wait_pids; # list of [ pid, callback, callback_arg ] +my $AWAIT_PIDS; # pid => [ $callback, @args ] my $reap_armed; my $ToClose; # sockets to close when event loop is done our ( @@ -74,10 +74,10 @@ sub Reset { # we may be iterating inside one of these on our stack my @q = delete @Stack{keys %Stack}; for my $q (@q) { @$q = () } - $wait_pids = $nextq = $ToClose = undef; + $AWAIT_PIDS = $nextq = $ToClose = undef; $ep_io = undef; # closes real $Epoll FD $Epoll = undef; # may call DSKQXS::DESTROY - } while (@Timers || keys(%Stack) || $nextq || $wait_pids || + } while (@Timers || keys(%Stack) || $nextq || $AWAIT_PIDS || $ToClose || keys(%DescriptorMap) || $PostLoopCallback || keys(%UniqTimer)); @@ -126,12 +126,11 @@ sub add_uniq_timer { # ($name, $secs, $coderef, @args) = @_; # caller sets return value to $Epoll sub _InitPoller () { - if (PublicInbox::Syscall::epoll_defined()) { + if (defined $PublicInbox::Syscall::SYS_epoll_create) { my $fd = epoll_create(); die "epoll_create: $!" if $fd < 0; open($ep_io, '+<&=', $fd) or return; - my $fl = fcntl($ep_io, F_GETFD, 0); - fcntl($ep_io, F_SETFD, $fl | FD_CLOEXEC); + fcntl($ep_io, F_SETFD, FD_CLOEXEC); $fd; } else { my $cls; @@ -202,33 +201,30 @@ sub block_signals () { $oldset; } -# We can't use waitpid(-1) safely here since it can hit ``, system(), -# and other things. So we scan the $wait_pids list, which is hopefully -# not too big. We keep $wait_pids small by not calling dwaitpid() -# until we've hit EOF when reading the stdout of the child. +sub await_cb ($;@) { + my ($pid, @cb_args) = @_; + my $cb = shift @cb_args or return; + eval { $cb->($pid, @cb_args) }; + warn "E: awaitpid($pid): $@" if $@; +} +# This relies on our Perl process is single-threaded, or at least +# no threads are spawning and waiting on processes (``, system(), etc...) +# Threads are officially discouraged by the Perl5 team, and I expect +# that to remain the case. sub reap_pids { $reap_armed = undef; - my $tmp = $wait_pids or return; - $wait_pids = undef; - $Stack{reap_runq} = $tmp; my $oldset = block_signals(); - foreach my $ary (@$tmp) { - my ($pid, $cb, $arg) = @$ary; - my $ret = waitpid($pid, WNOHANG); - if ($ret == 0) { - push @$wait_pids, $ary; # autovivifies @$wait_pids - } elsif ($ret == $pid) { - if ($cb) { - eval { $cb->($arg, $pid) }; - warn "E: dwaitpid($pid) in_loop: $@" if $@; - } + while (1) { + my $pid = waitpid(-1, WNOHANG) // last; + last if $pid <= 0; + if (defined(my $cb_args = delete $AWAIT_PIDS->{$pid})) { + await_cb($pid, @$cb_args) if $cb_args; } else { - warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?"; + warn "W: reaped unknown PID=$pid: \$?=$?\n"; } } sig_setmask($oldset); - delete $Stack{reap_runq}; } # reentrant SIGCHLD handler (since reap_pids is not reentrant) @@ -648,7 +644,7 @@ sub shutdn ($) { } } -sub zflush {} # overridden by DSdeflate +sub dflush {} # overridden by DSdeflate sub compressed {} # overridden by DSdeflate sub long_response_done {} # overridden by Net::NNTP @@ -660,8 +656,8 @@ sub long_step { if ($@ || !$self->{sock}) { # something bad happened... delete $self->{long_cb}; my $elapsed = now() - $t0; - $@ and $self->err("%s during long response[$fd] - %0.6f", - $@, $elapsed); + $@ and warn("$@ during long response[$fd] - ", + sprintf('%0.6f', $elapsed),"\n"); $self->out(" deferred[$fd] aborted - %0.6f", $elapsed); $self->close; } elsif ($more) { # $self->{wbuf}: @@ -682,13 +678,13 @@ sub requeue_once { my ($self) = @_; # COMPRESS users all share the same DEFLATE context. # Flush it here to ensure clients don't see each other's data - $self->zflush; + $self->dflush; # no recursion, schedule another call ASAP, # but only after all pending writes are done. # autovivify wbuf. wbuf may be populated by $cb, # no need to rearm if so: (push returns new size of array) - requeue($self) if push(@{$self->{wbuf}}, \&long_step) == 1; + $self->requeue if push(@{$self->{wbuf}}, \&long_step) == 1; } sub long_response ($$;@) { @@ -702,22 +698,26 @@ sub long_response ($$;@) { undef; } -sub dwaitpid ($;$$) { - my ($pid, $cb, $arg) = @_; - if ($in_loop) { - push @$wait_pids, [ $pid, $cb, $arg ]; - # We could've just missed our SIGCHLD, cover it, here: - enqueue_reap(); - } else { - my $ret = waitpid($pid, 0); +sub awaitpid { + my ($pid, @cb_args) = @_; # @cb_args = ($cb, @args), $cb may be undef + $AWAIT_PIDS->{$pid} = \@cb_args if @cb_args; + # provide synchronous API + if (defined(wantarray) || (!$in_loop && !@cb_args)) { + my $ret; +again: + $ret = waitpid($pid, 0) // -2; if ($ret == $pid) { - if ($cb) { - eval { $cb->($arg, $pid) }; - carp "E: dwaitpid($pid) !in_loop: $@" if $@; - } + my $cb_args = delete $AWAIT_PIDS->{$pid}; + @cb_args = @$cb_args if !@cb_args && $cb_args; + await_cb($pid, @cb_args); } else { - carp "waitpid($pid, 0) = $ret, \$!=$!, \$?=$?"; + goto again if $! == EINTR; + carp "waitpid($pid): $!"; + delete $AWAIT_PIDS->{$pid}; } + return $ret; + } elsif ($in_loop) { # We could've just missed our SIGCHLD, cover it, here: + enqueue_reap(); } }