lib/PublicInbox/DS.pm | 44 +++++++++++++++++++++++++++++++++++++++++--- lib/PublicInbox/IMAP.pm | 47 +++-------------------------------------------- lib/PublicInbox/NNTP.pm | 67 +++++++++++------------------------------------------ lib/PublicInbox/POP3.pm | 39 --------------------------------------- diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index f0181b5461cfc4111574af2991d3522a57aba174..fee31e3d132ab675ff11a9e16c9e623ddce8d1d3 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -650,18 +650,56 @@ } sub zflush {} # overridden by NNTPdeflate and IMAPdeflate +sub long_response_done {} # overridden by Net::NNTP + +sub long_step { + my ($self) = @_; + # wbuf is unset or empty, here; {long} may add to it + my ($fd, $cb, $t0, @args) = @{$self->{long_cb}}; + my $more = eval { $cb->($self, @args) }; + if ($@ || !$self->{sock}) { # something bad happened... + delete $self->{long_cb}; + my $elapsed = now() - $t0; + $@ and $self->err("%s during long response[$fd] - %0.6f", + $@, $elapsed); + $self->out(" deferred[$fd] aborted - %0.6f", $elapsed); + $self->close; + } elsif ($more) { # $self->{wbuf}: + # control passed to ibx_async_cat if $more == \undef + requeue_once($self) if !ref($more); + } else { # all done! + delete $self->{long_cb}; + $self->long_response_done; + my $elapsed = now() - $t0; + my $fd = fileno($self->{sock}); + $self->out(" deferred[$fd] done - %0.6f", $elapsed); + my $wbuf = $self->{wbuf}; # do NOT autovivify + requeue($self) unless $wbuf && @$wbuf; + } +} + sub requeue_once { my ($self) = @_; # COMPRESS users all share the same DEFLATE context. - # Flush it here to ensure clients don't see - # each other's data + # Flush it here to ensure clients don't see each other's data $self->zflush; # no recursion, schedule another call ASAP, # but only after all pending writes are done. # autovivify wbuf. wbuf may be populated by $cb, # no need to rearm if so: (push returns new size of array) - requeue($self) if push(@{$self->{wbuf}}, $self->can('long_step')) == 1; + requeue($self) if push(@{$self->{wbuf}}, \&long_step) == 1; +} + +sub long_response ($$;@) { + my ($self, $cb, @args) = @_; # cb returns true if more, false if done + my $sock = $self->{sock} or return; + # make sure we disable reading during a long response, + # clients should not be sending us stuff and making us do more + # work while we are stream a response to them + $self->{long_cb} = [ fileno($sock), $cb, now(), @args ]; + long_step($self); # kick off! + undef; } sub dwaitpid ($;$$) { diff --git a/lib/PublicInbox/IMAP.pm b/lib/PublicInbox/IMAP.pm index 18a12564dad042983f52a818da9ce56ae4196cbd..ce0dce0f317e8fb40a2a0b9da160594cafeb4fcf 100644 --- a/lib/PublicInbox/IMAP.pm +++ b/lib/PublicInbox/IMAP.pm @@ -1035,7 +1035,7 @@ $range_csv = 'bad' if $range_csv !~ $valid_range; my $range_info = range_step($self, \$range_csv); return "$tag $range_info\r\n" if !ref($range_info); uo2m_hibernate($self) if $cb == \&fetch_blob; # slow, save RAM - long_response($self, $cb, $tag, [], $range_info, $ops, $partial); + $self->long_response($cb, $tag, [], $range_info, $ops, $partial); } sub cmd_fetch ($$$$;@) { @@ -1050,7 +1050,7 @@ msn_to_uid_range(msn2uid($self), $range_csv); my $range_info = range_step($self, \$range_csv); return "$tag $range_info\r\n" if !ref($range_info); uo2m_hibernate($self) if $cb == \&fetch_blob; # slow, save RAM - long_response($self, $cb, $tag, [], $range_info, $ops, $partial); + $self->long_response($cb, $tag, [], $range_info, $ops, $partial); } sub msn_convert ($$) { @@ -1094,7 +1094,7 @@ return "$tag $q\r\n" if !ref($q); my ($sql, $range_info) = delete @$q{qw(sql range_info)}; if (!scalar(keys %$q)) { # overview.sqlite3 $self->msg_more('* SEARCH'); - long_response($self, \&search_uid_range, + $self->long_response(\&search_uid_range, $tag, $sql, $range_info, $want_msn); } elsif ($q = $q->{xap}) { my $srch = $self->{ibx}->isrch or @@ -1165,35 +1165,6 @@ return 0 unless defined $res; $self->write($res); } -sub long_step { - my ($self) = @_; - # wbuf is unset or empty, here; {long} may add to it - my ($fd, $cb, $t0, @args) = @{$self->{long_cb}}; - my $more = eval { $cb->($self, @args) }; - if ($@ || !$self->{sock}) { # something bad happened... - delete $self->{long_cb}; - my $elapsed = now() - $t0; - if ($@) { - err($self, - "%s during long response[$fd] - %0.6f", - $@, $elapsed); - } - out($self, " deferred[$fd] aborted - %0.6f", $elapsed); - $self->close; - } elsif ($more) { # $self->{wbuf}: - # control passed to ibx_async_cat if $more == \undef - $self->requeue_once($self) if !ref($more); - } else { # all done! - delete $self->{long_cb}; - my $elapsed = now() - $t0; - my $fd = fileno($self->{sock}); - out($self, " deferred[$fd] done - %0.6f", $elapsed); - my $wbuf = $self->{wbuf}; # do NOT autovivify - - $self->requeue unless $wbuf && @$wbuf; - } -} - sub err ($$;@) { my ($self, $fmt, @args) = @_; printf { $self->{imapd}->{err} } $fmt."\n", @args; @@ -1202,18 +1173,6 @@ sub out ($$;@) { my ($self, $fmt, @args) = @_; printf { $self->{imapd}->{out} } $fmt."\n", @args; -} - -sub long_response ($$;@) { - my ($self, $cb, @args) = @_; # cb returns true if more, false if done - - my $sock = $self->{sock} or return; - # make sure we disable reading during a long response, - # clients should not be sending us stuff and making us do more - # work while we are stream a response to them - $self->{long_cb} = [ fileno($sock), $cb, now(), @args ]; - long_step($self); # kick off! - undef; } # callback used by PublicInbox::DS for any (e)poll (in/out/hup/err) diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 5eb6112ceb2f932286bd263a11795b4c163590b1..2a59cbd705c77ec96a160f07d4c621d5e03de299 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -122,7 +122,7 @@ sub list_active ($;$) { # called by cmd_list my ($self, $wildmat) = @_; wildmat2re($wildmat); - long_response($self, \&list_active_i, [ + $self->long_response(\&list_active_i, [ grep(/$wildmat/, @{$self->{nntpd}->{groupnames}}) ]); } @@ -141,7 +141,7 @@ sub list_active_times ($;$) { # called by cmd_list my ($self, $wildmat) = @_; wildmat2re($wildmat); - long_response($self, \&list_active_times_i, [ + $self->long_response(\&list_active_times_i, [ grep(/$wildmat/, @{$self->{nntpd}->{groupnames}}) ]); } @@ -160,7 +160,7 @@ sub list_newsgroups ($;$) { # called by cmd_list my ($self, $wildmat) = @_; wildmat2re($wildmat); - long_response($self, \&list_newsgroups_i, [ + $self->long_response(\&list_newsgroups_i, [ grep(/$wildmat/, @{$self->{nntpd}->{groupnames}}) ]); } @@ -178,7 +178,7 @@ $self->msg_more("215 information follows\r\n"); $arg->($self, @args); } else { $self->msg_more("215 list of newsgroups follows\r\n"); - long_response($self, \&list_active_i, [ # copy array + $self->long_response(\&list_active_i, [ # copy array @{$self->{nntpd}->{groupnames}} ]); } } @@ -210,9 +210,9 @@ $self->{ibx} or return \"412 no newsgroup selected\r\n"; if (defined $range) { my $r = get_range($self, $range); return $r unless ref $r; - long_response($self, \&listgroup_range_i, @$r); + $self->long_response(\&listgroup_range_i, @$r); } else { # grab every article number - long_response($self, \&listgroup_all_i, \(my $num = 0)); + $self->long_response(\&listgroup_all_i, \(my $num = 0)); } } @@ -268,7 +268,7 @@ return r501 if $@; # TODO dists $self->msg_more("231 list of new newsgroups follows\r\n"); - long_response($self, \&newgroups_i, $ts, \(my $i = 0), + $self->long_response(\&newgroups_i, $ts, \(my $i = 0), $self->{nntpd}->{groupnames}); } @@ -339,7 +339,7 @@ my @names = grep(!/$skip/, grep(/$keep/, @{$self->{nntpd}->{groupnames}})); return ".\r\n" unless scalar(@names); my $prev = 0; - long_response($self, \&newnews_i, \@names, $ts, \$prev); + $self->long_response(\&newnews_i, \@names, $ts, \$prev); } sub cmd_group ($$) { @@ -613,46 +613,7 @@ $end = $max if ($end > $max); $beg > $end ? "420 No article(s) selected\r\n" : [ \$beg, $end ]; } -sub long_step { - my ($self) = @_; - # wbuf is unset or empty, here; {long} may add to it - my ($fd, $cb, $t0, @args) = @{$self->{long_cb}}; - my $more = eval { $cb->($self, @args) }; - if ($@ || !$self->{sock}) { # something bad happened... - delete $self->{long_cb}; - my $elapsed = now() - $t0; - if ($@) { - err($self, - "%s during long response[$fd] - %0.6f", - $@, $elapsed); - } - out($self, " deferred[$fd] aborted - %0.6f", $elapsed); - $self->close; - } elsif ($more) { # $self->{wbuf}: - # control passed to ibx_async_cat if $more == \undef - $self->requeue_once if !ref($more); - } else { # all done! - delete $self->{long_cb}; - $self->write(\".\r\n"); # TODO get rid of this - my $elapsed = now() - $t0; - my $fd = fileno($self->{sock}); - out($self, " deferred[$fd] done - %0.6f", $elapsed); - my $wbuf = $self->{wbuf}; # do NOT autovivify - $self->requeue unless $wbuf && @$wbuf; - } -} - -sub long_response ($$;@) { - my ($self, $cb, @args) = @_; # cb returns true if more, false if done - - my $sock = $self->{sock} or return; - # make sure we disable reading during a long response, - # clients should not be sending us stuff and making us do more - # work while we are stream a response to them - $self->{long_cb} = [ fileno($sock), $cb, now(), @args ]; - long_step($self); # kick off! - undef; -} +sub long_response_done { $_[0]->write(\".\r\n") } # overrides superclass sub hdr_msgid_range_i { my ($self, $beg, $end) = @_; @@ -674,7 +635,7 @@ $range = $self->{article} unless defined $range; my $r = get_range($self, $range); return $r unless ref $r; $self->msg_more($xhdr ? r221 : r225); - long_response($self, \&hdr_msgid_range_i, @$r); + $self->long_response(\&hdr_msgid_range_i, @$r); } } @@ -746,7 +707,7 @@ $range = $self->{article} unless defined $range; my $r = get_range($self, $range); return $r unless ref $r; $self->msg_more($xhdr ? r221 : r225); - long_response($self, \&xref_range_i, @$r); + $self->long_response(\&xref_range_i, @$r); } } @@ -790,7 +751,7 @@ $range = $self->{article} unless defined $range; my $r = get_range($self, $range); return $r unless ref $r; $self->msg_more($xhdr ? r221 : r225); - long_response($self, \&smsg_range_i, @$r, $field); + $self->long_response(\&smsg_range_i, @$r, $field); } } @@ -860,7 +821,7 @@ $range = $self->{article} unless defined $range; my $r = get_range($self, $range); return $r unless ref $r; $self->msg_more("224 Overview information follows\r\n"); - long_response($self, \&xrover_i, @$r); + $self->long_response(\&xrover_i, @$r); } sub over_line ($$$) { @@ -924,7 +885,7 @@ return $r unless ref $r; my ($beg, $end) = @$r; $self->msg_more( "224 Overview information follows for $$beg to $end\r\n"); - long_response($self, \&xover_i, @$r); + $self->long_response(\&xover_i, @$r); } sub compressed { undef } diff --git a/lib/PublicInbox/POP3.pm b/lib/PublicInbox/POP3.pm index 741b5e58a90949854fdcf4331d2e3ad8e89bcfdf..60eedea761dace658a8b77f1ca6bb167c321b9e7 100644 --- a/lib/PublicInbox/POP3.pm +++ b/lib/PublicInbox/POP3.pm @@ -55,45 +55,6 @@ my ($self, $fmt, @args) = @_; printf { $self->{pop3d}->{out} } $fmt."\n", @args; } -sub long_step { - my ($self) = @_; - # wbuf is unset or empty, here; {long} may add to it - my ($fd, $cb, $t0, @args) = @{$self->{long_cb}}; - my $more = eval { $cb->($self, @args) }; - if ($@ || !$self->{sock}) { # something bad happened... - delete $self->{long_cb}; - my $elapsed = now() - $t0; - if ($@) { - err($self, - "%s during long response[$fd] - %0.6f", - $@, $elapsed); - } - out($self, " deferred[$fd] aborted - %0.6f", $elapsed); - $self->close; - } elsif ($more) { # $self->{wbuf}: - # control passed to ibx_async_cat if $more == \undef - requeue_once($self) if !ref($more); - } else { # all done! - delete $self->{long_cb}; - my $elapsed = now() - $t0; - my $fd = fileno($self->{sock}); - out($self, " deferred[$fd] done - %0.6f", $elapsed); - my $wbuf = $self->{wbuf}; # do NOT autovivify - $self->requeue unless $wbuf && @$wbuf; - } -} - -sub long_response ($$;@) { - my ($self, $cb, @args) = @_; # cb returns true if more, false if done - my $sock = $self->{sock} or return; - # make sure we disable reading during a long response, - # clients should not be sending us stuff and making us do more - # work while we are stream a response to them - $self->{long_cb} = [ fileno($sock), $cb, now(), @args ]; - long_step($self); # kick off! - undef; -} - sub do_greet { my ($self) = @_; my $s = $self->{salt} = sprintf('%x.%x', int(rand(0x7fffffff)), time);