]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/DS.pm
ds: deal better with FS-related errors IO buffers
[public-inbox.git] / lib / PublicInbox / DS.pm
index 9811405bae3600bdd5e1af843f27fc0b3f0a325b..1a1ef7d3f89b35a719d2c95751588f8d69766538 100644 (file)
@@ -21,7 +21,7 @@ use IO::Handle qw();
 use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD SEEK_SET);
 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
 use parent qw(Exporter);
-our @EXPORT_OK = qw(now msg_more write_in_full);
+our @EXPORT_OK = qw(now msg_more);
 use warnings;
 use 5.010_001;
 
@@ -33,7 +33,7 @@ use fields ('sock',              # underlying socket
             );
 
 use Errno  qw(EAGAIN EINVAL);
-use Carp   qw(croak confess);
+use Carp   qw(croak confess carp);
 use File::Temp qw(tempfile);
 
 our $HAVE_KQUEUE = eval { require IO::KQueue; IO::KQueue->import; 1 };
@@ -422,8 +422,8 @@ sub close {
 sub psendfile ($$$) {
     my ($sock, $fh, $off) = @_;
 
-    sysseek($fh, $$off, SEEK_SET) or return;
-    defined(my $to_write = sysread($fh, my $buf, 16384)) or return;
+    seek($fh, $$off, SEEK_SET) or return;
+    defined(my $to_write = read($fh, my $buf, 16384)) or return;
     my $written = 0;
     while ($to_write > 0) {
         if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) {
@@ -473,29 +473,36 @@ next_buf:
     1; # all done
 }
 
-sub write_in_full ($$$$) {
-    my ($fh, $bref, $len, $off) = @_;
-    my $rv = 0;
-    while ($len > 0) {
-        my $w = syswrite($fh, $$bref, $len, $off);
-        return ($rv ? $rv : $w) unless $w; # undef or 0
-        $rv += $w;
-        $len -= $w;
-        $off += $w;
-    }
-    $rv
+sub do_read ($$$$) {
+    my ($self, $rbuf, $len, $off) = @_;
+    my $r = sysread($self->{sock}, $$rbuf, $len, $off);
+    return ($r == 0 ? $self->close : $r) if defined $r;
+    # common for clients to break connections without warning,
+    # would be too noisy to log here:
+    $! == EAGAIN ? $self->watch_in1 : $self->close;
+}
+
+# drop the socket if we hit unrecoverable errors on our system which
+# require BOFH attention: ENOSPC, EFBIG, EIO, EMFILE, ENFILE...
+sub drop {
+    my $self = shift;
+    carp(@_);
+    $self->close;
 }
 
-sub tmpbuf ($$) {
-    my ($bref, $off) = @_;
+# n.b.: use ->write/->read for this buffer to allow compatibility with
+# PerlIO::mmap or PerlIO::scalar if needed
+sub tmpio ($$$) {
+    my ($self, $bref, $off) = @_;
     # open(my $fh, '+>>', undef) doesn't set O_APPEND
-    my ($fh, $path) = tempfile('wbuf-XXXXXXX', TMPDIR => 1);
-    open $fh, '+>>', $path or die "open: $!";
-    unlink $path;
-    my $to_write = bytes::length($$bref) - $off;
-    my $w = write_in_full($fh, $bref, $to_write, $off);
-    die "write_in_full ($to_write): $!" unless defined $w;
-    $w == $to_write ? $fh : die("short write $w < $to_write");
+    my ($fh, $path) = eval { tempfile('wbuf-XXXXXXX', TMPDIR => 1) };
+    $fh or return drop($self, "tempfile: $@");
+    open($fh, '+>>', $path) or return drop($self, "open: $!");
+    $fh->autoflush(1);
+    unlink($path) or return drop($self, "unlink: $!");
+    my $len = bytes::length($$bref) - $off;
+    $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!");
+    $fh
 }
 
 =head2 C<< $obj->write( $data ) >>
@@ -525,9 +532,10 @@ sub write {
         } else {
             my $last = $wbuf->[-1];
             if (ref($last) eq 'GLOB') { # append to tmp file buffer
-                write_in_full($last, $bref, bytes::length($$bref), 0);
+                $last->print($$bref) or return drop($self, "print: $!");
             } else {
-                push @$wbuf, tmpbuf($bref, 0);
+                my $tmpio = tmpio($self, $bref, 0) or return 0;
+                push @$wbuf, $tmpio;
             }
         }
         return 0;
@@ -545,7 +553,8 @@ sub write {
         } else {
             return $self->close;
         }
-        $self->{wbuf} = [ tmpbuf($bref, $written) ];
+        my $tmpio = tmpio($self, $bref, $written) or return 0;
+        $self->{wbuf} = [ $tmpio ];
         watch($self, EPOLLOUT|EPOLLONESHOT);
         return 0;
     }
@@ -562,9 +571,9 @@ sub msg_more ($$) {
         if (defined $n) {
             my $nlen = bytes::length($_[1]) - $n;
             return 1 if $nlen == 0; # all done!
-
             # queue up the unwritten substring:
-            $self->{wbuf} = [ tmpbuf(\($_[1]), $n) ];
+            my $tmpio = tmpio($self, \($_[1]), $n) or return 0;
+            $self->{wbuf} = [ $tmpio ];
             watch($self, EPOLLOUT|EPOLLONESHOT);
             return 0;
         }
@@ -583,6 +592,7 @@ sub watch ($$) {
         $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev));
         $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev));
     }
+    0;
 }
 
 sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }