]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/POP3.pm
No ext_urls
[public-inbox.git] / lib / PublicInbox / POP3.pm
index 741b5e58a90949854fdcf4331d2e3ad8e89bcfdf..5f992e14f323fc531e545319469647fa47eb06fd 100644 (file)
@@ -45,55 +45,11 @@ use constant {
 
 # XXX FIXME: duplicated stuff from NNTP.pm and IMAP.pm
 
-sub err ($$;@) {
-       my ($self, $fmt, @args) = @_;
-       printf { $self->{pop3d}->{err} } $fmt."\n", @args;
-}
-
 sub out ($$;@) {
        my ($self, $fmt, @args) = @_;
        printf { $self->{pop3d}->{out} } $fmt."\n", @args;
 }
 
-sub long_step {
-       my ($self) = @_;
-       # wbuf is unset or empty, here; {long} may add to it
-       my ($fd, $cb, $t0, @args) = @{$self->{long_cb}};
-       my $more = eval { $cb->($self, @args) };
-       if ($@ || !$self->{sock}) { # something bad happened...
-               delete $self->{long_cb};
-               my $elapsed = now() - $t0;
-               if ($@) {
-                       err($self,
-                           "%s during long response[$fd] - %0.6f",
-                           $@, $elapsed);
-               }
-               out($self, " deferred[$fd] aborted - %0.6f", $elapsed);
-               $self->close;
-       } elsif ($more) { # $self->{wbuf}:
-               # control passed to ibx_async_cat if $more == \undef
-               requeue_once($self) if !ref($more);
-       } else { # all done!
-               delete $self->{long_cb};
-               my $elapsed = now() - $t0;
-               my $fd = fileno($self->{sock});
-               out($self, " deferred[$fd] done - %0.6f", $elapsed);
-               my $wbuf = $self->{wbuf}; # do NOT autovivify
-               $self->requeue unless $wbuf && @$wbuf;
-       }
-}
-
-sub long_response ($$;@) {
-       my ($self, $cb, @args) = @_; # cb returns true if more, false if done
-       my $sock = $self->{sock} or return;
-       # make sure we disable reading during a long response,
-       # clients should not be sending us stuff and making us do more
-       # work while we are stream a response to them
-       $self->{long_cb} = [ fileno($sock), $cb, now(), @args ];
-       long_step($self); # kick off!
-       undef;
-}
-
 sub do_greet {
        my ($self) = @_;
        my $s = $self->{salt} = sprintf('%x.%x', int(rand(0x7fffffff)), time);
@@ -169,12 +125,12 @@ sub cmd_pass {
 
 sub cmd_stls {
        my ($self) = @_;
-       my $sock = $self->{sock} or return;
-       return \"-ERR TLS already enabled\r\n" if $sock->can('stop_SSL');
-       my $opt = $self->{pop3d}->{accept_tls} or
+       ($self->{sock} // return)->can('stop_SSL') and
+               return \"-ERR TLS already enabled\r\n";
+       $self->{pop3d}->{ssl_ctx_opt} or
                return \"-ERR can't start TLS negotiation\r\n";
        $self->write(\"+OK begin TLS negotiation now\r\n");
-       $self->{sock} = IO::Socket::SSL->start_SSL($sock, %$opt);
+       PublicInbox::TLS::start($self->{sock}, $self->{pop3d});
        $self->requeue if PublicInbox::DS::accept_tls_step($self);
        undef;
 }
@@ -187,29 +143,38 @@ sub _stat_cache ($) {
        my ($self) = @_;
        my ($beg, $end) = (($self->{uid_dele} // -1) + 1, $self->{uid_max});
        PublicInbox::IMAP::uid_clamp($self, \$beg, \$end);
-       my $opt = { limit => PublicInbox::IMAP::UID_SLICE };
-       my $m = $self->{ibx}->over(1)->do_get(<<'', $opt, $beg, $end);
+       my (@cache, $m);
+       my $sth = $self->{ibx}->over(1)->dbh->prepare_cached(<<'', undef, 1);
 SELECT num,ddd FROM over WHERE num >= ? AND num <= ?
 ORDER BY num ASC
 
-       [ map { ($_->{num}, $_->{bytes} + 0, $_->{blob}) } @$m ];
+       $sth->execute($beg, $end);
+       my $tot = 0;
+       while (defined($m = $sth->fetchall_arrayref({}, 1000))) {
+               for my $x (@$m) {
+                       PublicInbox::Over::load_from_row($x);
+                       push(@cache, $x->{num}, $x->{bytes} + 0, $x->{blob});
+                       undef $x; # saves ~1.5M memory w/ 50k messages
+                       $tot += $cache[-2];
+               }
+       }
+       $self->{total_bytes} = $tot;
+       $self->{cache} = \@cache;
 }
 
 sub cmd_stat {
        my ($self) = @_;
        my $err; $err = need_txn($self) and return $err;
-       my $cache = $self->{cache} //= _stat_cache($self);
-       my $tot = 0;
-       for (my $i = 1; $i < scalar(@$cache); $i += 3) { $tot += $cache->[$i] }
+       my $cache = $self->{cache} // _stat_cache($self);
        my $nr = @$cache / 3 - ($self->{nr_dele} // 0);
-       "+OK $nr $tot\r\n";
+       "+OK $nr $self->{total_bytes}\r\n";
 }
 
 # for LIST and UIDL
 sub _list {
        my ($desc, $idx, $self, $msn) = @_;
        my $err; $err = need_txn($self) and return $err;
-       my $cache = $self->{cache} //= _stat_cache($self);
+       my $cache = $self->{cache} // _stat_cache($self);
        if (defined $msn) {
                my $base_off = ($msn - 1) * 3;
                my $val = $cache->[$base_off + $idx] //
@@ -239,8 +204,9 @@ sub mark_dele ($$) {
        my $old = $self->{txn_max_uid} //= $uid;
        $self->{txn_max_uid} = $uid if $uid > $old;
 
+       $self->{total_bytes} -= $cache->[$base_off + 1];
        $cache->[$base_off] = undef; # clobber UID
-       $cache->[$base_off + 1] = 0; # zero bytes (simplifies cmd_stat)
+       $cache->[$base_off + 1] = undef; # clobber bytes
        $cache->[$base_off + 2] = undef; # clobber oidhex
        ++$self->{nr_dele};
 }
@@ -268,7 +234,7 @@ sub retr_cb { # called by git->cat_async via ibx_async_cat
                my @tmp = split(/^/m, $bdy);
                $hdr .= join('', splice(@tmp, 0, $top_nr));
        } elsif (exists $self->{expire}) {
-               $self->{expire} .= pack('S', $off + 1);
+               $self->{expire} .= pack('S', $off);
        }
        $$bref =~ s/^\./../gms;
        $$bref .= substr($$bref, -2, 2) eq "\r\n" ? ".\r\n" : "\r\n.\r\n";
@@ -282,7 +248,7 @@ sub cmd_retr {
        return \"-ERR lines must be a non-negative number\r\n" if
                        (defined($top_nr) && $top_nr !~ /\A[0-9]+\z/);
        my $err; $err = need_txn($self) and return $err;
-       my $cache = $self->{cache} //= _stat_cache($self);
+       my $cache = $self->{cache} // _stat_cache($self);
        my $off = $msn - 1;
        my $hex = $cache->[$off * 3 + 2] // return \"-ERR no such message\r\n";
        ${ibx_async_cat($self->{ibx}, $hex, \&retr_cb,
@@ -302,7 +268,7 @@ sub cmd_rset {
 sub cmd_dele {
        my ($self, $msn) = @_;
        my $err; $err = need_txn($self) and return $err;
-       $self->{cache} //= _stat_cache($self);
+       $self->{cache} // _stat_cache($self);
        $msn =~ /\A[1-9][0-9]*\z/ or return \"-ERR no such message\r\n";
        mark_dele($self, $msn - 1) ? \"+OK\r\n" : \"-ERR no such message\r\n";
 }
@@ -311,7 +277,7 @@ sub cmd_dele {
 sub cmd_capa {
        my ($self) = @_;
        my $STLS = !$self->{ibx} && !$self->{sock}->can('stop_SSL') &&
-                       $self->{pop3d}->{accept_tls} ? "\nSTLS\r" : '';
+                       $self->{pop3d}->{ssl_ctx_opt} ? "\nSTLS\r" : '';
        $self->{expire} = ''; # "EXPIRE 0" allows clients to avoid DELE commands
        <<EOM;
 +OK Capability list follows\r
@@ -331,6 +297,27 @@ sub close {
        $self->SUPER::close;
 }
 
+# must be called inside a state_dbh transaction with flock held
+sub __cleanup_state {
+       my ($self, $txn_id) = @_;
+       my $user_id = $self->{user_id} // die 'BUG: no {user_id}';
+       $self->{pop3d}->{-state_dbh}->prepare_cached(<<'')->execute($txn_id);
+DELETE FROM deletes WHERE txn_id = ? AND uid_dele = -1
+
+       my $sth = $self->{pop3d}->{-state_dbh}->prepare_cached(<<'', undef, 1);
+SELECT COUNT(*) FROM deletes WHERE user_id = ?
+
+       $sth->execute($user_id);
+       my $nr = $sth->fetchrow_array;
+       if ($nr == 0) {
+               $sth = $self->{pop3d}->{-state_dbh}->prepare_cached(<<'');
+DELETE FROM users WHERE user_id = ?
+
+               $sth->execute($user_id);
+       }
+       $nr;
+}
+
 sub cmd_quit {
        my ($self) = @_;
        if (defined(my $txn_id = $self->{txn_id})) {
@@ -338,23 +325,25 @@ sub cmd_quit {
                if (my $exp = delete $self->{expire}) {
                        mark_dele($self, $_) for unpack('S*', $exp);
                }
+               my $keep = 1;
                my $dbh = $self->{pop3d}->{-state_dbh};
                my $lk = $self->{pop3d}->lock_for_scope;
-               my $sth;
                $dbh->begin_work;
 
-               if (defined $self->{txn_max_uid}) {
-                       $sth = $dbh->prepare_cached(<<'');
+               if (defined(my $max = $self->{txn_max_uid})) {
+                       $dbh->prepare_cached(<<'')->execute($max, $txn_id, $max)
 UPDATE deletes SET uid_dele = ? WHERE txn_id = ? AND uid_dele < ?
 
-                       $sth->execute($self->{txn_max_uid}, $txn_id,
-                                       $self->{txn_max_uid});
+               } else {
+                       $keep = $self->__cleanup_state($txn_id);
                }
-               $sth = $dbh->prepare_cached(<<'');
+               $dbh->prepare_cached(<<'')->execute(time, $user_id) if $keep;
 UPDATE users SET last_seen = ? WHERE user_id = ?
 
-               $sth->execute(time, $user_id);
                $dbh->commit;
+               # we MUST do txn_id F_UNLCK here inside ->lock_for_scope:
+               $self->{did_quit} = 1;
+               $self->{pop3d}->unlock_mailbox($self);
        }
        $self->write(\"+OK public-inbox POP3 server signing off\r\n");
        $self->close;
@@ -371,8 +360,8 @@ sub process_line ($$) {
                \"-ERR command not recognized\r\n";
        my $err = $@;
        if ($err && $self->{sock}) {
-               chomp($l);
-               err($self, 'error from: %s (%s)', $l, $err);
+               $l =~ s/\r?\n//s;
+               warn("error from: $l ($err)\n");
                $res = \"-ERR program fault - command not performed\r\n";
        }
        defined($res) ? $self->write($res) : 0;
@@ -381,6 +370,7 @@ sub process_line ($$) {
 # callback used by PublicInbox::DS for any (e)poll (in/out/hup/err)
 sub event_step {
        my ($self) = @_;
+       local $SIG{__WARN__} = $self->{pop3d}->{warn_cb};
        return unless $self->flush_write && $self->{sock} && !$self->{long_cb};
 
        # only read more requests if we've drained the write buffer,