]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/DS.pm
ds: remove steal_socket method
[public-inbox.git] / lib / PublicInbox / DS.pm
index 03612ce870e17ac66082545c2c0939095def3d29..5177d1f18a9ea35181494b9d0d901bec7e216ebc 100644 (file)
@@ -26,16 +26,13 @@ use warnings;
 use PublicInbox::Syscall qw(:epoll);
 
 use fields ('sock',              # underlying socket
-            'fd',                # numeric file descriptor
-            'write_buf',         # arrayref of scalars, scalarrefs, or coderefs to write
-            'write_buf_offset',  # offset into first array of write_buf to start writing at
-            'write_buf_size',    # total length of data in all write_buf items
+            'wbuf',              # arrayref of scalars, scalarrefs, or coderefs to write
+            'wbuf_off',  # offset into first element of wbuf to start writing at
             'closed',            # bool: socket is closed
             'event_watch',       # bitmask of events the client is interested in (POLLIN,OUT,etc.)
             );
 
-use Errno  qw(EINPROGRESS EWOULDBLOCK EISCONN ENOTSOCK
-              EPIPE EAGAIN EBADF ECONNRESET ENOPROTOOPT);
+use Errno  qw(EAGAIN EINVAL);
 use Carp   qw(croak confess);
 
 use constant DebugLevel => 0;
