]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/NNTP.pm
Merge remote-tracking branch 'origin/ds'
[public-inbox.git] / lib / PublicInbox / NNTP.pm
index cbd4ecf155a22ae2aae5ef45e36cc3a80e25fef9..fa412f8c6c8bc893b84ed80cb668cc45522a6fb9 100644 (file)
@@ -5,7 +5,7 @@
 package PublicInbox::NNTP;
 use strict;
 use warnings;
-use base qw(Danga::Socket);
+use base qw(PublicInbox::DS);
 use fields qw(nntpd article rbuf ng long_res);
 use PublicInbox::Search;
 use PublicInbox::Msgmap;
@@ -23,12 +23,11 @@ use constant {
        r224 => '224 Overview information follows (multi-line)',
        r225 => '225 Headers follow (multi-line)',
        r430 => '430 No article with that message-id',
-       long_response_limit => 0xffffffff,
 };
 
 sub now () { clock_gettime(CLOCK_MONOTONIC) };
 
-my @OVERVIEW = qw(Subject From Date Message-ID References);
+my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
 my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n";
 my $LIST_HEADERS = join("\r\n", @OVERVIEW,
                        qw(:bytes :lines Xref To Cc)) . "\r\n";
@@ -51,15 +50,24 @@ sub next_tick () {
                # before finishing reading:
                if (my $long_cb = $nntp->{long_res}) {
                        $nntp->write($long_cb);
-               } elsif (&Danga::Socket::POLLIN & $nntp->{event_watch}) {
+               } else {
+                       # pipelined request, we bypassed socket-readiness
+                       # checks to get here:
                        event_read($nntp);
+
+                       # maybe there's more pipelined data, or we'll have
+                       # to register it for socket-readiness notifications
+                       if (!$nntp->{long_res} && !$nntp->{closed}) {
+                               check_read($nntp);
+                       }
                }
        }
 }
 
 sub update_idle_time ($) {
        my ($self) = @_;
-       my $fd = $self->{fd};
+        my $sock = $self->{sock} or return;
+       my $fd = fileno($sock);
        defined $fd and $EXPMAP->{$fd} = [ now(), $self ];
 }
 
