]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/NNTP.pm
favor procedural calls for most private functions
[public-inbox.git] / lib / PublicInbox / NNTP.pm
index 7fe7f2f349ac242c06554e4a1ad7f95f54975f59..8740377f63a86fca1625e2ba79895d20771ea457 100644 (file)
@@ -1,5 +1,7 @@
 # Copyright (C) 2015 all contributors <meta@public-inbox.org>
 # License: AGPLv3 or later (https://www.gnu.org/licenses/agpl-3.0.txt)
+#
+# Each instance of this represents a NNTP client socket
 package PublicInbox::NNTP;
 use strict;
 use warnings;
@@ -7,7 +9,7 @@ use base qw(Danga::Socket);
 use fields qw(nntpd article rbuf ng long_res);
 use PublicInbox::Search;
 use PublicInbox::Msgmap;
-use PublicInbox::GitCatFile;
+use PublicInbox::Git;
 use PublicInbox::MID qw(mid2path);
 use Email::MIME;
 use Data::Dumper qw(Dumper);
@@ -33,6 +35,48 @@ my $LIST_HEADERS = join("\r\n", @OVERVIEW,
 # LISTGROUP could get pretty bad, too...
 my %DISABLED; # = map { $_ => 1 } qw(xover list_overview_fmt newnews xhdr);
 
+my $EXPMAP; # fd -> [ idle_time, $self ]
+my $EXPTIMER;
+our $EXPTIME = 180; # 3 minutes
+
+sub update_idle_time ($) {
+       my ($self) = @_;
+       my $tmp = $self->{sock} or return;
+       $tmp = fileno($tmp);
+       defined $tmp and $EXPMAP->{$tmp} = [ now(), $self ];
+}
+
+sub expire_old () {
+       my $now = now();
+       my $exp = $EXPTIME;
+       my $old = $now - $exp;
+       my $next = $now + $exp;
+       my $nr = 0;
+       my %new;
+       while (my ($fd, $v) = each %$EXPMAP) {
+               my ($idle_time, $nntp) = @$v;
+               if ($idle_time < $old) {
+                       $nntp->close; # idempotent
+               } else {
+                       my $nexp = $idle_time + $exp;
+                       $next = $nexp if ($nexp < $next);
+                       ++$nr;
+                       $new{$fd} = $v;
+               }
+       }
+       $EXPMAP = \%new;
+       if ($nr) {
+               $next -= $now;
+               $next = 0 if $next < 0;
+               $EXPTIMER = Danga::Socket->AddTimer($next, *expire_old);
+       } else {
+               $EXPTIMER = undef;
+               # noop to kick outselves out of the loop so descriptors
+               # really get closed
+               Danga::Socket->AddTimer(0, sub {});
+       }
+}
+
 sub new ($$$) {
        my ($class, $sock, $nntpd) = @_;
        my $self = fields::new($class);
@@ -42,6 +86,8 @@ sub new ($$$) {
        res($self, '201 server ready - post via email');
        $self->{rbuf} = '';
        $self->watch_read(1);
+       update_idle_time($self);
+       $EXPTIMER ||= Danga::Socket->AddTimer($EXPTIME, *expire_old);
        $self;
 }
 
@@ -70,7 +116,7 @@ sub process_line ($$) {
        my $err = $@;
        if ($err && !$self->{closed}) {
                chomp($l = Dumper(\$l));
-               err($self, "error from: $l $err");
+               err($self, 'error from: %s (%s)', $l, $err);
                $res = '503 program fault - command not performed';
        }
        return 0 unless defined $res;
@@ -166,7 +212,7 @@ sub cmd_listgroup ($;$) {
        }
 
        $self->{ng} or return '412 no newsgroup selected';
-       $self->long_response(0, long_response_limit, sub {
+       long_response($self, 0, long_response_limit, sub {
                my ($i) = @_;
                my $nr = $self->{ng}->mm->id_batch($$i, sub {
                        my ($ary) = @_;
@@ -183,7 +229,7 @@ sub parse_time ($$;$) {
        use Time::Local qw();
        my ($hh, $mm, $ss) = unpack('A2A2A2', $time);
        if (defined $gmt) {
-               $gmt =~ /\A(?:UTC|GMT)\z/i or die "GM invalid: $gmt\n";
+               $gmt =~ /\A(?:UTC|GMT)\z/i or die "GM invalid: $gmt";
                $gmt = 1;
        }
        my @now = $gmt ? gmtime : localtime;
@@ -276,7 +322,7 @@ sub cmd_newnews ($$$$;$$) {
 
        $ts .= '..';
        my $opts = { asc => 1, limit => 1000, offset => 0 };
-       $self->long_response(0, long_response_limit, sub {
+       long_response($self, 0, long_response_limit, sub {
                my ($i) = @_;
                my $srch = $srch[0];
                my $res = $srch->query($ts, $opts);
@@ -416,7 +462,7 @@ sub set_art {
 
 sub cmd_article ($;$) {
        my ($self, $art) = @_;
-       my $r = $self->art_lookup($art, 1);
+       my $r = art_lookup($self, $art, 1);
        return $r unless ref $r;
        my ($n, $mid, $s) = @$r;
        set_art($self, $art);
@@ -428,7 +474,7 @@ sub cmd_article ($;$) {
 
 sub cmd_head ($;$) {
        my ($self, $art) = @_;
-       my $r = $self->art_lookup($art, 2);
+       my $r = art_lookup($self, $art, 2);
        return $r unless ref $r;
        my ($n, $mid, $s) = @$r;
        set_art($self, $art);
@@ -439,7 +485,7 @@ sub cmd_head ($;$) {
 
 sub cmd_body ($;$) {
        my ($self, $art) = @_;
-       my $r = $self->art_lookup($art, 0);
+       my $r = art_lookup($self, $art, 0);
        return $r unless ref $r;
        my ($n, $mid, $s) = @$r;
        set_art($self, $art);
@@ -449,7 +495,7 @@ sub cmd_body ($;$) {
 
 sub cmd_stat ($;$) {
        my ($self, $art) = @_;
-       my $r = $self->art_lookup($art, 0);
+       my $r = art_lookup($self, $art, 0);
        return $r unless ref $r;
        my ($n, $mid, undef) = @$r;
        set_art($self, $art);
@@ -524,18 +570,20 @@ sub long_response ($$$$) {
 
                        if ($err) {
                                err($self,
-                                   "$err during long response[$fd] - %0.6f",
-                                       now() - $t0);
+                                   "%s during long response[$fd] - %0.6f",
+                                   $err, now() - $t0);
                        }
                        if ($self->{closed}) {
                                out($self, " deferred[$fd] aborted - %0.6f",
                                           now() - $t0);
                        } else {
+                               update_idle_time($self);
                                $self->watch_read(1);
                        }
                } elsif (!$lim || $self->{write_buf_size}) {
                        # no recursion, schedule another call ASAP
                        # but only after all pending writes are done
+                       update_idle_time($self);
                        Danga::Socket->AddTimer(0, sub {
                                $self->write($self->{long_res});
                        });
@@ -564,7 +612,7 @@ sub hdr_message_id ($$$) { # optimize XHDR Message-ID [range] for slrnpull.
                my $mm = $self->{ng}->mm;
                my ($beg, $end) = @$r;
                more($self, $xhdr ? r221 : r225);
-               $self->long_response($beg, $end, sub {
+               long_response($self, $beg, $end, sub {
                        my ($i) = @_;
                        my $mid = $mm->mid_for($$i);
                        more($self, "$$i <$mid>") if defined $mid;
@@ -607,7 +655,7 @@ sub hdr_xref ($$$) { # optimize XHDR Xref [range] for rtin
                my $mm = $ng->mm;
                my ($beg, $end) = @$r;
                more($self, $xhdr ? r221 : r225);
-               $self->long_response($beg, $end, sub {
+               long_response($self, $beg, $end, sub {
                        my ($i) = @_;
                        my $mid = $mm->mid_for($$i);
                        more($self, "$$i ".xref($ng, $$i)) if defined $mid;
@@ -638,7 +686,7 @@ sub hdr_searchmsg ($$$$) {
                my ($beg, $end) = @$r;
                more($self, $xhdr ? r221 : r225);
                my $off = 0;
-               $self->long_response($beg, $end, sub {
+               long_response($self, $beg, $end, sub {
                        my ($i) = @_;
                        my $res = $srch->query_xover($beg, $end, $off);
                        my $msgs = $res->{msgs};
@@ -724,7 +772,7 @@ sub cmd_xrover ($;$) {
        my $mm = $ng->mm;
        my $srch = $ng->search;
        more($self, '224 Overview information follows');
-       $self->long_response($beg, $end, sub {
+       long_response($self, $beg, $end, sub {
                my ($i) = @_;
                my $mid = $mm->mid_for($$i) or return;
                my $h = search_header_for($srch, $mid, 'references');
@@ -774,7 +822,7 @@ sub cmd_xover ($;$) {
        more($self, "224 Overview information follows for $beg to $end");
        my $srch = $self->{ng}->search;
        my $off = 0;
-       $self->long_response($beg, $end, sub {
+       long_response($self, $beg, $end, sub {
                my ($i) = @_;
                my $res = $srch->query_xover($beg, $end, $off);
                my $msgs = $res->{msgs};
@@ -848,16 +896,17 @@ sub do_more ($$) {
                        $data = substr($data, $n, $dlen - $n);
                }
        }
-       $self->do_write($data);
+       do_write($self, $data);
 }
 
-# callbacks for by Danga::Socket
+# callbacks for Danga::Socket
 
 sub event_hup { $_[0]->close }
 sub event_err { $_[0]->close }
 
 sub event_write {
        my ($self) = @_;
+       update_idle_time($self);
        # only continue watching for readability when we are done writing:
        if ($self->write(undef) == 1 && !$self->{long_res}) {
                $self->watch_read(1);
@@ -875,15 +924,16 @@ sub event_read {
                my $line = $1;
                my $t0 = now();
                my $fd = $self->{fd};
-               $r = eval { $self->process_line($line) };
+               $r = eval { process_line($self, $line) };
                my $d = $self->{long_res} ?
                        " deferred[$fd]" : '';
-               out($self, "[$fd] $line - %0.6f$d", now() - $t0);
+               out($self, "[$fd] %s - %0.6f$d", $line, now() - $t0);
        }
 
        return $self->close if $r < 0;
        my $len = length($self->{rbuf});
        return $self->close if ($len >= LINE_MAX);
+       update_idle_time($self);
 }
 
 sub watch_read {
@@ -897,13 +947,14 @@ sub watch_read {
                # another long response.
                Danga::Socket->AddTimer(0, sub {
                        if (&Danga::Socket::POLLIN & $self->{event_watch}) {
-                               $self->event_read;
+                               event_read($self);
                        }
                });
        }
        $rv;
 }
 
+# for graceful shutdown in PublicInbox::Daemon:
 sub busy () {
        my ($self) = @_;
        ($self->{rbuf} ne '' || $self->{long_res} || $self->{write_buf_size});