X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FDS.pm;h=0a763d0ef643ad0d5c5deb8e027b9df67d4438ae;hb=HEAD;hp=9cca02d7ffe99c5d32446c213c1817d53e0a7e80;hpb=3472c60fc72dfb5d1152c4015f54be1644443a20;p=public-inbox.git diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 9cca02d7..0a763d0e 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -30,16 +30,14 @@ use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); use Scalar::Util qw(blessed); use PublicInbox::Syscall qw(:epoll); use PublicInbox::Tmpfile; -use Errno qw(EAGAIN EINVAL); +use Errno qw(EAGAIN EINVAL ECHILD EINTR); use Carp qw(carp croak); -our @EXPORT_OK = qw(now msg_more dwaitpid add_timer add_uniq_timer); +our @EXPORT_OK = qw(now msg_more awaitpid add_timer add_uniq_timer); my %Stack; my $nextq; # queue for next_tick -my $wait_pids; # list of [ pid, callback, callback_arg ] -my $EXPMAP; # fd -> idle_time -our $EXPTIME = 180; # 3 minutes -my ($reap_armed); +my $AWAIT_PIDS; # pid => [ $callback, @args ] +my $reap_armed; my $ToClose; # sockets to close when event loop is done our ( %DescriptorMap, # fd (num) -> PublicInbox::DS object @@ -76,11 +74,10 @@ sub Reset { # we may be iterating inside one of these on our stack my @q = delete @Stack{keys %Stack}; for my $q (@q) { @$q = () } - $EXPMAP = undef; - $wait_pids = $nextq = $ToClose = undef; + $AWAIT_PIDS = $nextq = $ToClose = undef; $ep_io = undef; # closes real $Epoll FD $Epoll = undef; # may call DSKQXS::DESTROY - } while (@Timers || keys(%Stack) || $nextq || $wait_pids || + } while (@Timers || keys(%Stack) || $nextq || $AWAIT_PIDS || $ToClose || keys(%DescriptorMap) || $PostLoopCallback || keys(%UniqTimer)); @@ -129,12 +126,11 @@ sub add_uniq_timer { # ($name, $secs, $coderef, @args) = @_; # caller sets return value to $Epoll sub _InitPoller () { - if (PublicInbox::Syscall::epoll_defined()) { + if (defined $PublicInbox::Syscall::SYS_epoll_create) { my $fd = epoll_create(); die "epoll_create: $!" if $fd < 0; open($ep_io, '+<&=', $fd) or return; - my $fl = fcntl($ep_io, F_GETFD, 0); - fcntl($ep_io, F_SETFD, $fl | FD_CLOEXEC); + fcntl($ep_io, F_SETFD, FD_CLOEXEC); $fd; } else { my $cls; @@ -205,33 +201,30 @@ sub block_signals () { $oldset; } -# We can't use waitpid(-1) safely here since it can hit ``, system(), -# and other things. So we scan the $wait_pids list, which is hopefully -# not too big. We keep $wait_pids small by not calling dwaitpid() -# until we've hit EOF when reading the stdout of the child. +sub await_cb ($;@) { + my ($pid, @cb_args) = @_; + my $cb = shift @cb_args or return; + eval { $cb->($pid, @cb_args) }; + warn "E: awaitpid($pid): $@" if $@; +} +# This relies on our Perl process is single-threaded, or at least +# no threads are spawning and waiting on processes (``, system(), etc...) +# Threads are officially discouraged by the Perl5 team, and I expect +# that to remain the case. sub reap_pids { $reap_armed = undef; - my $tmp = $wait_pids or return; - $wait_pids = undef; - $Stack{reap_runq} = $tmp; my $oldset = block_signals(); - foreach my $ary (@$tmp) { - my ($pid, $cb, $arg) = @$ary; - my $ret = waitpid($pid, WNOHANG); - if ($ret == 0) { - push @$wait_pids, $ary; # autovivifies @$wait_pids - } elsif ($ret == $pid) { - if ($cb) { - eval { $cb->($arg, $pid) }; - warn "E: dwaitpid($pid) in_loop: $@" if $@; - } + while (1) { + my $pid = waitpid(-1, WNOHANG) // last; + last if $pid <= 0; + if (defined(my $cb_args = delete $AWAIT_PIDS->{$pid})) { + await_cb($pid, @$cb_args) if $cb_args; } else { - warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?"; + warn "W: reaped unknown PID=$pid: \$?=$?\n"; } } sig_setmask($oldset); - delete $Stack{reap_runq}; } # reentrant SIGCHLD handler (since reap_pids is not reentrant) @@ -251,9 +244,6 @@ sub PostEventLoop () { $ToClose = undef; # will be autovivified on push @$close_now = map { fileno($_) } @$close_now; - # order matters, destroy expiry times, first: - delete @$EXPMAP{@$close_now}; - # ->DESTROY methods may populate ToClose delete @DescriptorMap{@$close_now}; } @@ -353,6 +343,24 @@ retry: $DescriptorMap{$fd} = $self; } +# for IMAP, NNTP, and POP3 which greet clients upon connect +sub greet { + my ($self, $sock) = @_; + my $ev = EPOLLIN; + my $wbuf; + if ($sock->can('accept_SSL') && !$sock->accept_SSL) { + return CORE::close($sock) if $! != EAGAIN; + $ev = PublicInbox::TLS::epollbit() or return CORE::close($sock); + $wbuf = [ \&accept_tls_step, $self->can('do_greet')]; + } + new($self, $sock, $ev | EPOLLONESHOT); + if ($wbuf) { + $self->{wbuf} = $wbuf; + } else { + $self->do_greet; + } + $self; +} ##################################################################### ### I N S T A N C E M E T H O D S @@ -636,52 +644,81 @@ sub shutdn ($) { } } -sub dwaitpid ($;$$) { - my ($pid, $cb, $arg) = @_; - if ($in_loop) { - push @$wait_pids, [ $pid, $cb, $arg ]; - # We could've just missed our SIGCHLD, cover it, here: - enqueue_reap(); - } else { - my $ret = waitpid($pid, 0); - if ($ret == $pid) { - if ($cb) { - eval { $cb->($arg, $pid) }; - carp "E: dwaitpid($pid) !in_loop: $@" if $@; - } - } else { - carp "waitpid($pid, 0) = $ret, \$!=$!, \$?=$?"; - } - } -} +sub dflush {} # overridden by DSdeflate +sub compressed {} # overridden by DSdeflate +sub long_response_done {} # overridden by Net::NNTP -sub expire_old () { - my $cur = $EXPMAP or return; - $EXPMAP = undef; - my $old = now() - $EXPTIME; - while (my ($fd, $idle_at) = each %$cur) { - if ($idle_at < $old) { - my $ds_obj = $DescriptorMap{$fd}; - $EXPMAP->{$fd} = $idle_at if !$ds_obj->shutdn; - } else { - $EXPMAP->{$fd} = $idle_at; - } +sub long_step { + my ($self) = @_; + # wbuf is unset or empty, here; {long} may add to it + my ($fd, $cb, $t0, @args) = @{$self->{long_cb}}; + my $more = eval { $cb->($self, @args) }; + if ($@ || !$self->{sock}) { # something bad happened... + delete $self->{long_cb}; + my $elapsed = now() - $t0; + $@ and warn("$@ during long response[$fd] - ", + sprintf('%0.6f', $elapsed),"\n"); + $self->out(" deferred[$fd] aborted - %0.6f", $elapsed); + $self->close; + } elsif ($more) { # $self->{wbuf}: + # control passed to ibx_async_cat if $more == \undef + requeue_once($self) if !ref($more); + } else { # all done! + delete $self->{long_cb}; + $self->long_response_done; + my $elapsed = now() - $t0; + my $fd = fileno($self->{sock}); + $self->out(" deferred[$fd] done - %0.6f", $elapsed); + my $wbuf = $self->{wbuf}; # do NOT autovivify + requeue($self) unless $wbuf && @$wbuf; } - add_uniq_timer('expire', 60, \&expire_old) if $EXPMAP; } -sub update_idle_time { +sub requeue_once { my ($self) = @_; - my $sock = $self->{sock} or return; - $EXPMAP->{fileno($sock)} = now(); - add_uniq_timer('expire', 60, \&expire_old); + # COMPRESS users all share the same DEFLATE context. + # Flush it here to ensure clients don't see each other's data + $self->dflush; + + # no recursion, schedule another call ASAP, + # but only after all pending writes are done. + # autovivify wbuf. wbuf may be populated by $cb, + # no need to rearm if so: (push returns new size of array) + $self->requeue if push(@{$self->{wbuf}}, \&long_step) == 1; } -sub not_idle_long { - my ($self, $now) = @_; +sub long_response ($$;@) { + my ($self, $cb, @args) = @_; # cb returns true if more, false if done my $sock = $self->{sock} or return; - my $idle_at = $EXPMAP->{fileno($sock)} or return; - ($idle_at + $EXPTIME) > $now; + # make sure we disable reading during a long response, + # clients should not be sending us stuff and making us do more + # work while we are stream a response to them + $self->{long_cb} = [ fileno($sock), $cb, now(), @args ]; + long_step($self); # kick off! + undef; +} + +sub awaitpid { + my ($pid, @cb_args) = @_; # @cb_args = ($cb, @args), $cb may be undef + $AWAIT_PIDS->{$pid} = \@cb_args if @cb_args; + # provide synchronous API + if (defined(wantarray) || (!$in_loop && !@cb_args)) { + my $ret; +again: + $ret = waitpid($pid, 0) // -2; + if ($ret == $pid) { + my $cb_args = delete $AWAIT_PIDS->{$pid}; + @cb_args = @$cb_args if !@cb_args && $cb_args; + await_cb($pid, @cb_args); + } else { + goto again if $! == EINTR; + carp "waitpid($pid): $!"; + delete $AWAIT_PIDS->{$pid}; + } + return $ret; + } elsif ($in_loop) { # We could've just missed our SIGCHLD, cover it, here: + enqueue_reap(); + } } 1;