]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/NNTP.pm
bundle Danga::Socket and Sys::Syscall
[public-inbox.git] / lib / PublicInbox / NNTP.pm
index fa890cb2cb274c7d9cb9afb1a461792a03a1fc2e..f756e92c64b39793c682dce067c9af4473d782c3 100644 (file)
@@ -5,7 +5,7 @@
 package PublicInbox::NNTP;
 use strict;
 use warnings;
-use base qw(Danga::Socket);
+use base qw(PublicInbox::DS);
 use fields qw(nntpd article rbuf ng long_res);
 use PublicInbox::Search;
 use PublicInbox::Msgmap;
@@ -23,12 +23,11 @@ use constant {
        r224 => '224 Overview information follows (multi-line)',
        r225 => '225 Headers follow (multi-line)',
        r430 => '430 No article with that message-id',
-       long_response_limit => 0xffffffff,
 };
 
 sub now () { clock_gettime(CLOCK_MONOTONIC) };
 
-my @OVERVIEW = qw(Subject From Date Message-ID References);
+my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
 my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n";
 my $LIST_HEADERS = join("\r\n", @OVERVIEW,
                        qw(:bytes :lines Xref To Cc)) . "\r\n";
@@ -51,8 +50,16 @@ sub next_tick () {
                # before finishing reading:
                if (my $long_cb = $nntp->{long_res}) {
                        $nntp->write($long_cb);
-               } elsif (&Danga::Socket::POLLIN & $nntp->{event_watch}) {
+               } else {
+                       # pipelined request, we bypassed socket-readiness
+                       # checks to get here:
                        event_read($nntp);
+
+                       # maybe there's more pipelined data, or we'll have
+                       # to register it for socket-readiness notifications
+                       if (!$nntp->{long_res} && !$nntp->{closed}) {
+                               check_read($nntp);
+                       }
                }
        }
 }
