]> Sergey Matveev's repositories - public-inbox.git/commitdiff
nntp: support slow blob retrievals
authorEric Wong <e@yhbt.net>
Tue, 16 Jun 2020 22:31:22 +0000 (22:31 +0000)
committerEric Wong <e@yhbt.net>
Sun, 21 Jun 2020 00:43:28 +0000 (00:43 +0000)
Having `git cat-file' as a separate process naturally lends
itself to asynchronous dispatch.  Our event loop for -nntpd no
longer blocks on slow git storage.

Pipelining in -imapd was tricky and bugs were exposed by
mbsync(1).  Update t/nntpd.t to support pipelining ARTICLE
requests to ensure we don't have the same problems -imapd
did during development.

lib/PublicInbox/NNTP.pm
t/nntp.t
t/nntpd.t

index 80dd8614fe801b26f57ba79825e3d938475f333e..6df19f322b6e03aaf91ba76f61a524d047be7604 100644 (file)
@@ -13,6 +13,7 @@ use POSIX qw(strftime);
 use PublicInbox::DS qw(now);
 use Digest::SHA qw(sha1_hex);
 use Time::Local qw(timegm timelocal);
+use PublicInbox::GitAsyncCat;
 use constant {
        LINE_MAX => 512, # RFC 977 section 2.3
        r501 => '501 command syntax error',
@@ -408,8 +409,9 @@ sub xref ($$$$) {
        $ret;
 }
 
-sub set_nntp_headers ($$$$$) {
-       my ($self, $hdr, $ng, $n, $mid) = @_;
+sub set_nntp_headers ($$$) {
+       my ($self, $hdr, $smsg) = @_;
+       my ($mid) = $smsg->{mid};
 
        # why? leafnode requires a Path: header for some inexplicable
        # reason.  We'll fake the shortest one possible.
@@ -429,7 +431,8 @@ sub set_nntp_headers ($$$$$) {
        }
 
        # clobber some
-       my $xref = xref($self, $ng, $n, $mid);
+       my $ng = $self->{ng};
+       my $xref = xref($self, $ng, $smsg->{num}, $mid);
        $hdr->header_set('Xref', $xref);
        $xref =~ s/:[0-9]+//g;
        $hdr->header_set('Newsgroups', (split(/ /, $xref, 2))[1]);
@@ -441,8 +444,8 @@ sub set_nntp_headers ($$$$$) {
        }
 }
 
-sub art_lookup ($$$) {
-       my ($self, $art, $set_headers) = @_;
+sub art_lookup ($$) {
+       my ($self, $art) = @_;
        my $ng = $self->{ng};
        my ($n, $mid);
        my $err;
@@ -478,13 +481,7 @@ find_mid:
        }
 found:
        my $smsg = $ng->over->get_art($n) or return $err;
-       my $msg = $ng->msg_by_smsg($smsg) or return $err;
-
-       # PublicInbox::Eml->new will modify $msg in-place, so what's
-       # left is the body and we won't need to call ->body(), later
-       my $hdr = PublicInbox::Eml->new($msg)->header_obj;
-       set_nntp_headers($self, $hdr, $ng, $n, $mid) if $set_headers;
-       [ $n, $mid, $msg, $hdr ];
+       $smsg;
 }
 
 sub msg_body_write ($$) {
@@ -495,7 +492,6 @@ sub msg_body_write ($$) {
        $$msg =~ s/(?<!\r)\n/\r\n/sg; # Alpine barfs without this
        $$msg .= "\r\n" unless $$msg =~ /\r\n\z/s;
        $self->msg_more($$msg);
-       '.'
 }
 
 sub set_art {
@@ -504,55 +500,88 @@ sub set_art {
 }
 
 sub msg_hdr_write ($$$) {
-       my ($self, $hdr, $body_follows) = @_;
-       $hdr = $hdr->as_string;
+       my ($self, $eml, $smsg) = @_;
+       set_nntp_headers($self, $eml, $smsg);
+
+       my $hdr = $eml->{hdr} // \(my $x = '');
        # fixup old bug from import (pre-a0c07cba0e5d8b6a)
-       $hdr =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
-       utf8::encode($hdr);
-       $hdr =~ s/(?<!\r)\n/\r\n/sg; # Alpine barfs without this
+       $$hdr =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
+       $$hdr =~ s/(?<!\r)\n/\r\n/sg; # Alpine barfs without this
 
        # for leafnode compatibility, we need to ensure Message-ID headers
        # are only a single line.
-       $hdr =~ s/^(Message-ID:)[ \t]*\r\n[ \t]+([^\r]+)\r\n/$1 $2\r\n/igsm;
-       $hdr .= "\r\n" if $body_follows;
-       $self->msg_more($hdr);
+       $$hdr =~ s/^(Message-ID:)[ \t]*\r\n[ \t]+([^\r]+)\r\n/$1 $2\r\n/igsm;
+       $self->msg_more($$hdr);
+}
+
+sub blob_cb { # called by git->cat_async via git_async_cat
+       my ($bref, $oid, $type, $size, $smsg) = @_;
+       my $self = $smsg->{nntp};
+       my $code = $smsg->{nntp_code} // 220;
+       if (!defined($oid)) {
+               # it's possible to have TOCTOU if an admin runs
+               # public-inbox-(edit|purge), just move onto the next message
+               return $self->requeue;
+       } elsif ($smsg->{blob} ne $oid) {
+               $self->close;
+               die "BUG: $smsg->{blob} != $oid";
+       }
+       my $r = "$code $smsg->{num} <$smsg->{mid}> article retrieved - ";
+       my $eml = PublicInbox::Eml->new($bref);
+       if ($code == 220) {
+               more($self, $r .= 'head and body follow');
+               msg_hdr_write($self, $eml, $smsg);
+               $self->msg_more("\r\n");
+               msg_body_write($self, $bref);
+       } elsif ($code == 221) {
+               more($self, $r .= 'head follows');
+               msg_hdr_write($self, $eml, $smsg);
+       } elsif ($code == 222) {
+               more($self, $r .= 'body follows');
+               msg_body_write($self, $bref);
+       } else {
+               $self->close;
+               die "BUG: bad code: $r";
+       }
+       $self->write(\".\r\n"); # flushes (includes ->zflush)
+       $self->requeue;
 }
 
 sub cmd_article ($;$) {
        my ($self, $art) = @_;
-       my $r = art_lookup($self, $art, 1);
-       return $r unless ref $r;
-       my ($n, $mid, $msg, $hdr) = @$r;
+       my $smsg = art_lookup($self, $art);
+       return $smsg unless ref $smsg;
        set_art($self, $art);
-       more($self, "220 $n <$mid> article retrieved - head and body follow");
-       msg_hdr_write($self, $hdr, 1);
-       msg_body_write($self, $msg);
+       $smsg->{nntp} = $self;
+       git_async_cat($self->{ng}->git, $smsg->{blob}, \&blob_cb, $smsg);
+       undef;
 }
 
 sub cmd_head ($;$) {
        my ($self, $art) = @_;
-       my $r = art_lookup($self, $art, 2);
-       return $r unless ref $r;
-       my ($n, $mid, undef, $hdr) = @$r;
+       my $smsg = art_lookup($self, $art);
+       return $smsg unless ref $smsg;
        set_art($self, $art);
-       more($self, "221 $n <$mid> article retrieved - head follows");
-       msg_hdr_write($self, $hdr, 0);
-       '.'
+       $smsg->{nntp} = $self;
+       $smsg->{nntp_code} = 221;
+       git_async_cat($self->{ng}->git, $smsg->{blob}, \&blob_cb, $smsg);
+       undef;
 }
 
 sub cmd_body ($;$) {
        my ($self, $art) = @_;
-       my $r = art_lookup($self, $art, 0);
-       return $r unless ref $r;
-       my ($n, $mid, $msg) = @$r;
+       my $smsg = art_lookup($self, $art);
+       return $smsg unless ref $smsg;
        set_art($self, $art);
-       more($self, "222 $n <$mid> article retrieved - body follows");
-       msg_body_write($self, $msg);
+       $smsg->{nntp} = $self;
+       $smsg->{nntp_code} = 222;
+       git_async_cat($self->{ng}->git, $smsg->{blob}, \&blob_cb, $smsg);
+       undef;
 }
 
 sub cmd_stat ($;$) {
        my ($self, $art) = @_;
-       my $r = art_lookup($self, $art, 0);
+       my $r = art_lookup($self, $art);
        return $r unless ref $r;
        my ($n, $mid) = @$r;
        set_art($self, $art);
index 2a9f3a4f6ebdd2e69d58cffe4e3ccb4e6bfb019b..1db896cf46b5a6376929e8de6c501e28d8d0d6f0 100644 (file)
--- a/t/nntp.t
+++ b/t/nntp.t
@@ -109,9 +109,12 @@ use_ok 'PublicInbox::Inbox';
        my $mid = 'a@b';
        my $mime = PublicInbox::Eml->new("Message-ID: <$mid>\r\n\r\n");
        my $hdr = $mime->header_obj;
-       my $mock_self = { nntpd => { grouplist => [], 
-                                    servername => 'example.com' } };
-       PublicInbox::NNTP::set_nntp_headers($mock_self, $hdr, $ng, 1, $mid);
+       my $mock_self = {
+               nntpd => { grouplist => [], servername => 'example.com' },
+               ng => $ng,
+       };
+       my $smsg = { num => 1, mid => $mid };
+       PublicInbox::NNTP::set_nntp_headers($mock_self, $hdr, $smsg);
        is_deeply([ $mime->header('Message-ID') ], [ "<$mid>" ],
                'Message-ID unchanged');
        is_deeply([ $mime->header('Archived-At') ], [ "<${u}a\@b/>" ],
@@ -126,7 +129,8 @@ use_ok 'PublicInbox::Inbox';
                'Xref: set');
 
        $ng->{-base_url} = 'http://mirror.example.com/m/';
-       PublicInbox::NNTP::set_nntp_headers($mock_self, $hdr, $ng, 2, $mid);
+       $smsg->{num} = 2;
+       PublicInbox::NNTP::set_nntp_headers($mock_self, $hdr, $smsg);
        is_deeply([ $mime->header('Message-ID') ], [ "<$mid>" ],
                'Message-ID unchanged');
        is_deeply([ $mime->header('Archived-At') ],
index b24720eb9dd0c323158995b0a4c3532e1829ba6d..c681b01c3d9d4eb40036ae9f8e98543c2f1bd07b 100644 (file)
--- a/t/nntpd.t
+++ b/t/nntpd.t
@@ -12,6 +12,8 @@ use IO::Socket;
 use Socket qw(IPPROTO_TCP TCP_NODELAY);
 use Net::NNTP;
 use Sys::Hostname;
+use POSIX qw(_exit);
+use Digest::SHA;
 
 # FIXME: make easier to test both versions
 my $version = $ENV{PI_TEST_VERSION} || 1;
@@ -287,21 +289,37 @@ Date: Fri, 02 Oct 1993 00:00:00 +0000
        # pipelined requests:
        {
                my $nreq = 90;
+               my $nart = 2;
                syswrite($s, "GROUP $group\r\n");
                my $res = <$s>;
                my $rdr = fork;
                if ($rdr == 0) {
-                       use POSIX qw(_exit);
                        for (1..$nreq) {
                                <$s> =~ /\A224 / or _exit(1);
                                <$s> =~ /\A1/ or _exit(2);
                                <$s> eq ".\r\n" or _exit(3);
                        }
+                       my %sums;
+                       for (1..$nart) {
+                               <$s> =~ /\A220 / or _exit(4);
+                               my $dig = Digest::SHA->new(1);
+                               while (my $l = <$s>) {
+                                       last if $l eq ".\r\n";
+                                       $dig->add($l);
+                               }
+                               $dig = $dig->hexdigest;
+                               $sums{$dig}++;
+                       }
+                       if ($nart) {
+                               scalar(keys(%sums)) == 1 or _exit(5);
+                               (values(%sums))[0] == $nart or _exit(6);
+                       }
                        _exit(0);
                }
                for (1..$nreq) {
                        syswrite($s, "XOVER 1\r\n");
                }
+               syswrite($s, "ARTICLE 1\r\n" x $nart);
                is($rdr, waitpid($rdr, 0), 'reader done');
                is($? >> 8, 0, 'no errors');
        }