require PublicInbox::EvCleanup;
use Email::Simple;
use POSIX qw(strftime);
-use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
+PublicInbox::DS->import(qw(now msg_more));
use Digest::SHA qw(sha1_hex);
use Time::Local qw(timegm timelocal);
use constant {
r430 => '430 No article with that message-id',
};
-sub now () { clock_gettime(CLOCK_MONOTONIC) };
-
my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n";
my $LIST_HEADERS = join("\r\n", @OVERVIEW,
} else {
# pipelined request, we bypassed socket-readiness
# checks to get here:
- event_read($nntp);
+ event_step($nntp);
# maybe there's more pipelined data, or we'll have
# to register it for socket-readiness notifications
- if (!$nntp->{long_res} && !$nntp->{closed}) {
+ if (!$nntp->{long_res} && $nntp->{sock}) {
check_read($nntp);
}
}
sub update_idle_time ($) {
my ($self) = @_;
- my $fd = $self->{fd};
- defined $fd and $EXPMAP->{$fd} = [ now(), $self ];
+ my $sock = $self->{sock} or return;
+ $EXPMAP->{fileno($sock)} = [ now(), $self ];
}
sub expire_old () {
sub new ($$$) {
my ($class, $sock, $nntpd) = @_;
my $self = fields::new($class);
- $self->SUPER::new($sock);
+ $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN());
$self->{nntpd} = $nntpd;
res($self, '201 ' . $nntpd->{servername} . ' ready - post via email');
$self->{rbuf} = '';
- $self->watch_read(1);
update_idle_time($self);
$expt ||= PublicInbox::EvCleanup::later(*expire_old);
$self;
my $res = eval { $req->($self, @args) };
my $err = $@;
- if ($err && !$self->{closed}) {
+ if ($err && $self->{sock}) {
local $/ = "\n";
chomp($l);
err($self, 'error from: %s (%s)', $l, $err);
sub list_overview_fmt ($) {
my ($self) = @_;
- do_more($self, $OVERVIEW_FMT);
+ msg_more($self, $OVERVIEW_FMT);
}
sub list_headers ($;$) {
my ($self) = @_;
- do_more($self, $LIST_HEADERS);
+ msg_more($self, $LIST_HEADERS);
}
sub list_active ($;$) {
# reason. We'll fake the shortest one possible.
$hdr->header_set('Path', 'y');
+ # leafnode (and maybe other NNTP clients) have trouble dealing
+ # with v2 messages which have multiple Message-IDs (either due
+ # to our own content-based dedupe or buggy git-send-email versions).
+ my @mids = $hdr->header('Message-ID');
+ if (scalar(@mids) > 1) {
+ my $mid0 = "<$mid>";
+ $hdr->header_set('Message-ID', $mid0);
+ my @alt = $hdr->header('X-Alt-Message-ID');
+ my %seen = map { $_ => 1 } (@alt, $mid0);
+ foreach my $m (@mids) {
+ next if $seen{$m}++;
+ push @alt, $m;
+ }
+ $hdr->header_set('X-Alt-Message-ID', @alt);
+ }
+
# clobber some
my $xref = xref($self, $ng, $n, $mid);
$hdr->header_set('Xref', $xref);
$s->body_set('');
$body =~ s/^\./../smg;
$body =~ s/(?<!\r)\n/\r\n/sg;
- do_more($self, $body);
- do_more($self, "\r\n") unless $body =~ /\r\n\z/s;
+ msg_more($self, $body);
+ msg_more($self, "\r\n") unless $body =~ /\r\n\z/s;
'.'
}
my ($n, $mid, $s) = @$r;
set_art($self, $art);
more($self, "220 $n <$mid> article retrieved - head and body follow");
- do_more($self, _header($s));
- do_more($self, "\r\n");
+ msg_more($self, _header($s));
+ msg_more($self, "\r\n");
simple_body_write($self, $s);
}
my ($n, $mid, $s) = @$r;
set_art($self, $art);
more($self, "221 $n <$mid> article retrieved - head follows");
- do_more($self, _header($s));
+ msg_more($self, _header($s));
'.'
}
my ($self, $cb) = @_;
die "BUG: nested long response" if $self->{long_res};
- my $fd = $self->{fd};
+ my $fd = fileno($self->{sock});
defined $fd or return;
# make sure we disable reading during a long response,
# clients should not be sending us stuff and making us do more
my $t0 = now();
$self->{long_res} = sub {
my $more = eval { $cb->() };
- if ($@ || $self->{closed}) {
+ if ($@ || !$self->{sock}) {
$self->{long_res} = undef;
if ($@) {
"%s during long response[$fd] - %0.6f",
$@, now() - $t0);
}
- if ($self->{closed}) {
- out($self, " deferred[$fd] aborted - %0.6f",
- now() - $t0);
- } else {
+ if ($self->{sock}) {
update_idle_time($self);
check_read($self);
+ } else {
+ out($self, " deferred[$fd] aborted - %0.6f",
+ now() - $t0);
}
- } elsif ($more) { # $self->{write_buf_size}:
+ } elsif ($more) { # $self->{wbuf}:
# no recursion, schedule another call ASAP
# but only after all pending writes are done
update_idle_time($self);
$tmp .= $s->{num} . ' ' . $s->$field . "\r\n";
}
utf8::encode($tmp);
- do_more($self, $tmp);
+ msg_more($self, $tmp);
$cur = $msgs->[-1]->{num} + 1;
});
}
'223 '.join(' ', @paths);
}
-sub res ($$) {
- my ($self, $line) = @_;
- do_write($self, $line . "\r\n");
-}
+sub res ($$) { do_write($_[0], $_[1] . "\r\n") }
-sub more ($$) {
- my ($self, $line) = @_;
- do_more($self, $line . "\r\n");
-}
+sub more ($$) { msg_more($_[0], $_[1] . "\r\n") }
sub do_write ($$) {
- my ($self, $data) = @_;
- my $done = $self->write($data);
- return 0 if $self->{closed};
+ my $self = $_[0];
+ my $done = $self->write(\($_[1]));
+ return 0 unless $self->{sock};
# Do not watch for readability if we have data in the queue,
# instead re-enable watching for readability when we can
printf { $self->{nntpd}->{out} } $fmt."\n", @args;
}
-use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
-
-sub do_more ($$) {
- my ($self, $data) = @_;
- if (MSG_MORE && !$self->{write_buf_size}) {
- my $n = send($self->{sock}, $data, MSG_MORE);
- if (defined $n) {
- my $dlen = length($data);
- return 1 if $n == $dlen; # all done!
- $data = substr($data, $n, $dlen - $n);
- }
- }
- do_write($self, $data);
-}
-
-# callbacks for PublicInbox::DS
+sub event_step {
+ my ($self) = @_;
-sub event_hup { $_[0]->close }
-sub event_err { $_[0]->close }
+ return unless $self->flush_write && $self->{sock};
+ return if $self->{long_res};
-sub event_write {
- my ($self) = @_;
update_idle_time($self);
- # only continue watching for readability when we are done writing:
- if ($self->write(undef) == 1 && !$self->{long_res}) {
- $self->watch_read(1);
- }
-}
+ # only read more requests if we've drained the write buffer,
+ # otherwise we can be buffering infinitely w/o backpressure
-sub event_read {
- my ($self) = @_;
use constant LINE_MAX => 512; # RFC 977 section 2.3
-
- if (index($self->{rbuf}, "\n") < 0) {
- my $buf = $self->read(LINE_MAX) or return $self->close;
- $self->{rbuf} .= $$buf;
+ my $rbuf = \($self->{rbuf});
+ my $r;
+
+ if (index($$rbuf, "\n") < 0) {
+ my $off = length($$rbuf);
+ $r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off);
+ unless (defined $r) {
+ return if $!{EAGAIN};
+ return $self->close;
+ }
+ return $self->close if $r == 0;
}
- my $r = 1;
- while ($r > 0 && $self->{rbuf} =~ s/\A[ \t\r\n]*([^\r\n]*)\r?\n//) {
+ $r = 1;
+ while ($r > 0 && $$rbuf =~ s/\A[ \t\r\n]*([^\r\n]*)\r?\n//) {
my $line = $1;
return $self->close if $line =~ /[[:cntrl:]]/s;
my $t0 = now();
- my $fd = $self->{fd};
+ my $fd = fileno($self->{sock});
$r = eval { process_line($self, $line) };
my $d = $self->{long_res} ?
" deferred[$fd]" : '';
}
return $self->close if $r < 0;
- my $len = length($self->{rbuf});
+ my $len = length($$rbuf);
return $self->close if ($len >= LINE_MAX);
update_idle_time($self);
}
sub not_idle_long ($$) {
my ($self, $now) = @_;
- defined(my $fd = $self->{fd}) or return;
- my $ary = $EXPMAP->{$fd} or return;
+ my $sock = $self->{sock} or return;
+ my $ary = $EXPMAP->{fileno($sock)} or return;
my $exp_at = $ary->[0] + $EXPTIME;
$exp_at > $now;
}
# for graceful shutdown in PublicInbox::Daemon:
sub busy {
my ($self, $now) = @_;
- ($self->{rbuf} ne '' || $self->{long_res} || $self->{write_buf_size} ||
- not_idle_long($self, $now));
+ ($self->{rbuf} ne '' || $self->{long_res} ||
+ $self->{wbuf} || not_idle_long($self, $now));
}
1;