@@ -94,7 +101,7 @@ sub new ($$$) {
        my $self = fields::new($class);
        $self->SUPER::new($sock);
        $self->{nntpd} = $nntpd;
-       res($self, '201 server ready - post via email');
+       res($self, '201 ' . $nntpd->{servername} . ' ready - post via email');
        $self->{rbuf} = '';
        $self->watch_read(1);
        update_idle_time($self);
@@ -115,6 +122,7 @@ sub args_ok ($$) {
 sub process_line ($$) {
        my ($self, $l) = @_;
        my ($req, @args) = split(/\s+/, $l);
+       return unless defined($req);
        $req = lc($req);
        $req = eval {
                no strict 'refs';
@@ -409,7 +417,7 @@ sub header_append ($$$) {
 
 sub xref ($$$$) {
        my ($self, $ng, $n, $mid) = @_;
-       my $ret = "$ng->{domain} $ng->{newsgroup}:$n";
+       my $ret = $self->{nntpd}->{servername} . " $ng->{newsgroup}:$n";
 
        # num_for is pretty cheap and sometimes we'll lookup the existence
        # of an article without getting even the OVER info.  In other words,
@@ -506,6 +514,7 @@ sub set_art {
 sub _header ($) {
        my $hdr = $_[0]->header_obj->as_string;
        utf8::encode($hdr);
+       $hdr =~ s/(?<!\r)\n/\r\n/sg;
        $hdr
 }
 
@@ -608,7 +617,7 @@ sub long_response ($$) {
                                           now() - $t0);
                        } else {
                                update_idle_time($self);
-                               $self->watch_read(1);
+                               check_read($self);
                        }
                } elsif ($more) { # $self->{write_buf_size}:
                        # no recursion, schedule another call ASAP
@@ -619,7 +628,7 @@ sub long_response ($$) {
                        $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
                } else { # all done!
                        $self->{long_res} = undef;
-                       $self->watch_read(1);
+                       check_read($self);
                        res($self, '.');
                        out($self, " deferred[$fd] done - %0.6f", now() - $t0);
                }
@@ -725,7 +734,7 @@ sub hdr_searchmsg ($$$$) {
                        my $nr = scalar @$msgs or return;
                        my $tmp = '';
                        foreach my $s (@$msgs) {
-                               $tmp .= $s->num . ' ' . $s->$field . "\r\n";
+                               $tmp .= $s->{num} . ' ' . $s->$field . "\r\n";
                        }
                        utf8::encode($tmp);
                        do_more($self, $tmp);
@@ -811,8 +820,8 @@ sub cmd_xrover ($;$) {
        });
 }
 
-sub over_line ($$) {
-       my ($num, $smsg) = @_;
+sub over_line ($$$$) {
+       my ($self, $ng, $num, $smsg) = @_;
        # n.b. field access and procedural calls can be
        # 10%-15% faster than OO method calls:
        my $s = join("\t", $num,
@@ -822,7 +831,8 @@ sub over_line ($$) {
                "<$smsg->{mid}>",
                $smsg->{references},
                $smsg->{bytes},
-               $smsg->{lines});
+               $smsg->{lines},
+               "Xref: " . xref($self, $ng, $num, $smsg->{mid}));
        utf8::encode($s);
        $s
 }
@@ -838,7 +848,7 @@ sub cmd_over ($;$) {
                # Only set article number column if it's the current group
                my $self_ng = $self->{ng};
                $n = 0 if (!$self_ng || $self_ng ne $ng);
-               more($self, over_line($n, $smsg));
+               more($self, over_line($self, $ng, $n, $smsg));
                '.';
        } else {
                cmd_xover($self, $range);
@@ -860,7 +870,7 @@ sub cmd_xover ($;$) {
 
                # OVERVIEW.FMT
                more($self, join("\r\n", map {
-                       over_line($_->{num}, $_);
+                       over_line($self, $self->{ng}, $_->{num}, $_);
                        } @$msgs));
                $cur = $msgs->[-1]->{num} + 1;
        });
@@ -892,7 +902,7 @@ sub more ($$) {
 sub do_write ($$) {
        my ($self, $data) = @_;
        my $done = $self->write($data);
-       return if $self->{closed};
+       return if $self->{closed};
 
        # Do not watch for readability if we have data in the queue,
        # instead re-enable watching for readability when we can
@@ -926,7 +936,7 @@ sub do_more ($$) {
        do_write($self, $data);
 }
 
-# callbacks for Danga::Socket
+# callbacks for PublicInbox::DS
 
 sub event_hup { $_[0]->close }
 sub event_err { $_[0]->close }
@@ -943,11 +953,13 @@ sub event_write {
 sub event_read {
        my ($self) = @_;
        use constant LINE_MAX => 512; # RFC 977 section 2.3
-       my $r = 1;
 
-       my $buf = $self->read(LINE_MAX) or return $self->close;
-       $self->{rbuf} .= $$buf;
-       while ($r > 0 && $self->{rbuf} =~ s/\A\s*([^\r\n]+)\r?\n//) {
+       if (index($self->{rbuf}, "\n") < 0) {
+               my $buf = $self->read(LINE_MAX) or return $self->close;
+               $self->{rbuf} .= $$buf;
+       }
+       my $r = 1;
+       while ($r > 0 && $self->{rbuf} =~ s/\A\s*([^\r\n]*)\r?\n//) {
                my $line = $1;
                return $self->close if $line =~ /[[:cntrl:]]/s;
                my $t0 = now();
@@ -964,10 +976,9 @@ sub event_read {
        update_idle_time($self);
 }
 
-sub watch_read {
-       my ($self, $bool) = @_;
-       my $rv = $self->SUPER::watch_read($bool);
-       if ($bool && $self->{rbuf} ne '') {
+sub check_read {
+       my ($self) = @_;
+       if (index($self->{rbuf}, "\n") >= 0) {
                # Force another read if there is a pipelined request.
                # We don't know if the socket has anything for us to read,
                # and we must double-check again by the time the timer fires
@@ -975,8 +986,11 @@ sub watch_read {
                # another long response.
                push @$nextq, $self;
                $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
+       } else {
+               # no pipelined requests available, let the kernel know
+               # to wake us up if there's more
+               $self->watch_read(1); # PublicInbox::DS::watch_read
        }
-       $rv;
 }
 
 sub not_idle_long ($$) {