lib/PublicInbox/DS.pm | 42 +++++++++++++++++++++++++----------------- diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 486af40eb9e1aaafc611f68db559e7a33f25a75a..1a1ef7d3f89b35a719d2c95751588f8d69766538 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -33,7 +33,7 @@ 'wbuf_off', # offset into first element of wbuf to start writing at ); use Errno qw(EAGAIN EINVAL); -use Carp qw(croak confess); +use Carp qw(croak confess carp); use File::Temp qw(tempfile); our $HAVE_KQUEUE = eval { require IO::KQueue; IO::KQueue->import; 1 }; @@ -482,18 +482,27 @@ # would be too noisy to log here: $! == EAGAIN ? $self->watch_in1 : $self->close; } +# drop the socket if we hit unrecoverable errors on our system which +# require BOFH attention: ENOSPC, EFBIG, EIO, EMFILE, ENFILE... +sub drop { + my $self = shift; + carp(@_); + $self->close; +} + # n.b.: use ->write/->read for this buffer to allow compatibility with # PerlIO::mmap or PerlIO::scalar if needed -sub tmpbuf ($$) { - my ($bref, $off) = @_; +sub tmpio ($$$) { + my ($self, $bref, $off) = @_; # open(my $fh, '+>>', undef) doesn't set O_APPEND - my ($fh, $path) = tempfile('wbuf-XXXXXXX', TMPDIR => 1); - open $fh, '+>>', $path or die "open: $!"; + my ($fh, $path) = eval { tempfile('wbuf-XXXXXXX', TMPDIR => 1) }; + $fh or return drop($self, "tempfile: $@"); + open($fh, '+>>', $path) or return drop($self, "open: $!"); $fh->autoflush(1); - unlink $path; - my $to_write = bytes::length($$bref) - $off; - $fh->write($$bref, $to_write, $off) or die "write ($to_write): $!"; - $fh; + unlink($path) or return drop($self, "unlink: $!"); + my $len = bytes::length($$bref) - $off; + $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!"); + $fh } =head2 C<< $obj->write( $data ) >> @@ -523,12 +532,10 @@ push @$wbuf, $bref; } else { my $last = $wbuf->[-1]; if (ref($last) eq 'GLOB') { # append to tmp file buffer - unless ($last->print($$bref)) { - warn "error buffering: $!"; - return $self->close; - } + $last->print($$bref) or return drop($self, "print: $!"); } else { - push @$wbuf, tmpbuf($bref, 0); + my $tmpio = tmpio($self, $bref, 0) or return 0; + push @$wbuf, $tmpio; } } return 0; @@ -546,7 +553,8 @@ $written = 0; } else { return $self->close; } - $self->{wbuf} = [ tmpbuf($bref, $written) ]; + my $tmpio = tmpio($self, $bref, $written) or return 0; + $self->{wbuf} = [ $tmpio ]; watch($self, EPOLLOUT|EPOLLONESHOT); return 0; } @@ -563,9 +571,9 @@ my $n = send($sock, $_[1], MSG_MORE); if (defined $n) { my $nlen = bytes::length($_[1]) - $n; return 1 if $nlen == 0; # all done! - # queue up the unwritten substring: - $self->{wbuf} = [ tmpbuf(\($_[1]), $n) ]; + my $tmpio = tmpio($self, \($_[1]), $n) or return 0; + $self->{wbuf} = [ $tmpio ]; watch($self, EPOLLOUT|EPOLLONESHOT); return 0; }