use PublicInbox::EmlContentFoo qw(parse_content_disposition);
use PublicInbox::DS qw(now);
use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
+use PublicInbox::GitAsyncCat;
use Text::ParseWords qw(parse_line);
use Errno qw(EAGAIN);
+
my $Address;
for my $mod (qw(Email::Address::XS Mail::Address)) {
eval "require $mod" or next;
\$ret;
}
-sub uid_fetch_cb { # called by git->cat_async
+sub requeue_once ($) {
+ my ($self) = @_;
+ # COMPRESS users all share the same DEFLATE context.
+ # Flush it here to ensure clients don't see
+ # each other's data
+ $self->zflush;
+
+ # no recursion, schedule another call ASAP,
+ # but only after all pending writes are done.
+ # autovivify wbuf:
+ my $new_size = push(@{$self->{wbuf}}, \&long_step);
+
+ # wbuf may be populated by $cb, no need to rearm if so:
+ $self->requeue if $new_size == 1;
+}
+
+sub uid_fetch_cb { # called by git->cat_async via git_async_msg
my ($bref, $oid, $type, $size, $fetch_m_arg) = @_;
my ($self, undef, $ibx, undef, undef, $msgs, $want) = @$fetch_m_arg;
my $smsg = shift @$msgs or die 'BUG: no smsg';
if (!defined($oid)) {
# it's possible to have TOCTOU if an admin runs
# public-inbox-(edit|purge), just move onto the next message
- return unless defined $want->{-seqno};
+ return requeue_once($self) unless defined $want->{-seqno};
$bref = dummy_message($smsg->{num}, $ibx);
} else {
$smsg->{blob} eq $oid or die "BUG: $smsg->{blob} != $oid";
partial_emit($self, $partial, $eml);
}
$self->msg_more(")\r\n");
+ requeue_once($self);
}
sub uid_fetch_m { # long_response
}
$$beg = $msgs->[-1]->{num} + 1;
}
- my $git = $ibx->git;
- $git->cat_async_begin; # TODO: actually make async
- $git->cat_async($msgs->[0]->{blob}, \&uid_fetch_cb, \@_);
- $git->cat_async_wait;
- 1;
+ git_async_msg($ibx, $msgs->[0], \&uid_fetch_cb, \@_);
}
sub cmd_status ($$$;@) {
my $seq = $want->{-seqno}++;
my $cur_num = $msgs->[0]->{num};
if ($cur_num == $seq) { # as expected
- my $git = $ibx->git;
- $git->cat_async_begin; # TODO: actually make async
- $git->cat_async($msgs->[0]->{blob}, \&uid_fetch_cb, \@_);
- $git->cat_async_wait;
+ git_async_msg($ibx, $msgs->[0], \&uid_fetch_cb, \@_);
} elsif ($cur_num > $seq) {
# send dummy messages until $seq catches up to $cur_num
my $smsg = bless { num => $seq, ts => 0 }, 'PublicInbox::Smsg';
unshift @$msgs, $smsg;
my $bref = dummy_message($seq, $ibx);
uid_fetch_cb($bref, undef, undef, undef, \@_);
+ $smsg; # blessed response since uid_fetch_cb requeues
} else { # should not happen
die "BUG: cur_num=$cur_num < seq=$seq";
}
- 1; # more messages on the way
}
sub cmd_fetch ($$$;@) {
} elsif ($more) { # $self->{wbuf}:
$self->update_idle_time;
- # COMPRESS users all share the same DEFLATE context.
- # Flush it here to ensure clients don't see
- # each other's data
- $self->zflush;
-
- # no recursion, schedule another call ASAP, but only after
- # all pending writes are done. autovivify wbuf:
- my $new_size = push(@{$self->{wbuf}}, \&long_step);
-
- # wbuf may be populated by $cb, no need to rearm if so:
- $self->requeue if $new_size == 1;
+ # control passed to $more may be a GitAsyncCat object
+ requeue_once($self) if !ref($more);
} else { # all done!
delete $self->{long_cb};
my $elapsed = now() - $t0;