X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FDS.pm;h=0a763d0ef643ad0d5c5deb8e027b9df67d4438ae;hb=HEAD;hp=f0181b5461cfc4111574af2991d3522a57aba174;hpb=a5e39d0e4e2d9b7ad937e8dd9af726d96eec6126;p=public-inbox.git diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index f0181b54..0a763d0e 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -30,13 +30,13 @@ use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); use Scalar::Util qw(blessed); use PublicInbox::Syscall qw(:epoll); use PublicInbox::Tmpfile; -use Errno qw(EAGAIN EINVAL); +use Errno qw(EAGAIN EINVAL ECHILD EINTR); use Carp qw(carp croak); -our @EXPORT_OK = qw(now msg_more dwaitpid add_timer add_uniq_timer); +our @EXPORT_OK = qw(now msg_more awaitpid add_timer add_uniq_timer); my %Stack; my $nextq; # queue for next_tick -my $wait_pids; # list of [ pid, callback, callback_arg ] +my $AWAIT_PIDS; # pid => [ $callback, @args ] my $reap_armed; my $ToClose; # sockets to close when event loop is done our ( @@ -74,10 +74,10 @@ sub Reset { # we may be iterating inside one of these on our stack my @q = delete @Stack{keys %Stack}; for my $q (@q) { @$q = () } - $wait_pids = $nextq = $ToClose = undef; + $AWAIT_PIDS = $nextq = $ToClose = undef; $ep_io = undef; # closes real $Epoll FD $Epoll = undef; # may call DSKQXS::DESTROY - } while (@Timers || keys(%Stack) || $nextq || $wait_pids || + } while (@Timers || keys(%Stack) || $nextq || $AWAIT_PIDS || $ToClose || keys(%DescriptorMap) || $PostLoopCallback || keys(%UniqTimer)); @@ -126,12 +126,11 @@ sub add_uniq_timer { # ($name, $secs, $coderef, @args) = @_; # caller sets return value to $Epoll sub _InitPoller () { - if (PublicInbox::Syscall::epoll_defined()) { + if (defined $PublicInbox::Syscall::SYS_epoll_create) { my $fd = epoll_create(); die "epoll_create: $!" if $fd < 0; open($ep_io, '+<&=', $fd) or return; - my $fl = fcntl($ep_io, F_GETFD, 0); - fcntl($ep_io, F_SETFD, $fl | FD_CLOEXEC); + fcntl($ep_io, F_SETFD, FD_CLOEXEC); $fd; } else { my $cls; @@ -202,33 +201,30 @@ sub block_signals () { $oldset; } -# We can't use waitpid(-1) safely here since it can hit ``, system(), -# and other things. So we scan the $wait_pids list, which is hopefully -# not too big. We keep $wait_pids small by not calling dwaitpid() -# until we've hit EOF when reading the stdout of the child. +sub await_cb ($;@) { + my ($pid, @cb_args) = @_; + my $cb = shift @cb_args or return; + eval { $cb->($pid, @cb_args) }; + warn "E: awaitpid($pid): $@" if $@; +} +# This relies on our Perl process is single-threaded, or at least +# no threads are spawning and waiting on processes (``, system(), etc...) +# Threads are officially discouraged by the Perl5 team, and I expect +# that to remain the case. sub reap_pids { $reap_armed = undef; - my $tmp = $wait_pids or return; - $wait_pids = undef; - $Stack{reap_runq} = $tmp; my $oldset = block_signals(); - foreach my $ary (@$tmp) { - my ($pid, $cb, $arg) = @$ary; - my $ret = waitpid($pid, WNOHANG); - if ($ret == 0) { - push @$wait_pids, $ary; # autovivifies @$wait_pids - } elsif ($ret == $pid) { - if ($cb) { - eval { $cb->($arg, $pid) }; - warn "E: dwaitpid($pid) in_loop: $@" if $@; - } + while (1) { + my $pid = waitpid(-1, WNOHANG) // last; + last if $pid <= 0; + if (defined(my $cb_args = delete $AWAIT_PIDS->{$pid})) { + await_cb($pid, @$cb_args) if $cb_args; } else { - warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?"; + warn "W: reaped unknown PID=$pid: \$?=$?\n"; } } sig_setmask($oldset); - delete $Stack{reap_runq}; } # reentrant SIGCHLD handler (since reap_pids is not reentrant) @@ -648,38 +644,80 @@ sub shutdn ($) { } } -sub zflush {} # overridden by NNTPdeflate and IMAPdeflate +sub dflush {} # overridden by DSdeflate +sub compressed {} # overridden by DSdeflate +sub long_response_done {} # overridden by Net::NNTP + +sub long_step { + my ($self) = @_; + # wbuf is unset or empty, here; {long} may add to it + my ($fd, $cb, $t0, @args) = @{$self->{long_cb}}; + my $more = eval { $cb->($self, @args) }; + if ($@ || !$self->{sock}) { # something bad happened... + delete $self->{long_cb}; + my $elapsed = now() - $t0; + $@ and warn("$@ during long response[$fd] - ", + sprintf('%0.6f', $elapsed),"\n"); + $self->out(" deferred[$fd] aborted - %0.6f", $elapsed); + $self->close; + } elsif ($more) { # $self->{wbuf}: + # control passed to ibx_async_cat if $more == \undef + requeue_once($self) if !ref($more); + } else { # all done! + delete $self->{long_cb}; + $self->long_response_done; + my $elapsed = now() - $t0; + my $fd = fileno($self->{sock}); + $self->out(" deferred[$fd] done - %0.6f", $elapsed); + my $wbuf = $self->{wbuf}; # do NOT autovivify + requeue($self) unless $wbuf && @$wbuf; + } +} sub requeue_once { my ($self) = @_; # COMPRESS users all share the same DEFLATE context. - # Flush it here to ensure clients don't see - # each other's data - $self->zflush; + # Flush it here to ensure clients don't see each other's data + $self->dflush; # no recursion, schedule another call ASAP, # but only after all pending writes are done. # autovivify wbuf. wbuf may be populated by $cb, # no need to rearm if so: (push returns new size of array) - requeue($self) if push(@{$self->{wbuf}}, $self->can('long_step')) == 1; + $self->requeue if push(@{$self->{wbuf}}, \&long_step) == 1; } -sub dwaitpid ($;$$) { - my ($pid, $cb, $arg) = @_; - if ($in_loop) { - push @$wait_pids, [ $pid, $cb, $arg ]; - # We could've just missed our SIGCHLD, cover it, here: - enqueue_reap(); - } else { - my $ret = waitpid($pid, 0); +sub long_response ($$;@) { + my ($self, $cb, @args) = @_; # cb returns true if more, false if done + my $sock = $self->{sock} or return; + # make sure we disable reading during a long response, + # clients should not be sending us stuff and making us do more + # work while we are stream a response to them + $self->{long_cb} = [ fileno($sock), $cb, now(), @args ]; + long_step($self); # kick off! + undef; +} + +sub awaitpid { + my ($pid, @cb_args) = @_; # @cb_args = ($cb, @args), $cb may be undef + $AWAIT_PIDS->{$pid} = \@cb_args if @cb_args; + # provide synchronous API + if (defined(wantarray) || (!$in_loop && !@cb_args)) { + my $ret; +again: + $ret = waitpid($pid, 0) // -2; if ($ret == $pid) { - if ($cb) { - eval { $cb->($arg, $pid) }; - carp "E: dwaitpid($pid) !in_loop: $@" if $@; - } + my $cb_args = delete $AWAIT_PIDS->{$pid}; + @cb_args = @$cb_args if !@cb_args && $cb_args; + await_cb($pid, @cb_args); } else { - carp "waitpid($pid, 0) = $ret, \$!=$!, \$?=$?"; + goto again if $! == EINTR; + carp "waitpid($pid): $!"; + delete $AWAIT_PIDS->{$pid}; } + return $ret; + } elsif ($in_loop) { # We could've just missed our SIGCHLD, cover it, here: + enqueue_reap(); } }