]> Sergey Matveev's repositories - public-inbox.git/commitdiff
ds: get rid of event_watch field
authorEric Wong <e@80x24.org>
Mon, 24 Jun 2019 02:52:22 +0000 (02:52 +0000)
committerEric Wong <e@80x24.org>
Mon, 24 Jun 2019 05:26:26 +0000 (05:26 +0000)
We don't need to keep track of that field since we always
know what events we're interested in when using one-shot
wakeups.

lib/PublicInbox/DS.pm
lib/PublicInbox/EvCleanup.pm
lib/PublicInbox/HTTP.pm
lib/PublicInbox/HTTPD/Async.pm
lib/PublicInbox/NNTP.pm
lib/PublicInbox/Syscall.pm

index 9c801214b150290a907733d3b9bf3e255c4a7300..f5986e55a014477c74a2f61270133fde0c90c762 100644 (file)
@@ -29,8 +29,6 @@ use PublicInbox::Syscall qw(:epoll);
 use fields ('sock',              # underlying socket
             'wbuf',              # arrayref of coderefs or GLOB refs
             'wbuf_off',  # offset into first element of wbuf to start writing at
-            'event_watch',       # bitmask of events the client is interested in
-                                 # (EPOLLIN,OUT,etc.)
             );
 
 use Errno  qw(EAGAIN EINVAL);
@@ -318,6 +316,17 @@ sub PostEventLoop {
     return $keep_running;
 }
 
+# map EPOLL* bits to kqueue EV_* flags for EV_SET
+sub kq_flag ($$) {
+    my ($bit, $ev) = @_;
+    if ($ev & $bit) {
+        my $fl = EV_ADD() | EV_ENABLE();
+        ($ev & EPOLLONESHOT) ? ($fl|EV_ONESHOT()) : $fl;
+    } else {
+        EV_DISABLE();
+    }
+}
+
 #####################################################################
 ### PublicInbox::DS-the-object code
 #####################################################################
@@ -344,25 +353,21 @@ sub new {
     Carp::cluck("undef sock and/or fd in PublicInbox::DS->new.  sock=" . ($sock || "") . ", fd=" . ($fd || ""))
         unless $sock && $fd;
 
-    $self->{event_watch} = $ev;
-
     _InitPoller();
 
     if ($HaveEpoll) {
 retry:
         if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
             if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) {
-                $self->{event_watch} = ($ev &= ~EPOLLEXCLUSIVE);
+                $ev &= ~EPOLLEXCLUSIVE;
                 goto retry;
             }
             die "couldn't add epoll watch for $fd: $!\n";
         }
     }
     elsif ($HaveKQueue) {
-        my $f = $ev & EPOLLIN ? EV_ENABLE() : EV_DISABLE();
-        $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | $f);
-        $f = $ev & EPOLLOUT ? EV_ENABLE() : EV_DISABLE();
-        $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | $f);
+        $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | kq_flag(EPOLLIN, $ev));
+        $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | kq_flag(EPOLLOUT, $ev));
     }
 
     Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
@@ -454,7 +459,7 @@ next_buf:
                     }
                 } elsif ($! == EAGAIN) {
                     $self->{wbuf_off} = $off;
-                    watch_write($self, 1);
+                    watch($self, EPOLLOUT|EPOLLONESHOT);
                     return 0;
                 } else {
                     return $self->close;
@@ -467,7 +472,6 @@ next_buf:
     } # while @$wbuf
 
     delete $self->{wbuf};
-    $self->watch_write(0);
     1; # all done
 }
 
@@ -544,7 +548,7 @@ sub write {
             return $self->close;
         }
         $self->{wbuf} = [ tmpbuf($bref, $written) ];
-        watch_write($self, 1);
+        watch($self, EPOLLOUT|EPOLLONESHOT);
         return 0;
     }
 }
@@ -563,49 +567,27 @@ sub msg_more ($$) {
 
             # queue up the unwritten substring:
             $self->{wbuf} = [ tmpbuf(\($_[1]), $n) ];
-            watch_write($self, 1);
+            watch($self, EPOLLOUT|EPOLLONESHOT);
             return 0;
         }
     }
     $self->write(\($_[1]));
 }
 
-sub watch_chg ($$$) {
-    my ($self, $bits, $set) = @_;
+sub watch ($$) {
+    my ($self, $ev) = @_;
     my $sock = $self->{sock} or return;
-    my $cur = $self->{event_watch};
-    my $changes = $cur;
-    if ($set) {
-        $changes |= $bits;
-    } else {
-        $changes &= ~$bits;
-    }
-    return if $changes == $cur;
     my $fd = fileno($sock);
     if ($HaveEpoll) {
-        epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $changes) and
+        epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $ev) and
             confess("EPOLL_CTL_MOD $!");
     } elsif ($HaveKQueue) {
-        my $flag = $set ? EV_ENABLE() : EV_DISABLE();
-        $KQueue->EV_SET($fd, EVFILT_READ(), $flag) if $bits & EPOLLIN;
-        $KQueue->EV_SET($fd, EVFILT_WRITE(), $flag) if $bits & EPOLLOUT;
+        $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev));
+        $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev));
     }
