X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FDS.pm;h=cea25d90e37516c354e94410148f8299fa64cb15;hb=eb9c415ba4421cecb5157967c843dc7f8720e916;hp=856884bbb9130e7dff9065e05619e98b385d1634;hpb=ede8cc1c664e332cfa44bd22c36a31aac1a5fb13;p=public-inbox.git diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 856884bb..cea25d90 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -3,15 +3,15 @@ # # This license differs from the rest of public-inbox # -# This is a fork of the (for now) unmaintained Danga::Socket 1.61. -# Unused features will be removed, and updates will be made to take -# advantage of newer kernels. +# This is a fork of the unmaintained Danga::Socket (1.61) with +# significant changes. See Documentation/technical/ds.txt in our +# source for details. # -# API changes to diverge from Danga::Socket will happen to better -# accomodate new features and improve scalability. Do not expect -# this to be a stable API like Danga::Socket. -# Bugs encountered (and likely fixed) are reported to -# bug-Danga-Socket@rt.cpan.org and visible at: +# Do not expect this to be a stable API like Danga::Socket, +# but it will evolve to suite our needs and to take advantage of +# newer Linux and *BSD features. +# Bugs encountered were reported to bug-Danga-Socket@rt.cpan.org, +# fixed in Danga::Socket 1.62 and visible at: # https://rt.cpan.org/Public/Dist/Display.html?Name=Danga-Socket package PublicInbox::DS; use strict; @@ -35,9 +35,8 @@ use fields ('sock', # underlying socket 'wbuf_off', # offset into first element of wbuf to start writing at ); -use Errno qw(EAGAIN EINVAL); -use Carp qw(croak confess carp); -require File::Spec; +use Errno qw(EAGAIN EINVAL); +use Carp qw(confess carp); my $nextq; # queue for next_tick my $WaitPids; # list of [ pid, callback, callback_arg ] @@ -102,20 +101,18 @@ sub SetLoopTimeout { return $LoopTimeout = $_[1] + 0; } -=head2 C<< CLASS->AddTimer( $seconds, $coderef ) >> +=head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef ) >> Add a timer to occur $seconds from now. $seconds may be fractional, but timers are not guaranteed to fire at the exact time you ask for. -Returns a timer object which you can call C<< $timer->cancel >> on if you need to. - =cut -sub AddTimer { - my ($class, $secs, $coderef) = @_; +sub add_timer ($$) { + my ($secs, $coderef) = @_; my $fire_time = now() + $secs; - my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer"; + my $timer = [$fire_time, $coderef]; if (!@Timers || $fire_time >= $Timers[-1][0]) { push @Timers, $timer; @@ -248,14 +245,14 @@ sub reap_pids { } if (@$WaitPids) { # we may not be donea, and we may miss our - $reap_timer = AddTimer(undef, 1, \&reap_pids); + $reap_timer = add_timer(1, \&reap_pids); } } # reentrant SIGCHLD handler (since reap_pids is not reentrant) sub enqueue_reap ($) { push @$nextq, \&reap_pids }; -sub running () { ($SIG{CHLD} // '') eq \&enqueue_reap } +sub in_loop () { $in_loop } sub EpollEventLoop { local $in_loop = 1; @@ -340,9 +337,6 @@ sub new { $self->{sock} = $sock; my $fd = fileno($sock); - Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . ($fd || "")) - unless $sock && $fd; - _InitPoller(); if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) { @@ -352,11 +346,10 @@ sub new { } die "couldn't add epoll watch for $fd: $!\n"; } - Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})") - if $DescriptorMap{$fd}; + confess("DescriptorMap{$fd} defined ($DescriptorMap{$fd})") + if defined($DescriptorMap{$fd}); $DescriptorMap{$fd} = $self; - return $self; } @@ -576,15 +569,18 @@ use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0; sub msg_more ($$) { my $self = $_[0]; my $sock = $self->{sock} or return 1; + my $wbuf = $self->{wbuf}; - if (MSG_MORE && !$self->{wbuf} && ref($sock) ne 'IO::Socket::SSL') { + if (MSG_MORE && (!defined($wbuf) || !scalar(@$wbuf)) && + ref($sock) ne 'IO::Socket::SSL') { my $n = send($sock, $_[1], MSG_MORE); if (defined $n) { my $nlen = bytes::length($_[1]) - $n; return 1 if $nlen == 0; # all done! # queue up the unwritten substring: my $tmpio = tmpio($self, \($_[1]), $n) or return 0; - $self->{wbuf} = [ $tmpio ]; + $self->{wbuf} //= $wbuf //= []; + push @$wbuf, $tmpio; epwait($sock, EPOLLOUT|EPOLLONESHOT); return 0; } @@ -657,7 +653,7 @@ sub _run_later () { sub later ($) { my ($cb) = @_; push @$later_queue, $cb; - $later_timer //= AddTimer(undef, 60, \&_run_later); + $later_timer //= add_timer(60, \&_run_later); } sub expire_old () { @@ -694,12 +690,6 @@ sub not_idle_long { $exp_at > $now; } -package PublicInbox::DS::Timer; -# [$abs_float_firetime, $coderef]; -sub cancel { - $_[0][1] = undef; -} - 1; =head1 AUTHORS (Danga::Socket)