]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/NNTP.pm
nntp: append Archived-At and List-Archive headers
[public-inbox.git] / lib / PublicInbox / NNTP.pm
index 8740377f63a86fca1625e2ba79895d20771ea457..3e0faaf96dd8f3f45e2f3e44696bec53aec2c771 100644 (file)
@@ -15,6 +15,7 @@ use Email::MIME;
 use Data::Dumper qw(Dumper);
 use POSIX qw(strftime);
 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
+use URI::Escape qw(uri_escape_utf8);
 use constant {
        r501 => '501 command syntax error',
        r221 => '221 Header follows',
@@ -38,6 +39,25 @@ my %DISABLED; # = map { $_ => 1 } qw(xover list_overview_fmt newnews xhdr);
 my $EXPMAP; # fd -> [ idle_time, $self ]
 my $EXPTIMER;
 our $EXPTIME = 180; # 3 minutes
+my $WEAKEN = {}; # string(nntpd) -> nntpd
+my $WEAKTIMER;
+
+my $next_tick;
+my $nextq = [];
+sub next_tick () {
+       $next_tick = undef;
+       my $q = $nextq;
+       $nextq = [];
+       foreach my $nntp (@$q) {
+               # for request && response protocols, always finish writing
+               # before finishing reading:
+               if (my $long_cb = $nntp->{long_res}) {
+                       $nntp->write($long_cb);
+               } elsif (&Danga::Socket::POLLIN & $nntp->{event_watch}) {
+                       event_read($nntp);
+               }
+       }
+}
 
 sub update_idle_time ($) {
        my ($self) = @_;
@@ -46,6 +66,16 @@ sub update_idle_time ($) {
        defined $tmp and $EXPMAP->{$tmp} = [ now(), $self ];
 }
 
+# reduce FD pressure by closing some "git cat-file --batch" processes
+# and unused FDs for msgmap and Xapian indices
+sub weaken_groups () {
+       $WEAKTIMER = undef;
+       foreach my $nntpd (values %$WEAKEN) {
+               $_->weaken_all foreach (@{$nntpd->{grouplist}});
+       }
+       $WEAKEN = {};
+}
+
 sub expire_old () {
        my $now = now();
        my $exp = $EXPTIME;
@@ -69,11 +99,15 @@ sub expire_old () {
                $next -= $now;
                $next = 0 if $next < 0;
                $EXPTIMER = Danga::Socket->AddTimer($next, *expire_old);
+               weaken_groups();
        } else {
                $EXPTIMER = undef;
-               # noop to kick outselves out of the loop so descriptors
+               # noop to kick outselves out of the loop ASAP so descriptors
                # really get closed
                Danga::Socket->AddTimer(0, sub {});
+
+               # grace period for reaping resources
+               $WEAKTIMER ||= Danga::Socket->AddTimer(30, *weaken_groups);
        }
 }
 
@@ -87,6 +121,7 @@ sub new ($$$) {
        $self->{rbuf} = '';
        $self->watch_read(1);
        update_idle_time($self);
+       $WEAKEN->{"$nntpd"} = $nntpd;
        $EXPTIMER ||= Danga::Socket->AddTimer($EXPTIME, *expire_old);
        $self;
 }
@@ -392,6 +427,29 @@ sub cmd_quit ($) {
        undef;
 }
 
+sub header_append ($$$) {
+       my ($hdr, $k, $v) = @_;
+       my @v = $hdr->header($k);
+       foreach (@v) {
+               return if $v eq $_;
+       }
+       $hdr->header_set($k, @v, $v);
+}
+
+sub set_nntp_headers {
+       my ($hdr, $ng, $n, $mid) = @_;
+
+       # clobber some
+       $hdr->header_set('Newsgroups', $ng->{name});
+       $hdr->header_set('Xref', xref($ng, $n));
+       header_append($hdr, 'List-Post', "<mailto:$ng->{address}>");
+       if (my $url = $ng->{url}) {
+               $mid = uri_escape_utf8($mid);
+               header_append($hdr, 'Archived-At', "<$url$mid/>");
+               header_append($hdr, 'List-Archive', "<$url>");
+       }
+}
+
 sub art_lookup ($$$) {
        my ($self, $art, $set_headers) = @_;
        my $ng = $self->{ng};
@@ -434,8 +492,7 @@ found:
        return $err unless $s;
        my $lines;
        if ($set_headers) {
-               $s->header_set('Newsgroups', $ng->{name});
-               $s->header_set('Xref', xref($ng, $n));
+               set_nntp_headers($s->header_obj, $ng, $n, $mid);
                $lines = $s->body =~ tr!\n!\n!;
 
                # must be last
@@ -533,16 +590,6 @@ sub get_range ($$) {
        [ $beg, $end ];
 }
 
-sub hdr_val ($$) {
-       my ($r, $header) = @_;
-       return $r->[3] if $header =~ /\A:?bytes\z/i;
-       return $r->[4] if $header =~ /\A:?lines\z/i;
-       $r = $r->[2]->header_obj->header($header);
-       defined $r or return;
-       $r =~ s/[\r\n\t]+/ /sg;
-       $r;
-}
-
 sub long_response ($$$$) {
        my ($self, $beg, $end, $cb) = @_;
        die "BUG: nested long response" if $self->{long_res};
@@ -584,9 +631,9 @@ sub long_response ($$$$) {
                        # no recursion, schedule another call ASAP
                        # but only after all pending writes are done
                        update_idle_time($self);
-                       Danga::Socket->AddTimer(0, sub {
-                               $self->write($self->{long_res});
-                       });
+
+                       push @$nextq, $self;
+                       $next_tick ||= Danga::Socket->AddTimer(0, *next_tick);
                } else { # all done!
                        $self->{long_res} = undef;
                        $self->watch_read(1);
@@ -945,11 +992,8 @@ sub watch_read {
                # and we must double-check again by the time the timer fires
                # in case we really did dispatch a read event and started
                # another long response.
-               Danga::Socket->AddTimer(0, sub {
-                       if (&Danga::Socket::POLLIN & $self->{event_watch}) {
-                               event_read($self);
-                       }
-               });
+               push @$nextq, $self;
+               $next_tick ||= Danga::Socket->AddTimer(0, *next_tick);
        }
        $rv;
 }