-    $self->{event_watch} = $changes;
 }
 
-=head2 C<< $obj->watch_read( $boolean ) >>
-
-Turn 'readable' event notification on or off.
-
-=cut
-sub watch_read ($$) { watch_chg($_[0], EPOLLIN, $_[1]) };
-
-=head2 C<< $obj->watch_write( $boolean ) >>
-
-Turn 'writable' event notification on or off.
-
-=cut
-sub watch_write ($$) { watch_chg($_[0], EPOLLOUT, $_[1]) };
+sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
 
 package PublicInbox::DS::Timer;
 # [$abs_float_firetime, $coderef];
index d60ac2cc8c88d3de63288c2d51d517b796e65c1d..a9f6167dff25c9054ef83a3b81c27f76cf8d847f 100644 (file)
@@ -6,6 +6,7 @@ package PublicInbox::EvCleanup;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
 
 my $ENABLED;
 sub enabled { $ENABLED }
@@ -59,13 +60,12 @@ sub _run_later () {
 # Called by PublicInbox::DS
 sub event_step {
        my ($self) = @_;
-       $self->watch_write(0);
        _run_asap();
 }
 
 sub _asap_timer () {
        $singleton ||= once_init();
-       $singleton->watch_write(1);
+       $singleton->watch(EPOLLOUT|EPOLLONESHOT);
        1;
 }
 
index afa71ea53f3cb3ddda35d96a11117b104ead6dc8..773d77ba00b8643eb50275fc72b2de0af10f3a3a 100644 (file)
@@ -20,6 +20,7 @@ use HTTP::Date qw(time2str);
 use IO::Handle;
 require PublicInbox::EvCleanup;
 PublicInbox::DS->import(qw(msg_more write_in_full));
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
 use constant {
        CHUNK_START => -1,   # [a-f0-9]+\r\n
        CHUNK_END => -2,     # \r\n
@@ -56,7 +57,7 @@ sub http_date () {
 sub new ($$$) {
        my ($class, $sock, $addr, $httpd) = @_;
        my $self = fields::new($class);
-       $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN());
+       $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT);
        $self->{httpd} = $httpd;
        $self->{rbuf} = '';
        ($self->{remote_addr}, $self->{remote_port}) =
@@ -80,7 +81,8 @@ sub event_step { # called by PublicInbox::DS
                return $self->close if $r == 0;
                return rbuf_process($self);
        }
-       return if $!{EAGAIN}; # no need to call watch_read(1) again
+
+       return $self->watch_in1 if $!{EAGAIN};
 
        # common for clients to break connections without warning,
        # would be too noisy to log here:
@@ -100,7 +102,7 @@ sub rbuf_process {
                        ($r == -2 && length($self->{rbuf}) > 0x4000)) {
                return quit($self, 400);
        }
-       return $self->watch_read(1) if $r < 0; # incomplete
+       return $self->watch_in1 if $r < 0; # incomplete
        $self->{rbuf} = substr($self->{rbuf}, $r);
 
        my $len = input_prepare($self, \%env);
@@ -143,7 +145,6 @@ sub read_input ($) {
 
 sub app_dispatch {
        my ($self, $input) = @_;
-       $self->watch_read(0);
        my $env = $self->{env};
        $env->{REMOTE_ADDR} = $self->{remote_addr};
        $env->{REMOTE_PORT} = $self->{remote_port};
@@ -236,7 +237,7 @@ sub identity_wcb ($) {
 sub next_request ($) {
        my ($self) = @_;
        if ($self->{rbuf} eq '') { # wait for next request
-               $self->watch_read(1);
+               $self->watch_in1;
        } else { # avoid recursion for pipelined requests
                push @$pipelineq, $self;
                $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
@@ -360,7 +361,7 @@ sub recv_err {
        return $self->close if (defined $r && $r == 0);
        if ($!{EAGAIN}) {
                $self->{input_left} = $len;
-               return;
+               return $self->watch_in1;
        }
        err($self, "error reading for input: $! ($len bytes remaining)");
        quit($self, 500);
index dae62e555957fa707f6dfa62978c5f9d932f3702..f32ef0093869a1e910ed3145e283d3b52ba8e4bc 100644 (file)
@@ -31,10 +31,12 @@ sub new {
        $self;
 }
 
+sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
+
 # fires after pending writes are complete:
 sub restart_read_cb ($) {
        my ($self) = @_;
-       sub { $self->watch_read(1) }
+       sub { restart_read($self) }
 }
 
 sub main_cb ($$$) {
@@ -46,16 +48,16 @@ sub main_cb ($$$) {
                        $fh->write($$bref);
                        if ($http->{sock}) { # !closed
                                if ($http->{wbuf}) {
-                                       $self->watch_read(0);
+                                       $self->watch(0);
                                        $http->write(restart_read_cb($self));
                                }
-                               # stay in watch_read, but let other clients
+                               # stay in EPOLLIN, but let other clients
                                # get some work done, too.
                                return;
                        }
                        # fall through to close below...
                } elsif (!defined $r) {
-                       return if $!{EAGAIN} || $!{EINTR};
+                       return restart_read($self) if $!{EAGAIN} || $!{EINTR};
                }
 
                # Done! Error handling will happen in $fh->close
index eb1679a76b32072a740f4959829cb0b989dcce01..98f884104bf5220bde442eb09b2e5fc7ea7c22e0 100644 (file)
@@ -24,6 +24,7 @@ use constant {
        r225 => '225 Headers follow (multi-line)',
        r430 => '430 No article with that message-id',
 };
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
 
 my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
 my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n";
@@ -52,12 +53,6 @@ sub next_tick () {
                        # pipelined request, we bypassed socket-readiness
                        # checks to get here:
                        event_step($nntp);
-
-                       # maybe there's more pipelined data, or we'll have
-                       # to register it for socket-readiness notifications
-                       if (!$nntp->{long_res} && $nntp->{sock}) {
-                               check_read($nntp);
-                       }
                }
        }
 }
@@ -97,7 +92,7 @@ sub expire_old () {
 sub new ($$$) {
        my ($class, $sock, $nntpd) = @_;
        my $self = fields::new($class);
-       $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN());
+       $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT);
        $self->{nntpd} = $nntpd;
        res($self, '201 ' . $nntpd->{servername} . ' ready - post via email');
        $self->{rbuf} = '';
@@ -624,11 +619,10 @@ sub long_response ($$) {
        # make sure we disable reading during a long response,
        # clients should not be sending us stuff and making us do more
        # work while we are stream a response to them
-       $self->watch_read(0);
        my $t0 = now();
        $self->{long_res} = sub {
                my $more = eval { $cb->() };
-               if ($@ || !$self->{sock}) {
+               if ($@ || !$self->{sock}) { # something bad happened...
                        $self->{long_res} = undef;
 
                        if ($@) {
@@ -922,10 +916,6 @@ sub do_write ($$) {
        my $done = $self->write(\($_[1]));
        return 0 unless $self->{sock};
 
-       # Do not watch for readability if we have data in the queue,
-       # instead re-enable watching for readability when we can
-       $self->watch_read(0) if (!$done || $self->{long_res});
-
        $done;
 }
 
@@ -943,7 +933,6 @@ sub event_step {
        my ($self) = @_;
 
        return unless $self->flush_write && $self->{sock};
-       return if $self->{long_res};
 
        update_idle_time($self);
        # only read more requests if we've drained the write buffer,
@@ -957,7 +946,7 @@ sub event_step {
                my $off = length($$rbuf);
                $r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off);
                unless (defined $r) {
-                       return if $!{EAGAIN};
+                       return $self->watch_in1 if $!{EAGAIN};
                        return $self->close;
                }
                return $self->close if $r == 0;
@@ -978,6 +967,10 @@ sub event_step {
        my $len = length($$rbuf);
        return $self->close if ($len >= LINE_MAX);
        update_idle_time($self);
+
+       # maybe there's more pipelined data, or we'll have
+       # to register it for socket-readiness notifications
+       check_read($self) unless ($self->{long_res} || $self->{wbuf});
 }
 
 sub check_read {
@@ -993,7 +986,7 @@ sub check_read {
        } else {
                # no pipelined requests available, let the kernel know
                # to wake us up if there's more
-               $self->watch_read(1); # PublicInbox::DS::watch_read
+               $self->watch_in1; # PublicInbox::DS::watch_in1
        }
 }
 
index 17fd1398ac29bac3b1fcd3246a337151aa824b39..f1988e616788733025f04ca99c495ea7b31eb76f 100644 (file)
@@ -24,11 +24,11 @@ $VERSION     = "0.25";
 @EXPORT_OK   = qw(sendfile epoll_ctl epoll_create epoll_wait
                   EPOLLIN EPOLLOUT
                   EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
-                  EPOLLEXCLUSIVE);
+                  EPOLLONESHOT EPOLLEXCLUSIVE);
 %EXPORT_TAGS = (epoll => [qw(epoll_ctl epoll_create epoll_wait
                              EPOLLIN EPOLLOUT
                              EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
-                             EPOLLEXCLUSIVE)],
+                             EPOLLONESHOT EPOLLEXCLUSIVE)],
                 sendfile => [qw(sendfile)],
                 );
 
@@ -38,7 +38,7 @@ use constant EPOLLOUT      => 4;
 # use constant EPOLLHUP      => 16;
 # use constant EPOLLRDBAND   => 128;
 use constant EPOLLEXCLUSIVE => (1 << 28);
-use constant EPOLLONESHOT => (1 << 30);
+use constant EPOLLONESHOT => (1 << 30);
 # use constant EPOLLET => (1 << 31);
 use constant EPOLL_CTL_ADD => 1;
 use constant EPOLL_CTL_DEL => 2;