X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FPOP3.pm;h=5f992e14f323fc531e545319469647fa47eb06fd;hb=refs%2Fheads%2Fmaster;hp=8612356329fde8ac4584221b071efde1bd63dbeb;hpb=68046e0fff12c35d793a7ae9f164ac415c84cc21;p=public-inbox.git diff --git a/lib/PublicInbox/POP3.pm b/lib/PublicInbox/POP3.pm index 86123563..5f992e14 100644 --- a/lib/PublicInbox/POP3.pm +++ b/lib/PublicInbox/POP3.pm @@ -33,7 +33,6 @@ package PublicInbox::POP3; use v5.12; use parent qw(PublicInbox::DS); -use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT); use PublicInbox::GitAsyncCat; use PublicInbox::DS qw(now); use Errno qw(EAGAIN); @@ -46,96 +45,20 @@ use constant { # XXX FIXME: duplicated stuff from NNTP.pm and IMAP.pm -sub err ($$;@) { - my ($self, $fmt, @args) = @_; - printf { $self->{pop3d}->{err} } $fmt."\n", @args; -} - sub out ($$;@) { my ($self, $fmt, @args) = @_; printf { $self->{pop3d}->{out} } $fmt."\n", @args; } -sub zflush {} # noop - -sub requeue_once ($) { - my ($self) = @_; - # COMPRESS users all share the same DEFLATE context. - # Flush it here to ensure clients don't see - # each other's data - $self->zflush; - - # no recursion, schedule another call ASAP, - # but only after all pending writes are done. - # autovivify wbuf: - my $new_size = push(@{$self->{wbuf}}, \&long_step); - - # wbuf may be populated by $cb, no need to rearm if so: - $self->requeue if $new_size == 1; -} - -sub long_step { - my ($self) = @_; - # wbuf is unset or empty, here; {long} may add to it - my ($fd, $cb, $t0, @args) = @{$self->{long_cb}}; - my $more = eval { $cb->($self, @args) }; - if ($@ || !$self->{sock}) { # something bad happened... - delete $self->{long_cb}; - my $elapsed = now() - $t0; - if ($@) { - err($self, - "%s during long response[$fd] - %0.6f", - $@, $elapsed); - } - out($self, " deferred[$fd] aborted - %0.6f", $elapsed); - $self->close; - } elsif ($more) { # $self->{wbuf}: - # control passed to ibx_async_cat if $more == \undef - requeue_once($self) if !ref($more); - } else { # all done! - delete $self->{long_cb}; - my $elapsed = now() - $t0; - my $fd = fileno($self->{sock}); - out($self, " deferred[$fd] done - %0.6f", $elapsed); - my $wbuf = $self->{wbuf}; # do NOT autovivify - $self->requeue unless $wbuf && @$wbuf; - } -} - -sub long_response ($$;@) { - my ($self, $cb, @args) = @_; # cb returns true if more, false if done - my $sock = $self->{sock} or return; - # make sure we disable reading during a long response, - # clients should not be sending us stuff and making us do more - # work while we are stream a response to them - $self->{long_cb} = [ fileno($sock), $cb, now(), @args ]; - long_step($self); # kick off! - undef; -} - -sub _greet ($) { +sub do_greet { my ($self) = @_; my $s = $self->{salt} = sprintf('%x.%x', int(rand(0x7fffffff)), time); $self->write("+OK POP3 server ready <$s\@public-inbox>\r\n"); } -sub new ($$$) { - my ($class, $sock, $pop3d) = @_; - my $self = bless { pop3d => $pop3d }, __PACKAGE__; - my $ev = EPOLLIN; - my $wbuf; - if ($sock->can('accept_SSL') && !$sock->accept_SSL) { - return CORE::close($sock) if $! != EAGAIN; - $ev = PublicInbox::TLS::epollbit() or return CORE::close($sock); - $wbuf = [ \&PublicInbox::DS::accept_tls_step, \&_greet ]; - } - $self->SUPER::new($sock, $ev | EPOLLONESHOT); - if ($wbuf) { - $self->{wbuf} = $wbuf; - } else { - _greet($self); - } - $self; +sub new { + my ($cls, $sock, $pop3d) = @_; + (bless { pop3d => $pop3d }, $cls)->greet($sock) } # POP user is $UUID1@$NEWSGROUP.$SLICE @@ -176,7 +99,7 @@ sub _login_ok ($) { $self->{uid_max} = $self->{ibx}->over(1)->max; \"+OK logged in\r\n"; } else { - \"-ERR unable to lock maildrop\r\n"; + \"-ERR [IN-USE] unable to lock maildrop\r\n"; } } @@ -202,12 +125,12 @@ sub cmd_pass { sub cmd_stls { my ($self) = @_; - my $sock = $self->{sock} or return; - return \"-ERR TLS already enabled\r\n" if $sock->can('stop_SSL'); - my $opt = $self->{pop3d}->{accept_tls} or + ($self->{sock} // return)->can('stop_SSL') and + return \"-ERR TLS already enabled\r\n"; + $self->{pop3d}->{ssl_ctx_opt} or return \"-ERR can't start TLS negotiation\r\n"; $self->write(\"+OK begin TLS negotiation now\r\n"); - $self->{sock} = IO::Socket::SSL->start_SSL($sock, %$opt); + PublicInbox::TLS::start($self->{sock}, $self->{pop3d}); $self->requeue if PublicInbox::DS::accept_tls_step($self); undef; } @@ -220,29 +143,38 @@ sub _stat_cache ($) { my ($self) = @_; my ($beg, $end) = (($self->{uid_dele} // -1) + 1, $self->{uid_max}); PublicInbox::IMAP::uid_clamp($self, \$beg, \$end); - my $opt = { limit => PublicInbox::IMAP::UID_SLICE }; - my $m = $self->{ibx}->over(1)->do_get(<<'', $opt, $beg, $end); + my (@cache, $m); + my $sth = $self->{ibx}->over(1)->dbh->prepare_cached(<<'', undef, 1); SELECT num,ddd FROM over WHERE num >= ? AND num <= ? ORDER BY num ASC - [ map { ($_->{num}, $_->{bytes} + 0, $_->{blob}) } @$m ]; + $sth->execute($beg, $end); + my $tot = 0; + while (defined($m = $sth->fetchall_arrayref({}, 1000))) { + for my $x (@$m) { + PublicInbox::Over::load_from_row($x); + push(@cache, $x->{num}, $x->{bytes} + 0, $x->{blob}); + undef $x; # saves ~1.5M memory w/ 50k messages + $tot += $cache[-2]; + } + } + $self->{total_bytes} = $tot; + $self->{cache} = \@cache; } sub cmd_stat { my ($self) = @_; my $err; $err = need_txn($self) and return $err; - my $cache = $self->{cache} //= _stat_cache($self); - my $tot = 0; - for (my $i = 1; $i < scalar(@$cache); $i += 3) { $tot += $cache->[$i] } + my $cache = $self->{cache} // _stat_cache($self); my $nr = @$cache / 3 - ($self->{nr_dele} // 0); - "+OK $nr $tot\r\n"; + "+OK $nr $self->{total_bytes}\r\n"; } # for LIST and UIDL sub _list { my ($desc, $idx, $self, $msn) = @_; my $err; $err = need_txn($self) and return $err; - my $cache = $self->{cache} //= _stat_cache($self); + my $cache = $self->{cache} // _stat_cache($self); if (defined $msn) { my $base_off = ($msn - 1) * 3; my $val = $cache->[$base_off + $idx] // @@ -272,8 +204,9 @@ sub mark_dele ($$) { my $old = $self->{txn_max_uid} //= $uid; $self->{txn_max_uid} = $uid if $uid > $old; + $self->{total_bytes} -= $cache->[$base_off + 1]; $cache->[$base_off] = undef; # clobber UID - $cache->[$base_off + 1] = 0; # zero bytes (simplifies cmd_stat) + $cache->[$base_off + 1] = undef; # clobber bytes $cache->[$base_off + 2] = undef; # clobber oidhex ++$self->{nr_dele}; } @@ -300,12 +233,13 @@ sub retr_cb { # called by git->cat_async via ibx_async_cat $hdr .= "\r\n\r\n"; my @tmp = split(/^/m, $bdy); $hdr .= join('', splice(@tmp, 0, $top_nr)); + } elsif (exists $self->{expire}) { + $self->{expire} .= pack('S', $off); } $$bref =~ s/^\./../gms; $$bref .= substr($$bref, -2, 2) eq "\r\n" ? ".\r\n" : "\r\n.\r\n"; $self->msg_more("+OK message follows\r\n"); $self->write($bref); - $self->{expire} .= pack('S', $off + 1) if exists $self->{expire}; $self->requeue; } @@ -314,7 +248,7 @@ sub cmd_retr { return \"-ERR lines must be a non-negative number\r\n" if (defined($top_nr) && $top_nr !~ /\A[0-9]+\z/); my $err; $err = need_txn($self) and return $err; - my $cache = $self->{cache} //= _stat_cache($self); + my $cache = $self->{cache} // _stat_cache($self); my $off = $msn - 1; my $hex = $cache->[$off * 3 + 2] // return \"-ERR no such message\r\n"; ${ibx_async_cat($self->{ibx}, $hex, \&retr_cb, @@ -334,7 +268,7 @@ sub cmd_rset { sub cmd_dele { my ($self, $msn) = @_; my $err; $err = need_txn($self) and return $err; - $self->{cache} //= _stat_cache($self); + $self->{cache} // _stat_cache($self); $msn =~ /\A[1-9][0-9]*\z/ or return \"-ERR no such message\r\n"; mark_dele($self, $msn - 1) ? \"+OK\r\n" : \"-ERR no such message\r\n"; } @@ -342,14 +276,17 @@ sub cmd_dele { # RFC 2449 sub cmd_capa { my ($self) = @_; + my $STLS = !$self->{ibx} && !$self->{sock}->can('stop_SSL') && + $self->{pop3d}->{ssl_ctx_opt} ? "\nSTLS\r" : ''; $self->{expire} = ''; # "EXPIRE 0" allows clients to avoid DELE commands - \<SUPER::close; } +# must be called inside a state_dbh transaction with flock held +sub __cleanup_state { + my ($self, $txn_id) = @_; + my $user_id = $self->{user_id} // die 'BUG: no {user_id}'; + $self->{pop3d}->{-state_dbh}->prepare_cached(<<'')->execute($txn_id); +DELETE FROM deletes WHERE txn_id = ? AND uid_dele = -1 + + my $sth = $self->{pop3d}->{-state_dbh}->prepare_cached(<<'', undef, 1); +SELECT COUNT(*) FROM deletes WHERE user_id = ? + + $sth->execute($user_id); + my $nr = $sth->fetchrow_array; + if ($nr == 0) { + $sth = $self->{pop3d}->{-state_dbh}->prepare_cached(<<''); +DELETE FROM users WHERE user_id = ? + + $sth->execute($user_id); + } + $nr; +} + sub cmd_quit { my ($self) = @_; if (defined(my $txn_id = $self->{txn_id})) { @@ -367,23 +325,25 @@ sub cmd_quit { if (my $exp = delete $self->{expire}) { mark_dele($self, $_) for unpack('S*', $exp); } + my $keep = 1; my $dbh = $self->{pop3d}->{-state_dbh}; my $lk = $self->{pop3d}->lock_for_scope; - my $sth; $dbh->begin_work; - if (defined $self->{txn_max_uid}) { - $sth = $dbh->prepare_cached(<<''); + if (defined(my $max = $self->{txn_max_uid})) { + $dbh->prepare_cached(<<'')->execute($max, $txn_id, $max) UPDATE deletes SET uid_dele = ? WHERE txn_id = ? AND uid_dele < ? - $sth->execute($self->{txn_max_uid}, $txn_id, - $self->{txn_max_uid}); + } else { + $keep = $self->__cleanup_state($txn_id); } - $sth = $dbh->prepare_cached(<<''); + $dbh->prepare_cached(<<'')->execute(time, $user_id) if $keep; UPDATE users SET last_seen = ? WHERE user_id = ? - $sth->execute(time, $user_id); $dbh->commit; + # we MUST do txn_id F_UNLCK here inside ->lock_for_scope: + $self->{did_quit} = 1; + $self->{pop3d}->unlock_mailbox($self); } $self->write(\"+OK public-inbox POP3 server signing off\r\n"); $self->close; @@ -400,8 +360,8 @@ sub process_line ($$) { \"-ERR command not recognized\r\n"; my $err = $@; if ($err && $self->{sock}) { - chomp($l); - err($self, 'error from: %s (%s)', $l, $err); + $l =~ s/\r?\n//s; + warn("error from: $l ($err)\n"); $res = \"-ERR program fault - command not performed\r\n"; } defined($res) ? $self->write($res) : 0; @@ -410,6 +370,7 @@ sub process_line ($$) { # callback used by PublicInbox::DS for any (e)poll (in/out/hup/err) sub event_step { my ($self) = @_; + local $SIG{__WARN__} = $self->{pop3d}->{warn_cb}; return unless $self->flush_write && $self->{sock} && !$self->{long_cb}; # only read more requests if we've drained the write buffer,