@@ -442,16 +439,14 @@ sub new {
     my ($self, $sock, $exclusive) = @_;
     $self = fields::new($self) unless ref $self;
 
-    $self->{sock}        = $sock;
+    $self->{sock} = $sock;
     my $fd = fileno($sock);
 
     Carp::cluck("undef sock and/or fd in PublicInbox::DS->new.  sock=" . ($sock || "") . ", fd=" . ($fd || ""))
         unless $sock && $fd;
 
-    $self->{fd}          = $fd;
-    $self->{write_buf}      = [];
-    $self->{write_buf_offset} = 0;
-    $self->{write_buf_size} = 0;
+    $self->{wbuf} = [];
+    $self->{wbuf_off} = 0;
     $self->{closed} = 0;
 
     my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
@@ -464,7 +459,7 @@ sub new {
         }
 retry:
         if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
-            if ($!{EINVAL} && ($ev & $EPOLLEXCLUSIVE)) {
+            if ($! == EINVAL && ($ev & $EPOLLEXCLUSIVE)) {
                 $EPOLLEXCLUSIVE = 0; # old kernel
                 $ev = $self->{event_watch} = EPOLLIN|EPOLLERR|EPOLLHUP;
                 goto retry;
@@ -492,50 +487,22 @@ retry:
 ### I N S T A N C E   M E T H O D S
 #####################################################################
 
-=head2 C<< $obj->steal_socket() >>
+=head2 C<< $obj->close >>
 
-Basically returns our socket and makes it so that we don't try to close it,
-but we do remove it from epoll handlers.  THIS CLOSES $self.  It is the same
-thing as calling close, except it gives you the socket to use.
-
-=cut
-sub steal_socket {
-    my PublicInbox::DS $self = $_[0];
-    return if $self->{closed};
-
-    # cleanup does most of the work of closing this socket
-    $self->_cleanup();
-
-    # now undef our internal sock and fd structures so we don't use them
-    my $sock = $self->{sock};
-    $self->{sock} = undef;
-    return $sock;
-}
-
-=head2 C<< $obj->close( [$reason] ) >>
-
-Close the socket. The I<reason> argument will be used in debugging messages.
+Close the socket.
 
 =cut
 sub close {
     my PublicInbox::DS $self = $_[0];
     return if $self->{closed};
 
-    # print out debugging info for this close
-    if (DebugLevel) {
-        my ($pkg, $filename, $line) = caller;
-        my $reason = $_[1] || "";
-        warn "Closing \#$self->{fd} due to $pkg/$filename/$line ($reason)\n";
-    }
-
     # this does most of the work of closing us
     $self->_cleanup();
 
     # defer closing the actual socket until the event loop is done
     # processing this round of events.  (otherwise we might reuse fds)
-    if ($self->{sock}) {
-        push @ToClose, $self->{sock};
-        $self->{sock} = undef;
+    if (my $sock = delete $self->{sock}) {
+        push @ToClose, $sock;
     }
 
     return 0;
@@ -552,15 +519,14 @@ sub _cleanup {
     # we need to flush our write buffer, as there may
     # be self-referential closures (sub { $client->close })
     # preventing the object from being destroyed
-    $self->{write_buf} = [];
+    @{$self->{wbuf}} = ();
 
     # if we're using epoll, we have to remove this from our epoll fd so we stop getting
     # notifications about it
-    if ($HaveEpoll && $self->{fd}) {
-        if (epoll_ctl($Epoll, EPOLL_CTL_DEL, $self->{fd}, $self->{event_watch}) != 0) {
-            # dump_error prints a backtrace so we can try to figure out why this happened
-            $self->dump_error("epoll_ctl(): failure deleting fd=$self->{fd} during _cleanup(); $! (" . ($!+0) . ")");
-        }
+    if ($HaveEpoll && $self->{sock}) {
+        my $fd = fileno($self->{sock});
+        epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, $self->{event_watch}) and
+            confess("EPOLL_CTL_DEL: $!");
     }
 
     # we explicitly don't delete from DescriptorMap here until we
@@ -571,9 +537,6 @@ sub _cleanup {
     # looked at $pob->{closed} and ignore it.  but if it's an
     # un-accounted for fd, then it (understandably) freak out a bit
     # and emit warnings, thinking their state got off.
-
-    # and finally get rid of our fd so we can't use it anywhere else
-    $self->{fd} = undef;
 }
 
 =head2 C<< $obj->sock() >>
@@ -612,12 +575,12 @@ sub write {
 
     # just queue data if there's already a wait
     my $need_queue;
+    my $wbuf = $self->{wbuf};
 
     if (defined $data) {
         $bref = ref $data ? $data : \$data;
-        if ($self->{write_buf_size}) {
-            push @{$self->{write_buf}}, $bref;
-            $self->{write_buf_size} += ref $bref eq "SCALAR" ? length($$bref) : 1;
+        if (scalar @$wbuf) {
+            push @$wbuf, $bref;
             return 0;
         }
 
@@ -629,7 +592,7 @@ sub write {
 
   WRITE:
     while (1) {
-        return 1 unless $bref ||= $self->{write_buf}[0];
+        return 1 unless $bref ||= $wbuf->[0];
 
         my $len;
         eval {
@@ -638,8 +601,7 @@ sub write {
         if ($@) {
             if (UNIVERSAL::isa($bref, "CODE")) {
                 unless ($need_queue) {
-                    $self->{write_buf_size}--; # code refs are worth 1
-                    shift @{$self->{write_buf}};
+                    shift @$wbuf;
                 }
                 $bref->();
 
@@ -654,46 +616,33 @@ sub write {
             die "Write error: $@ <$bref>";
         }
 
-        my $to_write = $len - $self->{write_buf_offset};
+        my $to_write = $len - $self->{wbuf_off};
         my $written = syswrite($self->{sock}, $$bref, $to_write,
-                               $self->{write_buf_offset});
+                               $self->{wbuf_off});
 
         if (! defined $written) {
-            if ($! == EPIPE) {
-                return $self->close("EPIPE");
-            } elsif ($! == EAGAIN) {
+            if ($! == EAGAIN) {
                 # since connection has stuff to write, it should now be
                 # interested in pending writes:
                 if ($need_queue) {
-                    push @{$self->{write_buf}}, $bref;
-                    $self->{write_buf_size} += $len;
+                    push @$wbuf, $bref;
                 }
                 $self->watch_write(1);
                 return 0;
-            } elsif ($! == ECONNRESET) {
-                return $self->close("ECONNRESET");
             }
 
-            DebugLevel >= 1 && $self->debugmsg("Closing connection ($self) due to write error: $!\n");
-
-            return $self->close("write_error");
+            return $self->close;
         } elsif ($written != $to_write) {
-            DebugLevel >= 2 && $self->debugmsg("Wrote PARTIAL %d bytes to %d",
-                                               $written, $self->{fd});
             if ($need_queue) {
-                push @{$self->{write_buf}}, $bref;
-                $self->{write_buf_size} += $len;
+                push @$wbuf, $bref;
             }
             # since connection has stuff to write, it should now be
             # interested in pending writes:
-            $self->{write_buf_offset} += $written;
-            $self->{write_buf_size} -= $written;
+            $self->{wbuf_off} += $written;
             $self->on_incomplete_write;
             return 0;
         } elsif ($written == $to_write) {
-            DebugLevel >= 2 && $self->debugmsg("Wrote ALL %d bytes to %d (nq=%d)",
-                                               $written, $self->{fd}, $need_queue);
-            $self->{write_buf_offset} = 0;
+            $self->{wbuf_off} = 0;
             $self->watch_write(0);
 
             # this was our only write, so we can return immediately
@@ -702,8 +651,7 @@ sub write {
             # can't be anything else to write.
             return 1 if $need_queue;
 
-            $self->{write_buf_size} -= $written;
-            shift @{$self->{write_buf}};
+            shift @$wbuf;
             undef $bref;
             next WRITE;
         }
@@ -737,9 +685,8 @@ sub read {
     my $res = sysread($sock, $buf, $req_bytes, 0);
     DebugLevel >= 2 && $self->debugmsg("sysread = %d; \$! = %d", $res, $!);
 
-    if (! $res && $! != EWOULDBLOCK) {
+    if (! $res && $! != EAGAIN) {
         # catches 0=conn closed or undef=error
-        DebugLevel >= 2 && $self->debugmsg("Fd \#%d read hit the end of the road.", $self->{fd});
         return undef;
     }
 
@@ -800,16 +747,16 @@ sub watch_read {
     $event &= ~POLLIN if ! $val;
     $event |=  POLLIN if   $val;
 
+    my $fd = fileno($self->{sock});
     # If it changed, set it
     if ($event != $self->{event_watch}) {
         if ($HaveKQueue) {
-            $KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_READ(),
+            $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(),
                             $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE());
         }
         elsif ($HaveEpoll) {
-            epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event)
-                and $self->dump_error("couldn't modify epoll settings for $self->{fd} " .
-                                      "from $self->{event_watch} -> $event: $! (" . ($!+0) . ")");
+            epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and
+                confess("EPOLL_CTL_MOD: $!");
         }
         $self->{event_watch} = $event;
     }
@@ -829,17 +776,17 @@ sub watch_write {
 
     $event &= ~POLLOUT if ! $val;
     $event |=  POLLOUT if   $val;
+    my $fd = fileno($self->{sock});
 
     # If it changed, set it
     if ($event != $self->{event_watch}) {
         if ($HaveKQueue) {
-            $KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_WRITE(),
+            $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(),
                             $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE());
         }
         elsif ($HaveEpoll) {
-            epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event)
-                and $self->dump_error("couldn't modify epoll settings for $self->{fd} " .
-                                      "from $self->{event_watch} -> $event: $! (" . ($!+0) . ")");
+            epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and
+                    confess "EPOLL_CTL_MOD: $!";
         }
         $self->{event_watch} = $event;
     }