@@ -114,8 +122,8 @@ sub args_ok ($$) {
 # returns 1 if we can continue, 0 if not due to buffered writes or disconnect
 sub process_line ($$) {
        my ($self, $l) = @_;
-       my ($req, @args) = split(/\s+/, $l);
-       return unless defined($req);
+       my ($req, @args) = split(/[ \t]/, $l);
+       return 1 unless defined($req); # skip blank line
        $req = lc($req);
        $req = eval {
                no strict 'refs';
@@ -319,27 +327,27 @@ sub cmd_newnews ($$$$;$$) {
        my ($keep, $skip) = split('!', $newsgroups, 2);
        ngpat2re($keep);
        ngpat2re($skip);
-       my @srch;
+       my @over;
        foreach my $ng (@{$self->{nntpd}->{grouplist}}) {
                $ng->{newsgroup} =~ $keep or next;
                $ng->{newsgroup} =~ $skip and next;
-               my $srch = $ng->search or next;
-               push @srch, $srch;
+               my $over = $ng->over or next;
+               push @over, $over;
        };
-       return '.' unless @srch;
+       return '.' unless @over;
 
        my $prev = 0;
        long_response($self, sub {
-               my $srch = $srch[0];
-               my $msgs = $srch->query_ts($ts, $prev);
+               my $over = $over[0];
+               my $msgs = $over->query_ts($ts, $prev);
                if (scalar @$msgs) {
                        more($self, '<' .
                                join(">\r\n<", map { $_->mid } @$msgs ).
                                '>');
                        $prev = $msgs->[-1]->{num};
                } else {
-                       shift @srch;
-                       if (@srch) { # continue onto next newsgroup
+                       shift @over;
+                       if (@over) { # continue onto next newsgroup
                                $prev = 0;
                                return 1;
                        } else { # break out of the long response.
@@ -427,10 +435,30 @@ sub xref ($$$$) {
 sub set_nntp_headers ($$$$$) {
        my ($self, $hdr, $ng, $n, $mid) = @_;
 
+       # why? leafnode requires a Path: header for some inexplicable
+       # reason.  We'll fake the shortest one possible.
+       $hdr->header_set('Path', 'y');
+
+       # leafnode (and maybe other NNTP clients) have trouble dealing
+       # with v2 messages which have multiple Message-IDs (either due
+       # to our own content-based dedupe or buggy git-send-email versions).
+       my @mids = $hdr->header('Message-ID');
+       if (scalar(@mids) > 1) {
+               my $mid0 = "<$mid>";
+               $hdr->header_set('Message-ID', $mid0);
+               my @alt = $hdr->header('X-Alt-Message-ID');
+               my %seen = map { $_ => 1 } (@alt, $mid0);
+               foreach my $m (@mids) {
+                       next if $seen{$m}++;
+                       push @alt, $m;
+               }
+               $hdr->header_set('X-Alt-Message-ID', @alt);
+       }
+
        # clobber some
        my $xref = xref($self, $ng, $n, $mid);
        $hdr->header_set('Xref', $xref);
-       $xref =~ s/:\d+//g;
+       $xref =~ s/:[0-9]+//g;
        $hdr->header_set('Newsgroups', (split(/ /, $xref, 2))[1]);
        header_append($hdr, 'List-Post', "<mailto:$ng->{-primary_address}>");
        if (my $url = $ng->base_url) {
@@ -446,7 +474,7 @@ sub art_lookup ($$$) {
        my ($n, $mid);
        my $err;
        if (defined $art) {
-               if ($art =~ /\A\d+\z/o) {
+               if ($art =~ /\A[0-9]+\z/) {
                        $err = '423 no such article number in this group';
                        $n = int($art);
                        goto find_mid;
@@ -476,7 +504,7 @@ find_mid:
                defined $mid or return $err;
        }
 found:
-       my $smsg = $ng->search->{over_ro}->get_art($n) or return $err;
+       my $smsg = $ng->over->get_art($n) or return $err;
        my $msg = $ng->msg_by_smsg($smsg) or return $err;
        my $s = Email::Simple->new($msg);
        if ($set_headers) {
@@ -501,12 +529,20 @@ sub simple_body_write ($$) {
 
 sub set_art {
        my ($self, $art) = @_;
-       $self->{article} = $art if defined $art && $art =~ /\A\d+\z/;
+       $self->{article} = $art if defined $art && $art =~ /\A[0-9]+\z/;
 }
 
 sub _header ($) {
        my $hdr = $_[0]->header_obj->as_string;
        utf8::encode($hdr);
+       $hdr =~ s/(?<!\r)\n/\r\n/sg;
+
+       # for leafnode compatibility, we need to ensure Message-ID headers
+       # are only a single line.  We can't subclass Email::Simple::Header
+       # and override _default_fold_at in here, either; since that won't
+       # affect messages already in the archive.
+       $hdr =~ s/^(Message-ID:)[ \t]*\r\n[ \t]+([^\r]+)\r\n/$1 $2\r\n/igsm;
+
        $hdr
 }
 
@@ -568,11 +604,11 @@ sub get_range ($$) {
        defined $range or return '420 No article(s) selected';
        my ($beg, $end);
        my ($min, $max) = $ng->mm->minmax;
-       if ($range =~ /\A(\d+)\z/) {
+       if ($range =~ /\A([0-9]+)\z/) {
                $beg = $end = $1;
-       } elsif ($range =~ /\A(\d+)-\z/) {
+       } elsif ($range =~ /\A([0-9]+)-\z/) {
                ($beg, $end) = ($1, $max);
-       } elsif ($range =~ /\A(\d+)-(\d+)\z/) {
+       } elsif ($range =~ /\A([0-9]+)-([0-9]+)\z/) {
                ($beg, $end) = ($1, $2);
        } else {
                return r501;
@@ -587,7 +623,7 @@ sub long_response ($$) {
        my ($self, $cb) = @_;
        die "BUG: nested long response" if $self->{long_res};
 
-       my $fd = $self->{fd};
+       my $fd = fileno($self->{sock});
        defined $fd or return;
        # make sure we disable reading during a long response,
        # clients should not be sending us stuff and making us do more
@@ -609,9 +645,9 @@ sub long_response ($$) {
                                           now() - $t0);
                        } else {
                                update_idle_time($self);
-                               $self->watch_read(1);
+                               check_read($self);
                        }
-               } elsif ($more) { # $self->{write_buf_size}:
+               } elsif ($more) { # scalar @{$self->{wbuf}}:
                        # no recursion, schedule another call ASAP
                        # but only after all pending writes are done
                        update_idle_time($self);
@@ -620,7 +656,7 @@ sub long_response ($$) {
                        $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
                } else { # all done!
                        $self->{long_res} = undef;
-                       $self->watch_read(1);
+                       check_read($self);
                        res($self, '.');
                        out($self, " deferred[$fd] done - %0.6f", now() - $t0);
                }
@@ -698,9 +734,9 @@ sub hdr_xref ($$$) { # optimize XHDR Xref [range] for rtin
        }
 }
 
-sub search_header_for {
-       my ($srch, $num, $field) = @_;
-       my $smsg = $srch->{over_ro}->get_art($num) or return;
+sub over_header_for {
+       my ($over, $num, $field) = @_;
+       my $smsg = $over->get_art($num) or return;
        return PublicInbox::SearchMsg::date($smsg) if $field eq 'date';
        $smsg->{$field};
 }
@@ -710,11 +746,11 @@ sub hdr_searchmsg ($$$$) {
        if (defined $range && $range =~ /\A<(.+)>\z/) { # Message-ID
                my ($ng, $n) = mid_lookup($self, $1);
                return r430 unless defined $n;
-               my $v = search_header_for($ng->search, $n, $field);
+               my $v = over_header_for($ng->over, $n, $field);
                hdr_mid_response($self, $xhdr, $ng, $n, $range, $v);
        } else { # numeric range
                $range = $self->{article} unless defined $range;
-               my $srch = $self->{ng}->search;
+               my $over = $self->{ng}->over;
                my $mm = $self->{ng}->mm;
                my $r = get_range($self, $range);
                return $r unless ref $r;
@@ -722,7 +758,7 @@ sub hdr_searchmsg ($$$$) {
                more($self, $xhdr ? r221 : r225);
                my $cur = $beg;
                long_response($self, sub {
-                       my $msgs = $srch->query_xover($cur, $end);
+                       my $msgs = $over->query_xover($cur, $end);
                        my $nr = scalar @$msgs or return;
                        my $tmp = '';
                        foreach my $s (@$msgs) {
@@ -802,18 +838,18 @@ sub cmd_xrover ($;$) {
        return $r unless ref $r;
        my ($beg, $end) = @$r;
        my $mm = $ng->mm;
-       my $srch = $ng->search;
+       my $over = $ng->over;
        more($self, '224 Overview information follows');
 
        long_response($self, sub {
-               my $h = search_header_for($srch, $beg, 'references');
+               my $h = over_header_for($over, $beg, 'references');
                more($self, "$beg $h") if defined($h);
                $beg++ < $end;
        });
 }
 
-sub over_line ($$) {
-       my ($num, $smsg) = @_;
+sub over_line ($$$$) {
+       my ($self, $ng, $num, $smsg) = @_;
        # n.b. field access and procedural calls can be
        # 10%-15% faster than OO method calls:
        my $s = join("\t", $num,
@@ -823,7 +859,8 @@ sub over_line ($$) {
                "<$smsg->{mid}>",
                $smsg->{references},
                $smsg->{bytes},
-               $smsg->{lines});
+               $smsg->{lines},
+               "Xref: " . xref($self, $ng, $num, $smsg->{mid}));
        utf8::encode($s);
        $s
 }
@@ -833,13 +870,13 @@ sub cmd_over ($;$) {
        if ($range && $range =~ /\A<(.+)>\z/) {
                my ($ng, $n) = mid_lookup($self, $1);
                defined $n or return r430;
-               my $smsg = $ng->search->{over_ro}->get_art($n) or return r430;
+               my $smsg = $ng->over->get_art($n) or return r430;
                more($self, '224 Overview information follows (multi-line)');
 
                # Only set article number column if it's the current group
                my $self_ng = $self->{ng};
                $n = 0 if (!$self_ng || $self_ng ne $ng);
-               more($self, over_line($n, $smsg));
+               more($self, over_line($self, $ng, $n, $smsg));
                '.';
        } else {
                cmd_xover($self, $range);
@@ -853,15 +890,15 @@ sub cmd_xover ($;$) {
        return $r unless ref $r;
        my ($beg, $end) = @$r;
        more($self, "224 Overview information follows for $beg to $end");
-       my $srch = $self->{ng}->search;
+       my $over = $self->{ng}->over;
        my $cur = $beg;
        long_response($self, sub {
-               my $msgs = $srch->query_xover($cur, $end);
+               my $msgs = $over->query_xover($cur, $end);
                my $nr = scalar @$msgs or return;
 
                # OVERVIEW.FMT
                more($self, join("\r\n", map {
-                       over_line($_->{num}, $_);
+                       over_line($self, $self->{ng}, $_->{num}, $_);
                        } @$msgs));
                $cur = $msgs->[-1]->{num} + 1;
        });
@@ -893,7 +930,7 @@ sub more ($$) {
 sub do_write ($$) {
        my ($self, $data) = @_;
        my $done = $self->write($data);
-       return if $self->{closed};
+       return if $self->{closed};
 
        # Do not watch for readability if we have data in the queue,
        # instead re-enable watching for readability when we can
@@ -916,7 +953,7 @@ use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
 
 sub do_more ($$) {
        my ($self, $data) = @_;
-       if (MSG_MORE && !$self->{write_buf_size}) {
+       if (MSG_MORE && !scalar(@{$self->{wbuf}})) {
                my $n = send($self->{sock}, $data, MSG_MORE);
                if (defined $n) {
                        my $dlen = length($data);
@@ -927,11 +964,6 @@ sub do_more ($$) {
        do_write($self, $data);
 }
 
-# callbacks for Danga::Socket
-
-sub event_hup { $_[0]->close }
-sub event_err { $_[0]->close }
-
 sub event_write {
        my ($self) = @_;
        update_idle_time($self);
@@ -944,17 +976,24 @@ sub event_write {
 sub event_read {
        my ($self) = @_;
        use constant LINE_MAX => 512; # RFC 977 section 2.3
-
-       if (index($self->{rbuf}, "\n") < 0) {
-               my $buf = $self->read(LINE_MAX) or return $self->close;
-               $self->{rbuf} .= $$buf;
+       my $rbuf = \($self->{rbuf});
+       my $r;
+
+       if (index($$rbuf, "\n") < 0) {
+               my $off = length($$rbuf);
+               $r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off);
+               unless (defined $r) {
+                       return if $!{EAGAIN};
+                       return $self->close;
+               }
+               return $self->close if $r == 0;
        }
-       my $r = 1;
-       while ($r > 0 && $self->{rbuf} =~ s/\A\s*([^\r\n]*)\r?\n//) {
+       $r = 1;
+       while ($r > 0 && $$rbuf =~ s/\A[ \t\r\n]*([^\r\n]*)\r?\n//) {
                my $line = $1;
                return $self->close if $line =~ /[[:cntrl:]]/s;
                my $t0 = now();
-               my $fd = $self->{fd};
+               my $fd = fileno($self->{sock});
                $r = eval { process_line($self, $line) };
                my $d = $self->{long_res} ?
                        " deferred[$fd]" : '';
@@ -962,15 +1001,14 @@ sub event_read {
        }
 
        return $self->close if $r < 0;
-       my $len = length($self->{rbuf});
+       my $len = length($$rbuf);
        return $self->close if ($len >= LINE_MAX);
        update_idle_time($self);
 }
 
-sub watch_read {
-       my ($self, $bool) = @_;
-       my $rv = $self->SUPER::watch_read($bool);
-       if ($bool && index($self->{rbuf}, "\n") >= 0) {
+sub check_read {
+       my ($self) = @_;
+       if (index($self->{rbuf}, "\n") >= 0) {
                # Force another read if there is a pipelined request.
                # We don't know if the socket has anything for us to read,
                # and we must double-check again by the time the timer fires
@@ -978,13 +1016,17 @@ sub watch_read {
                # another long response.
                push @$nextq, $self;
                $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
+       } else {
+               # no pipelined requests available, let the kernel know
+               # to wake us up if there's more
+               $self->watch_read(1); # PublicInbox::DS::watch_read
        }
-       $rv;
 }
 
 sub not_idle_long ($$) {
        my ($self, $now) = @_;
-       defined(my $fd = $self->{fd}) or return;
+        my $sock = $self->{sock} or return;
+       defined(my $fd = fileno($sock)) or return;
        my $ary = $EXPMAP->{$fd} or return;
        my $exp_at = $ary->[0] + $EXPTIME;
        $exp_at > $now;
@@ -993,8 +1035,8 @@ sub not_idle_long ($$) {
 # for graceful shutdown in PublicInbox::Daemon:
 sub busy {
        my ($self, $now) = @_;
-       ($self->{rbuf} ne '' || $self->{long_res} || $self->{write_buf_size} ||
-        not_idle_long($self, $now));
+       ($self->{rbuf} ne '' || $self->{long_res} ||
+               scalar(@{$self->{wbuf}}) || not_idle_long($self, $now));
 }
 
 1;