use Data::Dumper qw(Dumper);
use POSIX qw(strftime);
use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
+use URI::Escape qw(uri_escape_utf8);
use constant {
r501 => '501 command syntax error',
r221 => '221 Header follows',
my $EXPMAP; # fd -> [ idle_time, $self ]
my $EXPTIMER;
our $EXPTIME = 180; # 3 minutes
+my $WEAKEN = {}; # string(nntpd) -> nntpd
+my $WEAKTIMER;
+
+my $next_tick;
+my $nextq = [];
+sub next_tick () {
+ $next_tick = undef;
+ my $q = $nextq;
+ $nextq = [];
+ foreach my $nntp (@$q) {
+ # for request && response protocols, always finish writing
+ # before finishing reading:
+ if (my $long_cb = $nntp->{long_res}) {
+ $nntp->write($long_cb);
+ } elsif (&Danga::Socket::POLLIN & $nntp->{event_watch}) {
+ event_read($nntp);
+ }
+ }
+}
sub update_idle_time ($) {
my ($self) = @_;
defined $tmp and $EXPMAP->{$tmp} = [ now(), $self ];
}
+# reduce FD pressure by closing some "git cat-file --batch" processes
+# and unused FDs for msgmap and Xapian indices
+sub weaken_groups () {
+ $WEAKTIMER = undef;
+ foreach my $nntpd (values %$WEAKEN) {
+ $_->weaken_all foreach (@{$nntpd->{grouplist}});
+ }
+ $WEAKEN = {};
+}
+
sub expire_old () {
my $now = now();
my $exp = $EXPTIME;
$next -= $now;
$next = 0 if $next < 0;
$EXPTIMER = Danga::Socket->AddTimer($next, *expire_old);
+ weaken_groups();
} else {
$EXPTIMER = undef;
- # noop to kick outselves out of the loop so descriptors
+ # noop to kick outselves out of the loop ASAP so descriptors
# really get closed
Danga::Socket->AddTimer(0, sub {});
+
+ # grace period for reaping resources
+ $WEAKTIMER ||= Danga::Socket->AddTimer(30, *weaken_groups);
}
}
$self->{rbuf} = '';
$self->watch_read(1);
update_idle_time($self);
+ $WEAKEN->{"$nntpd"} = $nntpd;
$EXPTIMER ||= Danga::Socket->AddTimer($EXPTIME, *expire_old);
$self;
}
}
$self->{ng} or return '412 no newsgroup selected';
- $self->long_response(0, long_response_limit, sub {
+ long_response($self, 0, long_response_limit, sub {
my ($i) = @_;
my $nr = $self->{ng}->mm->id_batch($$i, sub {
my ($ary) = @_;
use Time::Local qw();
my ($hh, $mm, $ss) = unpack('A2A2A2', $time);
if (defined $gmt) {
- $gmt =~ /\A(?:UTC|GMT)\z/i or die "GM invalid: $gmt\n";
+ $gmt =~ /\A(?:UTC|GMT)\z/i or die "GM invalid: $gmt";
$gmt = 1;
}
my @now = $gmt ? gmtime : localtime;
$ts .= '..';
my $opts = { asc => 1, limit => 1000, offset => 0 };
- $self->long_response(0, long_response_limit, sub {
+ long_response($self, 0, long_response_limit, sub {
my ($i) = @_;
my $srch = $srch[0];
my $res = $srch->query($ts, $opts);
undef;
}
+sub header_append ($$$) {
+ my ($hdr, $k, $v) = @_;
+ my @v = $hdr->header($k);
+ foreach (@v) {
+ return if $v eq $_;
+ }
+ $hdr->header_set($k, @v, $v);
+}
+
+sub set_nntp_headers {
+ my ($hdr, $ng, $n, $mid) = @_;
+
+ # clobber some
+ $hdr->header_set('Newsgroups', $ng->{name});
+ $hdr->header_set('Xref', xref($ng, $n));
+ header_append($hdr, 'List-Post', "<mailto:$ng->{address}>");
+ if (my $url = $ng->{url}) {
+ $mid = uri_escape_utf8($mid);
+ header_append($hdr, 'Archived-At', "<$url$mid/>");
+ header_append($hdr, 'List-Archive', "<$url>");
+ }
+}
+
sub art_lookup ($$$) {
my ($self, $art, $set_headers) = @_;
my $ng = $self->{ng};
return $err unless $s;
my $lines;
if ($set_headers) {
- $s->header_set('Newsgroups', $ng->{name});
- $s->header_set('Xref', xref($ng, $n));
+ set_nntp_headers($s->header_obj, $ng, $n, $mid);
$lines = $s->body =~ tr!\n!\n!;
# must be last
sub cmd_article ($;$) {
my ($self, $art) = @_;
- my $r = $self->art_lookup($art, 1);
+ my $r = art_lookup($self, $art, 1);
return $r unless ref $r;
my ($n, $mid, $s) = @$r;
set_art($self, $art);
sub cmd_head ($;$) {
my ($self, $art) = @_;
- my $r = $self->art_lookup($art, 2);
+ my $r = art_lookup($self, $art, 2);
return $r unless ref $r;
my ($n, $mid, $s) = @$r;
set_art($self, $art);
sub cmd_body ($;$) {
my ($self, $art) = @_;
- my $r = $self->art_lookup($art, 0);
+ my $r = art_lookup($self, $art, 0);
return $r unless ref $r;
my ($n, $mid, $s) = @$r;
set_art($self, $art);
sub cmd_stat ($;$) {
my ($self, $art) = @_;
- my $r = $self->art_lookup($art, 0);
+ my $r = art_lookup($self, $art, 0);
return $r unless ref $r;
my ($n, $mid, undef) = @$r;
set_art($self, $art);
[ $beg, $end ];
}
-sub hdr_val ($$) {
- my ($r, $header) = @_;
- return $r->[3] if $header =~ /\A:?bytes\z/i;
- return $r->[4] if $header =~ /\A:?lines\z/i;
- $r = $r->[2]->header_obj->header($header);
- defined $r or return;
- $r =~ s/[\r\n\t]+/ /sg;
- $r;
-}
-
sub long_response ($$$$) {
my ($self, $beg, $end, $cb) = @_;
die "BUG: nested long response" if $self->{long_res};
# no recursion, schedule another call ASAP
# but only after all pending writes are done
update_idle_time($self);
- Danga::Socket->AddTimer(0, sub {
- $self->write($self->{long_res});
- });
+
+ push @$nextq, $self;
+ $next_tick ||= Danga::Socket->AddTimer(0, *next_tick);
} else { # all done!
$self->{long_res} = undef;
$self->watch_read(1);
my $mm = $self->{ng}->mm;
my ($beg, $end) = @$r;
more($self, $xhdr ? r221 : r225);
- $self->long_response($beg, $end, sub {
+ long_response($self, $beg, $end, sub {
my ($i) = @_;
my $mid = $mm->mid_for($$i);
more($self, "$$i <$mid>") if defined $mid;
my $mm = $ng->mm;
my ($beg, $end) = @$r;
more($self, $xhdr ? r221 : r225);
- $self->long_response($beg, $end, sub {
+ long_response($self, $beg, $end, sub {
my ($i) = @_;
my $mid = $mm->mid_for($$i);
more($self, "$$i ".xref($ng, $$i)) if defined $mid;
my ($beg, $end) = @$r;
more($self, $xhdr ? r221 : r225);
my $off = 0;
- $self->long_response($beg, $end, sub {
+ long_response($self, $beg, $end, sub {
my ($i) = @_;
my $res = $srch->query_xover($beg, $end, $off);
my $msgs = $res->{msgs};
my $mm = $ng->mm;
my $srch = $ng->search;
more($self, '224 Overview information follows');
- $self->long_response($beg, $end, sub {
+ long_response($self, $beg, $end, sub {
my ($i) = @_;
my $mid = $mm->mid_for($$i) or return;
my $h = search_header_for($srch, $mid, 'references');
more($self, "224 Overview information follows for $beg to $end");
my $srch = $self->{ng}->search;
my $off = 0;
- $self->long_response($beg, $end, sub {
+ long_response($self, $beg, $end, sub {
my ($i) = @_;
my $res = $srch->query_xover($beg, $end, $off);
my $msgs = $res->{msgs};
$data = substr($data, $n, $dlen - $n);
}
}
- $self->do_write($data);
+ do_write($self, $data);
}
# callbacks for Danga::Socket
my $line = $1;
my $t0 = now();
my $fd = $self->{fd};
- $r = eval { $self->process_line($line) };
+ $r = eval { process_line($self, $line) };
my $d = $self->{long_res} ?
" deferred[$fd]" : '';
out($self, "[$fd] %s - %0.6f$d", $line, now() - $t0);
# and we must double-check again by the time the timer fires
# in case we really did dispatch a read event and started
# another long response.
- Danga::Socket->AddTimer(0, sub {
- if (&Danga::Socket::POLLIN & $self->{event_watch}) {
- $self->event_read;
- }
- });
+ push @$nextq, $self;
+ $next_tick ||= Danga::Socket->AddTimer(0, *next_tick);
}
$rv;
}