lib/PublicInbox/WwwAttach.pm
lib/PublicInbox/WwwHighlight.pm
lib/PublicInbox/WwwListing.pm
+lib/PublicInbox/WwwStatic.pm
lib/PublicInbox/WwwStream.pm
lib/PublicInbox/WwwText.pm
lib/PublicInbox/Xapcmd.pm
*input_prepare = *PublicInbox::GitHTTPBackend::input_prepare;
*parse_cgi_headers = *PublicInbox::GitHTTPBackend::parse_cgi_headers;
*serve = *PublicInbox::GitHTTPBackend::serve;
-*static_result = *PublicInbox::GitHTTPBackend::static_result;
use warnings;
use PublicInbox::Qspawn;
+use PublicInbox::WwwStatic;
use Plack::MIME;
sub locate_cgit ($) {
pi_config => $pi_config,
}, $class;
- $pi_config->each_inbox(sub {}); # fill in -code_repos mapped to inboxes
+ $pi_config->fill_all; # fill in -code_repos mapped to inboxes
# some cgit repos may not be mapped to inboxes, so ensure those exist:
my $code_repos = $pi_config->{-code_repos};
);
# XXX: cgit filters may care about more variables...
+sub cgit_parse_hdr { # {parse_hdr} for Qspawn
+ my ($r, $bref) = @_;
+ my $res = parse_cgi_headers($r, $bref) or return; # incomplete
+ $res;
+}
+
sub call {
my ($self, $env) = @_;
my $path_info = $env->{PATH_INFO};
} elsif ($path_info =~ m!$self->{static}! &&
defined($cgit_data = $self->{cgit_data})) {
my $f = $1;
- my $type = Plack::MIME->mime_type($f);
- return static_result($env, [], $cgit_data.$f, $type);
+ return PublicInbox::WwwStatic::response($env, [], $cgit_data.$f,
+ Plack::MIME->mime_type($f));
}
my $cgi_env = { PATH_INFO => $path_info };
my $rdr = input_prepare($env) or return r(500);
my $qsp = PublicInbox::Qspawn->new($self->{cmd}, $cgi_env, $rdr);
my $limiter = $self->{pi_config}->limiter('-cgit');
- $qsp->psgi_return($env, $limiter, sub {
- my ($r, $bref) = @_;
- my $res = parse_cgi_headers($r, $bref) or return; # incomplete
- $res;
- });
+ $qsp->psgi_return($env, $limiter, \&cgit_parse_hdr);
}
1;
$self;
}
-sub _fill_all ($) { each_inbox($_[0], sub {}) }
+sub noop {}
+sub fill_all ($) { each_inbox($_[0], \&noop) }
sub _lookup_fill ($$$) {
my ($self, $cache, $key) = @_;
$self->{$cache}->{$key} // do {
- _fill_all($self);
+ fill_all($self);
$self->{$cache}->{$key};
}
}
}
sub each_inbox {
- my ($self, $cb) = @_;
+ my ($self, $cb, $arg) = @_;
# may auto-vivify if config file is non-existent:
foreach my $section (@{$self->{-section_order}}) {
next if $section !~ m!\Apublicinbox\.([^/]+)\z!;
my $ibx = lookup_name($self, $1) or next;
- $cb->($ibx);
+ $cb->($ibx, $arg);
}
}
if ($ibx->{obfuscate}) {
$ibx->{-no_obfuscate} = $self->{-no_obfuscate};
$ibx->{-no_obfuscate_re} = $self->{-no_obfuscate_re};
- _fill_all($self); # noop to populate -no_obfuscate
+ fill_all($self); # noop to populate -no_obfuscate
}
if (my $ibx_code_repos = $ibx->{coderepo}) {
$dig->add("$h\0$v\0");
}
+sub content_dig_i {
+ my ($dig) = $_[1];
+ my ($part, $depth, @idx) = @{$_[0]};
+ $dig->add("\0$depth:".join('.', @idx)."\0");
+ my $fn = $part->filename;
+ if (defined $fn) {
+ utf8::encode($fn);
+ $dig->add("fn\0$fn\0");
+ }
+ my @d = $part->header('Content-Description');
+ foreach my $d (@d) {
+ utf8::encode($d);
+ $dig->add("d\0$d\0");
+ }
+ $dig->add("b\0");
+ my $ct = $part->content_type || 'text/plain';
+ my ($s, undef) = msg_part_text($part, $ct);
+ if (defined $s) {
+ $s =~ s/\r\n/\n/gs;
+ $s =~ s/\s*\z//s;
+ utf8::encode($s);
+ } else {
+ $s = $part->body;
+ }
+ $dig->add($s);
+}
+
sub content_digest ($) {
my ($mime) = @_;
my $dig = Digest::SHA->new(256);
my @v = $hdr->header($h);
digest_addr($dig, $h, $_) foreach @v;
}
- msg_iter($mime, sub {
- my ($part, $depth, @idx) = @{$_[0]};
- $dig->add("\0$depth:".join('.', @idx)."\0");
- my $fn = $part->filename;
- if (defined $fn) {
- utf8::encode($fn);
- $dig->add("fn\0$fn\0");
- }
- my @d = $part->header('Content-Description');
- foreach my $d (@d) {
- utf8::encode($d);
- $dig->add("d\0$d\0");
- }
- $dig->add("b\0");
- my $ct = $part->content_type || 'text/plain';
- my ($s, undef) = msg_part_text($part, $ct);
- if (defined $s) {
- $s =~ s/\r\n/\n/gs;
- $s =~ s/\s*\z//s;
- utf8::encode($s);
- } else {
- $s = $part->body;
- }
- $dig->add($s);
- });
+ msg_iter($mime, \&content_dig_i, $dig);
$dig;
}
sub PARTIAL_MAX () { 100 }
+sub mids_from_mset { # Search::retry_reopen callback
+ [ map { PublicInbox::SearchMsg::from_mitem($_)->mid } $_[0]->items ];
+}
+
sub search_partial ($$) {
my ($srch, $mid) = @_;
return if length($mid) < $MIN_PARTIAL_LEN;
# Search::Xapian::QueryParserError or even:
# "something terrible happened at ../Search/Xapian/Enquire.pm"
my $mset = eval { $srch->query($m, $opt) } or next;
+ my $mids = $srch->retry_reopen(\&mids_from_mset, $mset);
+ return $mids if scalar(@$mids);
+ }
+}
+
+sub ext_msg_i {
+ my ($other, $arg) = @_;
+ my ($cur, $mid, $ibxs, $found) = @$arg;
- my @mids = map {
- my $doc = $_->get_document;
- PublicInbox::SearchMsg->load_doc($doc)->mid;
- } $mset->items;
- return \@mids if scalar(@mids);
+ return if $other->{name} eq $cur->{name} || !$other->base_url;
+
+ my $mm = $other->mm or return;
+
+ # try to find the URL with Msgmap to avoid forking
+ my $num = $mm->num_for($mid);
+ if (defined $num) {
+ push @$found, $other;
+ } else {
+ # no point in trying the fork fallback if we
+ # know Xapian is up-to-date but missing the
+ # message in the current repo
+ push @$ibxs, $other;
}
}
my $mid = $ctx->{mid};
eval { require PublicInbox::Msgmap };
- my (@ibx, @found);
-
- $ctx->{www}->{pi_config}->each_inbox(sub {
- my ($other) = @_;
- return if $other->{name} eq $cur->{name} || !$other->base_url;
-
- my $mm = $other->mm or return;
-
- # try to find the URL with Msgmap to avoid forking
- my $num = $mm->num_for($mid);
- if (defined $num) {
- push @found, $other;
- } else {
- # no point in trying the fork fallback if we
- # know Xapian is up-to-date but missing the
- # message in the current repo
- push @ibx, $other;
- }
- });
+ my $ibxs = [];
+ my $found = [];
+ my $arg = [ $cur, $mid, $ibxs, $found ];
+
+ $ctx->{www}->{pi_config}->each_inbox(\&ext_msg_i, $arg);
- return exact($ctx, \@found, $mid) if @found;
+ return exact($ctx, $found, $mid) if @$found;
# fall back to partial MID matching
my @partial;
# can't find a partial match in current inbox, try the others:
if (!$n_partial && length($mid) >= $MIN_PARTIAL_LEN) {
- foreach my $ibx (@ibx) {
+ foreach my $ibx (@$ibxs) {
$srch = $ibx->search or next;
$mids = search_partial($srch, $mid) or next;
$n_partial += scalar(@$mids);
use PublicInbox::WwwAtomStream;
use PublicInbox::SearchMsg; # this loads w/o Search::Xapian
+sub generate_i {
+ my ($ctx) = @_;
+ while (my $smsg = shift @{$ctx->{msgs}}) {
+ $ctx->{-inbox}->smsg_mime($smsg) and return $smsg;
+ }
+}
+
# main function
sub generate {
my ($ctx) = @_;
- my $msgs = recent_msgs($ctx);
+ my $msgs = $ctx->{msgs} = recent_msgs($ctx);
return _no_thread() unless @$msgs;
-
- my $ibx = $ctx->{-inbox};
- PublicInbox::WwwAtomStream->response($ctx, 200, sub {
- while (my $smsg = shift @$msgs) {
- $ibx->smsg_mime($smsg) and return $smsg;
- }
- });
+ PublicInbox::WwwAtomStream->response($ctx, 200, \&generate_i);
}
sub generate_thread_atom {
my ($ctx) = @_;
- my $mid = $ctx->{mid};
- my $ibx = $ctx->{-inbox};
- my $msgs = $ibx->over->get_thread($mid);
+ my $msgs = $ctx->{msgs} = $ctx->{-inbox}->over->get_thread($ctx->{mid});
return _no_thread() unless @$msgs;
-
- PublicInbox::WwwAtomStream->response($ctx, 200, sub {
- while (my $smsg = shift @$msgs) {
- $ibx->smsg_mime($smsg) and return $smsg;
- }
- });
+ PublicInbox::WwwAtomStream->response($ctx, 200, \&generate_i);
}
sub generate_html_index {
[ "Redirecting to $url\n" ] ];
}
+sub new_html_i {
+ my ($nr, $ctx) = @_;
+ my $msgs = $ctx->{msgs};
+ while (my $smsg = shift @$msgs) {
+ my $m = $ctx->{-inbox}->smsg_mime($smsg) or next;
+ my $more = scalar @$msgs;
+ return PublicInbox::View::index_entry($m, $ctx, $more);
+ }
+ PublicInbox::View::pagination_footer($ctx, './new.html');
+}
+
sub new_html {
my ($ctx) = @_;
- my $msgs = recent_msgs($ctx);
+ my $msgs = $ctx->{msgs} = recent_msgs($ctx);
if (!@$msgs) {
return [404, ['Content-Type', 'text/plain'],
["No messages, yet\n"] ];
$ctx->{-html_tip} = '<pre>';
$ctx->{-upfx} = '';
$ctx->{-hr} = 1;
- my $ibx = $ctx->{-inbox};
- PublicInbox::WwwStream->response($ctx, 200, sub {
- while (my $smsg = shift @$msgs) {
- my $m = $ibx->smsg_mime($smsg) or next;
- my $more = scalar @$msgs;
- return PublicInbox::View::index_entry($m, $ctx, $more);
- }
- PublicInbox::View::pagination_footer($ctx, './new.html');
- });
+ PublicInbox::WwwStream->response($ctx, 200, \&new_html_i);
}
# private subs
use warnings;
sub new {
- my ($class, $rpipe, $end, $buf, $filter) = @_;
+ my ($class, $rpipe, $end, $end_arg, $buf) = @_;
bless {
rpipe => $rpipe,
end => $end,
+ end_arg => $end_arg,
buf => $buf,
- filter => $filter || 0,
+ filter => 0,
}, $class;
}
my $buf = delete $self->{buf}; # initial buffer
$buf = $self->{rpipe}->getline unless defined $buf;
$self->{filter} = -1 unless defined $buf; # set EOF for next call
- $filter ? $filter->($buf) : $buf;
+ $buf;
}
sub close {
my ($self) = @_;
- my $rpipe = delete $self->{rpipe};
+ my ($rpipe, $end, $end_arg) = delete @$self{qw(rpipe end end_arg)};
close $rpipe if $rpipe;
- my $end = delete $self->{end};
- $end->() if $end;
+ $end->($end_arg) if $end;
}
1;
sub _cat_async_step ($$$) {
my ($self, $inflight, $in) = @_;
- my $cb = shift @$inflight or die 'BUG: inflight empty';
+ my $pair = shift @$inflight or die 'BUG: inflight empty';
+ my ($cb, $arg) = @$pair;
local $/ = "\n";
my $head = $in->getline;
- return eval { $cb->(undef) } if $head =~ / missing$/;
+ $head =~ / missing$/ and return
+ eval { $cb->(undef, undef, undef, undef, $arg) };
$head =~ /^([0-9a-f]{40}) (\S+) ([0-9]+)$/ or
fail($self, "Unexpected result from async git cat-file: $head");
my ($oid_hex, $type, $size) = ($1, $2, $3 + 0);
my $bref = read_cat_in_full($self, $in, $size);
- eval { $cb->($bref, $oid_hex, $type, $size) };
+ eval { $cb->($bref, $oid_hex, $type, $size, $arg) };
}
sub cat_async_wait ($) {
$self->{inflight} = [];
}
-sub cat_async ($$$) {
- my ($self, $oid, $cb) = @_;
+sub cat_async ($$$;$) {
+ my ($self, $oid, $cb, $arg) = @_;
my $inflight = $self->{inflight} or die 'BUG: not in async';
if (scalar(@$inflight) >= MAX_INFLIGHT) {
_cat_async_step($self, $inflight, $self->{in});
}
$self->{out}->print($oid, "\n") or fail($self, "write error: $!");
- push @$inflight, $cb;
+ push(@$inflight, [ $cb, $arg ]);
}
sub commit_title ($$) {
use IO::Handle;
use HTTP::Date qw(time2str);
use HTTP::Status qw(status_message);
-use Plack::Util;
use PublicInbox::Qspawn;
use PublicInbox::Tmpfile;
+use PublicInbox::WwwStatic;
# 32 is same as the git-daemon connection limit
my $default_limiter = PublicInbox::Qspawn::Limiter->new(32);
$env->{'psgi.errors'}->print(@msg, "\n");
}
-sub drop_client ($) {
- if (my $io = $_[0]->{'psgix.io'}) {
- $io->close; # this is PublicInbox::DS::close
- }
-}
-
my $prev = 0;
my $exp;
sub cache_one_year {
'Cache-Control', 'public, max-age=31536000';
}
-sub static_result ($$$$) {
- my ($env, $h, $f, $type) = @_;
- return r(404) unless -f $f && -r _; # just in case it's a FIFO :P
-
- # TODO: If-Modified-Since and Last-Modified?
- open my $in, '<', $f or return r(404);
- my $size = -s $in;
- my $len = $size;
- my $code = 200;
- push @$h, 'Content-Type', $type;
- if (($env->{HTTP_RANGE} || '') =~ /\bbytes=([0-9]*)-([0-9]*)\z/) {
- ($code, $len) = prepare_range($env, $in, $h, $1, $2, $size);
- if ($code == 416) {
- push @$h, 'Content-Range', "bytes */$size";
- return [ 416, $h, [] ];
- }
- }
- push @$h, 'Content-Length', $len;
- my $n = 65536;
- [ $code, $h, Plack::Util::inline_object(close => sub { close $in },
- getline => sub {
- return if $len == 0;
- $n = $len if $len < $n;
- my $r = sysread($in, my $buf, $n);
- if (!defined $r) {
- err($env, "$f read error: $!");
- } elsif ($r <= 0) {
- err($env, "$f EOF with $len bytes left");
- } else {
- $len -= $r;
- $n = 8192;
- return $buf;
- }
- drop_client($env);
- return;
- })]
-}
-
sub serve_dumb {
my ($env, $git, $path) = @_;
} else {
return r(404);
}
-
- static_result($env, $h, "$git->{git_dir}/$path", $type);
+ $path = "$git->{git_dir}/$path";
+ PublicInbox::WwwStatic::response($env, $h, $path, $type) // r(404);
}
-sub prepare_range {
- my ($env, $in, $h, $beg, $end, $size) = @_;
- my $code = 200;
- my $len = $size;
- if ($beg eq '') {
- if ($end ne '') { # "bytes=-$end" => last N bytes
- $beg = $size - $end;
- $beg = 0 if $beg < 0;
- $end = $size - 1;
- $code = 206;
- } else {
- $code = 416;
- }
- } else {
- if ($beg > $size) {
- $code = 416;
- } elsif ($end eq '' || $end >= $size) {
- $end = $size - 1;
- $code = 206;
- } elsif ($end < $size) {
- $code = 206;
- } else {
- $code = 416;
- }
- }
- if ($code == 206) {
- $len = $end - $beg + 1;
- if ($len <= 0) {
- $code = 416;
- } else {
- sysseek($in, $beg, SEEK_SET) or return [ 500, [], [] ];
- push @$h, qw(Accept-Ranges bytes Content-Range);
- push @$h, "bytes $beg-$end/$size";
-
- # FIXME: Plack::Middleware::Deflater bug?
- $env->{'psgix.no-compress'} = 1;
- }
- }
- ($code, $len);
+sub git_parse_hdr { # {parse_hdr} for Qspawn
+ my ($r, $bref, $dumb_args) = @_;
+ my $res = parse_cgi_headers($r, $bref) or return; # incomplete
+ $res->[0] == 403 ? serve_dumb(@$dumb_args) : $res;
}
# returns undef if 403 so it falls back to dumb HTTP
$env{PATH_TRANSLATED} = "$git->{git_dir}/$path";
my $rdr = input_prepare($env) or return r(500);
my $qsp = PublicInbox::Qspawn->new([qw(git http-backend)], \%env, $rdr);
- $qsp->psgi_return($env, $limiter, sub { # parse_hdr
- my ($r, $bref) = @_;
- my $res = parse_cgi_headers($r, $bref) or return; # incomplete
- $res->[0] == 403 ? serve_dumb($env, $git, $path) : $res;
- });
+ $qsp->psgi_return($env, $limiter, \&git_parse_hdr, [$env, $git, $path]);
}
sub input_prepare {
use strict;
use warnings;
use base qw(PublicInbox::DS);
-use fields qw(cb end);
+use fields qw(http fh cb arg end_obj);
use Errno qw(EAGAIN);
use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
# $io is a read-only pipe ($rpipe) for now, but may be a
# bidirectional socket in the future.
sub new {
- my ($class, $io, $cb, $end) = @_;
+ my ($class, $io, $cb, $arg, $end_obj) = @_;
# no $io? call $cb at the top of the next event loop to
# avoid recursion:
unless (defined($io)) {
- PublicInbox::DS::requeue($cb);
- die '$end unsupported w/o $io' if $end;
+ PublicInbox::DS::requeue($cb ? $cb : $arg);
+ die '$end_obj unsupported w/o $io' if $end_obj;
return;
}
my $self = fields::new($class);
IO::Handle::blocking($io, 0);
$self->SUPER::new($io, EPOLLIN | EPOLLET);
- $self->{cb} = $cb; # initial read callback, later replaced by main_cb
- $self->{end} = $end; # like END {}, but only for this object
+ $self->{cb} = $cb; # initial read callback
+ $self->{arg} = $arg; # arg for $cb
+ $self->{end_obj} = $end_obj; # like END{}, can ->event_step
$self;
}
-sub main_cb ($$) {
- my ($http, $fh) = @_;
- sub {
- my ($self) = @_;
+sub event_step {
+ my ($self) = @_;
+ if (my $cb = delete $self->{cb}) {
+ # this may call async_pass when headers are done
+ $cb->(delete $self->{arg});
+ } elsif (my $sock = $self->{sock}) {
+ my $http = $self->{http};
# $self->{sock} is a read pipe for git-http-backend or cgit
# and 65536 is the default Linux pipe size
- my $r = sysread($self->{sock}, my $buf, 65536);
+ my $r = sysread($sock, my $buf, 65536);
if ($r) {
- $fh->write($buf); # may call $http->close
+ $self->{fh}->write($buf); # may call $http->close
if ($http->{sock}) { # !closed
$self->requeue;
# let other clients get some work done, too
return; # EPOLLET means we'll be notified
}
- # Done! Error handling will happen in $fh->close
- # called by the {end} handler
+ # Done! Error handling will happen in $self->{fh}->close
+ # called by end_obj->event_step handler
delete $http->{forward};
- $self->close; # queues ->{end} to be called
- }
+ $self->close; # queues end_obj->event_step to be called
+ } # else { # we may've been requeued but closed by $http
}
# once this is called, all data we read is passed to the
# calls after this may use much memory:
$$bref = undef;
- # replace the header read callback with the main one
- my $cb = $self->{cb} = main_cb($http, $fh);
- $cb->($self); # either hit EAGAIN or ->requeue to keep EPOLLET happy
-}
+ $self->{http} = $http;
+ $self->{fh} = $fh;
-sub event_step {
- # {cb} may be undef after ->requeue due to $http->close happening
- my $cb = $_[0]->{cb} or return;
- $cb->(@_);
+ # either hit EAGAIN or ->requeue to keep EPOLLET happy
+ event_step($self);
}
-# may be called as $forward->close in PublicInbox::HTTP or EOF (main_cb)
+# may be called as $forward->close in PublicInbox::HTTP or EOF (event_step)
sub close {
my $self = $_[0];
- delete $self->{cb};
$self->SUPER::close; # DS::close
# we defer this to the next timer loop since close is deferred
- if (my $end = delete $self->{end}) {
- PublicInbox::DS::requeue($end);
+ if (my $end_obj = delete $self->{end_obj}) {
+ # this calls $end_obj->event_step
+ # (likely PublicInbox::Qspawn::event_step,
+ # NOT PublicInbox::HTTPD::Async::event_step)
+ PublicInbox::DS::requeue($end_obj);
}
}
use warnings;
use PublicInbox::MID qw/mid_escape/;
use PublicInbox::Hval qw/to_filename/;
+use PublicInbox::SearchMsg;
use Email::Simple;
use Email::MIME::Encode;
$_[0] .= "\n";
}
+sub thread_cb {
+ my ($ctx) = @_;
+ my $msgs = $ctx->{msgs};
+ while (1) {
+ if (my $smsg = shift @$msgs) {
+ return $smsg;
+ }
+ # refill result set
+ $ctx->{msgs} = $msgs = $ctx->{over}->get_thread($ctx->{mid},
+ $ctx->{prev});
+ return unless @$msgs;
+ $ctx->{prev} = $msgs->[-1];
+ }
+}
+
sub thread_mbox {
my ($ctx, $over, $sfx) = @_;
eval { require PublicInbox::MboxGz };
- return sub { need_gzip(@_) } if $@;
- my $mid = $ctx->{mid};
- my $msgs = $over->get_thread($mid, {});
+ return need_gzip() if $@;
+ my $msgs = $ctx->{msgs} = $over->get_thread($ctx->{mid}, {});
return [404, [qw(Content-Type text/plain)], []] if !@$msgs;
- my $prev = $msgs->[-1];
- my $i = 0;
- my $cb = sub {
- while (1) {
- if (my $smsg = $msgs->[$i++]) {
- return $smsg;
- }
- # refill result set
- $msgs = $over->get_thread($mid, $prev);
- return unless @$msgs;
- $prev = $msgs->[-1];
- $i = 0;
- }
- };
- PublicInbox::MboxGz->response($ctx, $cb, $msgs->[0]->subject);
+ $ctx->{prev} = $msgs->[-1];
+ $ctx->{over} = $over; # bump refcnt
+ PublicInbox::MboxGz->response($ctx, \&thread_cb, $msgs->[0]->subject);
}
sub emit_range {
mbox_all($ctx, $query);
}
+sub all_ids_cb {
+ my ($ctx) = @_;
+ my $ids = $ctx->{ids};
+ do {
+ while ((my $num = shift @$ids)) {
+ my $smsg = $ctx->{over}->get_art($num) or next;
+ return $smsg;
+ }
+ $ctx->{ids} = $ids = $ctx->{mm}->ids_after(\($ctx->{prev}));
+ } while (@$ids);
+}
+
sub mbox_all_ids {
my ($ctx) = @_;
- my $prev = 0;
my $ibx = $ctx->{-inbox};
- my $ids = $ibx->mm->ids_after(\$prev) or return
+ my $prev = 0;
+ my $mm = $ctx->{mm} = $ibx->mm;
+ my $ids = $mm->ids_after(\$prev) or return
[404, [qw(Content-Type text/plain)], ["No results found\n"]];
- my $i = 0;
- my $over = $ibx->over or
+ $ctx->{over} = $ibx->over or
return PublicInbox::WWW::need($ctx, 'Overview');
- my $cb = sub {
- do {
- while ((my $num = $ids->[$i++])) {
- my $smsg = $over->get_art($num) or next;
- return $smsg;
- }
- $ids = $ibx->mm->ids_after(\$prev);
- $i = 0;
- } while (@$ids);
- undef;
- };
- return PublicInbox::MboxGz->response($ctx, $cb, 'all');
+ $ctx->{ids} = $ids;
+ $ctx->{prev} = $prev;
+ return PublicInbox::MboxGz->response($ctx, \&all_ids_cb, 'all');
+}
+
+sub results_cb {
+ my ($ctx) = @_;
+ my $mset = $ctx->{mset};
+ my $srch = $ctx->{srch};
+ while (1) {
+ while (my $mi = (($mset->items)[$ctx->{iter}++])) {
+ my $smsg = PublicInbox::SearchMsg::from_mitem($mi,
+ $srch) or next;
+ return $smsg;
+ }
+ # refill result set
+ $mset = $ctx->{mset} = $srch->query($ctx->{query},
+ $ctx->{qopts});
+ my $size = $mset->size or return;
+ $ctx->{qopts}->{offset} += $size;
+ $ctx->{iter} = 0;
+ }
}
sub mbox_all {
my ($ctx, $query) = @_;
eval { require PublicInbox::MboxGz };
- return sub { need_gzip(@_) } if $@;
+ return need_gzip() if $@;
return mbox_all_ids($ctx) if $query eq '';
- my $opts = { mset => 2 };
- my $srch = $ctx->{-inbox}->search or
+ my $qopts = $ctx->{qopts} = { mset => 2 };
+ my $srch = $ctx->{srch} = $ctx->{-inbox}->search or
return PublicInbox::WWW::need($ctx, 'Search');;
- my $mset = $srch->query($query, $opts);
- $opts->{offset} = $mset->size or
+ my $mset = $ctx->{mset} = $srch->query($query, $qopts);
+ $qopts->{offset} = $mset->size or
return [404, [qw(Content-Type text/plain)],
["No results found\n"]];
- my $i = 0;
- my $cb = sub { # called by MboxGz->getline
- while (1) {
- while (my $mi = (($mset->items)[$i++])) {
- my $doc = $mi->get_document;
- my $smsg = $srch->retry_reopen(sub {
- PublicInbox::SearchMsg->load_doc($doc);
- }) or next;
- return $smsg;
- }
- # refill result set
- $mset = $srch->query($query, $opts);
- my $size = $mset->size or return;
- $opts->{offset} += $size;
- $i = 0;
- }
- };
- PublicInbox::MboxGz->response($ctx, $cb, 'results-'.$query);
+ $ctx->{iter} = 0;
+ $ctx->{query} = $query;
+ PublicInbox::MboxGz->response($ctx, \&results_cb, 'results-'.$query);
}
sub need_gzip {
- my $fh = $_[0]->([501, ['Content-Type' => 'text/html']]);
my $title = 'gzipped mbox not available';
- $fh->write(<<EOF);
+ my $body = <<EOF;
<html><head><title>$title</title><body><pre>$title
The administrator needs to install the Compress::Raw::Zlib Perl module
to support gzipped mboxes.
<a href="../">Return to index</a></pre></body></html>
EOF
- $fh->close;
+
+ [501,[qw(Content-Type text/html Content-Length), bytes::length($body)],
+ [ $body ] ];
}
1;
my $ctx = $self->{ctx} or return;
my $gz = $self->{gz};
my $buf = delete($self->{buf});
- while (my $smsg = $self->{cb}->()) {
+ while (my $smsg = $self->{cb}->($ctx)) {
my $mref = $ctx->{-inbox}->msg_by_smsg($smsg) or next;
my $h = Email::Simple->new($mref)->header_obj;
# Like Email::MIME::walk_parts, but this is:
# * non-recursive
# * passes depth and indices to the iterator callback
-sub msg_iter ($$) {
- my ($mime, $cb) = @_;
+sub msg_iter ($$;$) {
+ my ($mime, $cb, $cb_arg) = @_;
my @parts = $mime->subparts;
if (@parts) {
my $i = 0;
@sub = map { [ $_, $depth, @idx, ++$i ] } @sub;
@parts = (@sub, @parts);
} else {
- $cb->($p);
+ $cb->($p, $cb_arg);
}
}
} else {
- $cb->([$mime, 0, 0]);
+ $cb->([$mime, 0, 0], $cb_arg);
}
}
[ "Redirecting to $url\n" ] ]
}
-sub try_inbox ($$) {
- my ($ibx, $mid) = @_;
+sub try_inbox {
+ my ($ibx, $arg) = @_;
+ return if scalar(@$arg) > 1;
+
# do not pass $env since HTTP_HOST may differ
my $url = $ibx->base_url or return;
+ my ($mid) = @$arg;
eval { $ibx->mm->num_for($mid) } or return;
# 302 since the same message may show up on
# multiple inboxes and inboxes can be added/reordered
- redirect(302, $url .= mid_escape($mid) . '/');
+ $arg->[1] = redirect(302, $url .= mid_escape($mid) . '/');
}
sub call {
}
foreach my $mid (@try) {
- $pi_config->each_inbox(sub {
- $res ||= try_inbox($_[0], $mid);
- });
- last if defined $res;
+ my $arg = [ $mid ];
+ $pi_config->each_inbox(\&try_inbox, $arg);
+ defined($res = $arg->[1]) and last;
}
$res || [ 404, [qw(Content-Type text/plain)], ["404 Not Found\n"] ];
}
use strict;
use warnings;
use PublicInbox::Spawn qw(popen_rd);
-require Plack::Util;
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
use Errno qw(EAGAIN EINTR);
# declares a command to spawn (but does not spawn it).
# $cmd is the command to spawn
-# $env is the environ for the child process
+# $cmd_env is the environ for the child process (not PSGI env)
# $opt can include redirects and perhaps other process spawning options
sub new ($$$;) {
- my ($class, $cmd, $env, $opt) = @_;
- bless { args => [ $cmd, $env, $opt ] }, $class;
+ my ($class, $cmd, $cmd_env, $opt) = @_;
+ bless { args => [ $cmd, $cmd_env, $opt ] }, $class;
}
sub _do_spawn {
my ($self, $start_cb, $limiter) = @_;
my $err;
- my ($cmd, $env, $opts) = @{$self->{args}};
+ my ($cmd, $cmd_env, $opts) = @{$self->{args}};
my %opts = %{$opts || {}};
$self->{limiter} = $limiter;
foreach my $k (PublicInbox::Spawn::RLIMITS()) {
}
}
- ($self->{rpipe}, $self->{pid}) = popen_rd($cmd, $env, \%opts);
+ ($self->{rpipe}, $self->{pid}) = popen_rd($cmd, $cmd_env, \%opts);
# drop any IO handles opt was holding open via $opt->{hold}
# No need to hold onto the descriptor once the child process has it.
} else {
$self->{err} = $!;
}
- $start_cb->($self->{rpipe});
+ $start_cb->($self);
}
sub child_err ($) {
$err = "W: waitpid($xpid, 0) => $pid: $!";
} # else should not be called with pid == 0
- my $env = delete $self->{env};
+ my ($env, $qx_cb, $qx_arg, $qx_buf) =
+ delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
# done, spawn whatever's in the queue
my $limiter = $self->{limiter};
log_err($env, join(' ', @{$self->{args}}) . ": $err");
}
}
- if (my $fin_cb = delete $self->{fin_cb}) {
- eval { $fin_cb->() }
- }
+ eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb;
}
-sub do_waitpid ($;$$) {
- my ($self, $env, $fin_cb) = @_;
+sub do_waitpid ($) {
+ my ($self) = @_;
my $pid = $self->{pid};
- $self->{env} = $env;
- $self->{fin_cb} = $fin_cb;
# PublicInbox::DS may not be loaded
eval { PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self) };
# done if we're running in PublicInbox::DS::EventLoop
}
}
-sub finish ($;$$) {
- my ($self, $env, $fin_cb) = @_;
+sub finish ($) {
+ my ($self) = @_;
if (delete $self->{rpipe}) {
- do_waitpid($self, $env, $fin_cb);
- } elsif ($fin_cb) {
- eval { $fin_cb->() };
+ do_waitpid($self);
+ } else {
+ my ($env, $qx_cb, $qx_arg, $qx_buf) =
+ delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
+ eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb;
}
}
-sub start {
+sub start ($$$) {
my ($self, $limiter, $start_cb) = @_;
if ($limiter->{running} < $limiter->{max}) {
_do_spawn($self, $start_cb, $limiter);
}
}
+sub psgi_qx_init_cb {
+ my ($self) = @_;
+ my $async = delete $self->{async};
+ my ($r, $buf);
+ my $qx_fh = $self->{qx_fh};
+reread:
+ $r = sysread($self->{rpipe}, $buf, 65536);
+ if ($async) {
+ $async->async_pass($self->{psgi_env}->{'psgix.io'},
+ $qx_fh, \$buf);
+ } elsif (defined $r) {
+ $r ? $qx_fh->write($buf) : event_step($self, undef);
+ } else {
+ return if $! == EAGAIN; # try again when notified
+ goto reread if $! == EINTR;
+ event_step($self, $!);
+ }
+}
+
+sub psgi_qx_start {
+ my ($self) = @_;
+ if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) {
+ # PublicInbox::HTTPD::Async->new(rpipe, $cb, cb_arg, $end_obj)
+ $self->{async} = $async->($self->{rpipe},
+ \&psgi_qx_init_cb, $self, $self);
+ # init_cb will call ->async_pass or ->close
+ } else { # generic PSGI
+ psgi_qx_init_cb($self) while $self->{qx_fh};
+ }
+}
+
# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with
# the stdout of the given command when done; but respects the given limiter
# $env is the PSGI env. As with ``/qx; only use this when output is small
# and safe to slurp.
sub psgi_qx {
- my ($self, $env, $limiter, $qx_cb) = @_;
- my $scalar = '';
- open(my $qx, '+>', \$scalar) or die; # PerlIO::scalar
- my $end = sub {
- my $err = $_[0]; # $!
- log_err($env, "psgi_qx: $err") if defined($err);
- finish($self, $env, sub { $qx_cb->(\$scalar) });
- $qx = undef;
- };
- my $rpipe; # comes from popen_rd
- my $async = $env->{'pi-httpd.async'};
- my $cb = sub {
- my ($r, $buf);
-reread:
- $r = sysread($rpipe, $buf, 65536);
- if ($async) {
- $async->async_pass($env->{'psgix.io'}, $qx, \$buf);
- } elsif (defined $r) {
- $r ? $qx->write($buf) : $end->();
+ my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_;
+ $self->{psgi_env} = $env;
+ my $qx_buf = '';
+ open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar
+ $self->{qx_cb} = $qx_cb;
+ $self->{qx_arg} = $qx_arg;
+ $self->{qx_fh} = $qx_fh;
+ $self->{qx_buf} = \$qx_buf;
+ $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+ start($self, $limiter, \&psgi_qx_start);
+}
+
+# this is called on pipe EOF to reap the process, may be called
+# via PublicInbox::DS event loop OR via GetlineBody for generic
+# PSGI servers.
+sub event_step {
+ my ($self, $err) = @_; # $err: $!
+ log_err($self->{psgi_env}, "psgi_{return,qx} $err") if defined($err);
+ finish($self);
+ my ($fh, $qx_fh) = delete(@$self{qw(fh qx_fh)});
+ $fh->close if $fh; # async-only (psgi_return)
+}
+
+sub rd_hdr ($) {
+ my ($self) = @_;
+ # typically used for reading CGI headers
+ # we must loop until EAGAIN for EPOLLET in HTTPD/Async.pm
+ # We also need to check EINTR for generic PSGI servers.
+ my $ret;
+ my $total_rd = 0;
+ my $hdr_buf = $self->{hdr_buf};
+ my ($ph_cb, $ph_arg) = @{$self->{parse_hdr}};
+ do {
+ my $r = sysread($self->{rpipe}, $$hdr_buf, 4096,
+ length($$hdr_buf));
+ if (defined($r)) {
+ $total_rd += $r;
+ $ret = $ph_cb->($total_rd, $hdr_buf, $ph_arg);
} else {
- return if $! == EAGAIN; # try again when notified
- goto reread if $! == EINTR;
- $end->($!);
+ # caller should notify us when it's ready:
+ return if $! == EAGAIN;
+ next if $! == EINTR; # immediate retry
+ log_err($self->{psgi_env}, "error reading header: $!");
+ $ret = [ 500, [], [ "Internal error\n" ] ];
}
- };
- $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
- $self->start($limiter, sub { # start_cb, may run later, much later...
- ($rpipe) = @_; # popen_rd result
+ } until (defined $ret);
+ delete $self->{parse_hdr}; # done parsing headers
+ $ret;
+}
+
+sub psgi_return_init_cb {
+ my ($self) = @_;
+ my $r = rd_hdr($self) or return;
+ my $env = $self->{psgi_env};
+ my $wcb = delete $env->{'qspawn.wcb'};
+ my $async = delete $self->{async};
+ if (scalar(@$r) == 3) { # error
if ($async) {
- # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
- $async = $async->($rpipe, $cb, $end);
- # $cb will call ->async_pass or ->close
- } else { # generic PSGI
- $cb->() while $qx;
+ # calls rpipe->close && ->event_step
+ $async->close;
+ } else {
+ $self->{rpipe}->close;
+ event_step($self);
}
- });
+ $wcb->($r);
+ } elsif ($async) {
+ # done reading headers, handoff to read body
+ my $fh = $wcb->($r); # scalar @$r == 2
+ $self->{fh} = $fh;
+ $async->async_pass($env->{'psgix.io'}, $fh,
+ delete($self->{hdr_buf}));
+ } else { # for synchronous PSGI servers
+ require PublicInbox::GetlineBody;
+ $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe},
+ \&event_step, $self,
+ ${$self->{hdr_buf}});
+ $wcb->($r);
+ }
+
+ # Workaround a leak under Perl 5.16.3 when combined with
+ # Plack::Middleware::Deflater:
+ $wcb = undef;
}
-# create a filter for "push"-based streaming PSGI writes used by HTTPD::Async
-sub filter_fh ($$) {
- my ($fh, $filter) = @_;
- Plack::Util::inline_object(
- close => sub {
- $fh->write($filter->(undef));
- $fh->close;
- },
- write => sub {
- $fh->write($filter->($_[0]));
- });
+sub psgi_return_start { # may run later, much later...
+ my ($self) = @_;
+ if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) {
+ # PublicInbox::HTTPD::Async->new(rpipe, $cb, $cb_arg, $end_obj)
+ $self->{async} = $async->($self->{rpipe},
+ \&psgi_return_init_cb, $self, $self);
+ } else { # generic PSGI
+ psgi_return_init_cb($self) while $self->{parse_hdr};
+ }
}
# Used for streaming the stdout of one process as a PSGI response.
# psgi_return will return an anonymous
# sub for the PSGI server to call
#
-# $env->{'qspawn.filter'} - filter callback, receives a string as input,
-# undef on EOF
-#
# $limiter - the Limiter object to use (uses the def_limiter if not given)
#
# $parse_hdr - Initial read function; often for parsing CGI header output.
# psgix.io. 3-element arrays means the body is available
# immediately (or streamed via ->getline (pull-based)).
sub psgi_return {
- my ($self, $env, $limiter, $parse_hdr) = @_;
- my ($fh, $rpipe);
- my $end = sub {
- my $err = $_[0]; # $!
- log_err($env, "psgi_return: $err") if defined($err);
- finish($self, $env);
- $fh->close if $fh; # async-only
- };
-
- my $buf = '';
- my $rd_hdr = sub {
- # typically used for reading CGI headers
- # we must loop until EAGAIN for EPOLLET in HTTPD/Async.pm
- # We also need to check EINTR for generic PSGI servers.
- my $ret;
- my $total_rd = 0;
- do {
- my $r = sysread($rpipe, $buf, 4096, length($buf));
- if (defined($r)) {
- $total_rd += $r;
- $ret = $parse_hdr->($r ? $total_rd : 0, \$buf);
- } else {
- # caller should notify us when it's ready:
- return if $! == EAGAIN;
- next if $! == EINTR; # immediate retry
- log_err($env, "error reading header: $!");
- $ret = [ 500, [], [ "Internal error\n" ] ];
- }
- } until (defined $ret);
- $ret;
- };
-
- my $wcb = delete $env->{'qspawn.wcb'}; # or PSGI server supplies it
- my $async = $env->{'pi-httpd.async'};
-
- my $cb = sub {
- my $r = $rd_hdr->() or return;
- $rd_hdr = undef; # done reading headers
- my $filter = delete $env->{'qspawn.filter'};
- if (scalar(@$r) == 3) { # error
- if ($async) {
- $async->close; # calls rpipe->close and $end
- } else {
- $rpipe->close;
- $end->();
- }
- $wcb->($r);
- } elsif ($async) {
- # done reading headers, handoff to read body
- $fh = $wcb->($r); # scalar @$r == 2
- $fh = filter_fh($fh, $filter) if $filter;
- $async->async_pass($env->{'psgix.io'}, $fh, \$buf);
- } else { # for synchronous PSGI servers
- require PublicInbox::GetlineBody;
- $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end,
- $buf, $filter);
- $wcb->($r);
- }
-
- # Workaround a leak under Perl 5.16.3 when combined with
- # Plack::Middleware::Deflater:
- $wcb = undef;
- };
+ my ($self, $env, $limiter, $parse_hdr, $hdr_arg) = @_;
+ $self->{psgi_env} = $env;
+ $self->{hdr_buf} = \(my $hdr_buf = '');
+ $self->{parse_hdr} = [ $parse_hdr, $hdr_arg ];
$limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
- my $start_cb = sub { # may run later, much later...
- ($rpipe) = @_;
- if ($async) {
- # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
- $async = $async->($rpipe, $cb, $end);
- # $cb will call ->async_pass or ->close
- } else { # generic PSGI
- $cb->() while $rd_hdr;
- }
- };
# the caller already captured the PSGI write callback from
# the PSGI server, so we can call ->start, here:
- return $self->start($limiter, $start_cb) if $wcb;
+ $env->{'qspawn.wcb'} and
+ return start($self, $limiter, \&psgi_return_start);
# the caller will return this sub to the PSGI server, so
- # it can set the response callback (that is, for PublicInbox::HTTP,
- # the chunked_wcb or identity_wcb callback), but other HTTP servers
- # are supported:
+ # it can set the response callback (that is, for
+ # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
+ # but other HTTP servers are supported:
sub {
- ($wcb) = @_;
- $self->start($limiter, $start_cb);
- };
+ $env->{'qspawn.wcb'} = $_[0];
+ start($self, $limiter, \&psgi_return_start);
+ }
}
package PublicInbox::Qspawn::Limiter;
}
sub retry_reopen {
- my ($self, $cb) = @_;
+ my ($self, $cb, $arg) = @_;
for my $i (1..10) {
if (wantarray) {
my @ret;
- eval { @ret = $cb->() };
+ eval { @ret = $cb->($arg) };
return @ret unless $@;
} else {
my $ret;
- eval { $ret = $cb->() };
+ eval { $ret = $cb->($arg) };
return $ret unless $@;
}
# Exception: The revision being read has been discarded -
sub _do_enquire {
my ($self, $query, $opts) = @_;
- retry_reopen($self, sub { _enquire_once($self, $query, $opts) });
+ retry_reopen($self, \&_enquire_once, [ $self, $query, $opts ]);
}
-sub _enquire_once {
- my ($self, $query, $opts) = @_;
+sub _enquire_once { # retry_reopen callback
+ my ($self, $query, $opts) = @{$_[0]};
my $xdb = xdb($self);
my $enquire = $X{Enquire}->new($xdb);
$enquire->set_query($query);
my $limit = $opts->{limit} || 50;
my $mset = $enquire->get_mset($offset, $limit);
return $mset if $opts->{mset};
- my @msgs = map {
- PublicInbox::SearchMsg->load_doc($_->get_document);
- } $mset->items;
+ my @msgs = map { PublicInbox::SearchMsg::from_mitem($_) } $mset->items;
return \@msgs unless wantarray;
($mset->get_matches_estimated, \@msgs)
}
# Only called by PSGI interface, not NNTP
-sub load_doc {
- my ($class, $doc) = @_;
- my $self = bless {}, $class;
- psgi_cull(load_expand($self, $doc));
+sub from_mitem {
+ my ($mitem, $srch) = @_;
+ return $srch->retry_reopen(\&from_mitem, $mitem) if $srch;
+ my $self = bless {}, __PACKAGE__;
+ psgi_cull(load_expand($self, $mitem->get_document));
}
# :bytes and :lines metadata in RFC 3977
PublicInbox::WwwStream->response($ctx, $code, $cb);
}
-# allow undef for individual doc loads...
-sub load_doc_retry {
- my ($srch, $mitem) = @_;
-
- eval {
- $srch->retry_reopen(sub {
- PublicInbox::SearchMsg->load_doc($mitem->get_document)
- });
- }
-}
-
# display non-nested search results similar to what users expect from
# regular WWW search engines:
sub mset_summary {
foreach my $m ($mset->items) {
my $rank = sprintf("%${pad}d", $m->get_rank + 1);
my $pct = get_pct($m);
- my $smsg = load_doc_retry($srch, $m);
+ my $smsg = PublicInbox::SearchMsg::from_mitem($m, $srch);
unless ($smsg) {
eval {
$m = "$m ".$m->get_docid . " expired\n";
}
sub sort_relevance {
- my ($pct) = @_;
- sub {
- [ sort { (eval { $pct->{$b->topmost->{id}} } || 0)
- <=>
- (eval { $pct->{$a->topmost->{id}} } || 0)
- } @{$_[0]} ] };
+ [ sort {
+ (eval { $b->topmost->{smsg}->{pct} } // 0) <=>
+ (eval { $a->topmost->{smsg}->{pct} } // 0)
+ } @{$_[0]} ]
}
sub get_pct ($) {
$n > 99 ? 99 : $n;
}
+sub load_msgs {
+ my ($mset) = @_;
+ [ map {
+ my $mi = $_;
+ my $smsg = PublicInbox::SearchMsg::from_mitem($mi);
+ $smsg->{pct} = get_pct($mi);
+ $smsg;
+ } ($mset->items) ]
+}
+
sub mset_thread {
my ($ctx, $mset, $q) = @_;
- my %pct;
- my $ibx = $ctx->{-inbox};
- my $msgs = $ibx->search->retry_reopen(sub { [ map {
- my $i = $_;
- my $smsg = PublicInbox::SearchMsg->load_doc($i->get_document);
- $pct{$smsg->mid} = get_pct($i);
- $smsg;
- } ($mset->items) ]});
+ my $msgs = $ctx->{-inbox}->search->retry_reopen(\&load_msgs, $mset);
my $r = $q->{r};
my $rootset = PublicInbox::SearchThread::thread($msgs,
- $r ? sort_relevance(\%pct) : \&PublicInbox::View::sort_ds,
+ $r ? \&sort_relevance : \&PublicInbox::View::sort_ds,
$ctx);
my $skel = search_nav_bot($mset, $q). "<pre>";
$ctx->{-upfx} = '';
$ctx->{cur_level} = 0;
$ctx->{dst} = \$skel;
$ctx->{mapping} = {};
- $ctx->{pct} = \%pct;
+ $ctx->{searchview} = 1;
$ctx->{prev_attr} = '';
$ctx->{prev_level} = 0;
$ctx->{s_nr} = scalar(@$msgs).'+ results';
*PublicInbox::View::pre_thread);
@$msgs = reverse @$msgs if $r;
- sub {
- return unless $msgs;
- my $smsg;
- while (my $m = pop @$msgs) {
- $smsg = $ibx->smsg_mime($m) and last;
- }
- if ($smsg) {
- return PublicInbox::View::index_entry($smsg, $ctx,
- scalar @$msgs);
- }
- $msgs = undef;
- $skel .= "\n</pre>";
- };
+ $ctx->{msgs} = $msgs;
+ \&mset_thread_i;
+}
+
+# callback for PublicInbox::WwwStream::getline
+sub mset_thread_i {
+ my ($nr, $ctx) = @_;
+ my $msgs = $ctx->{msgs} or return;
+ while (my $smsg = pop @$msgs) {
+ $ctx->{-inbox}->smsg_mime($smsg) or next;
+ return PublicInbox::View::index_entry($smsg, $ctx,
+ scalar @$msgs);
+ }
+ my ($skel) = delete @$ctx{qw(dst msgs)};
+ $$skel .= "\n</pre>";
}
sub ctx_prepare {
sub adump {
my ($cb, $mset, $q, $ctx) = @_;
- my $ibx = $ctx->{-inbox};
- my @items = $mset->items;
- $ctx->{search_query} = $q;
- my $srch = $ibx->search;
- PublicInbox::WwwAtomStream->response($ctx, 200, sub {
- while (my $x = shift @items) {
- $x = load_doc_retry($srch, $x);
- $x = $ibx->smsg_mime($x) and return $x;
- }
- return undef;
- });
+ $ctx->{items} = [ $mset->items ];
+ $ctx->{search_query} = $q; # used by WwwAtomStream::atom_header
+ $ctx->{srch} = $ctx->{-inbox}->search;
+ PublicInbox::WwwAtomStream->response($ctx, 200, \&adump_i);
+}
+
+# callback for PublicInbox::WwwAtomStream::getline
+sub adump_i {
+ my ($ctx) = @_;
+ while (my $mi = shift @{$ctx->{items}}) {
+ my $smsg = eval {
+ PublicInbox::SearchMsg::from_mitem($mi, $ctx->{srch});
+ } or next;
+ $ctx->{-inbox}->smsg_mime($smsg) and return $smsg;
+ }
}
package PublicInbox::SearchQuery;
print { $_[0]->{out} } $_[1], "\n" or ERR($_[0], "print(dbg): $!");
}
+sub done ($$) {
+ my ($self, $res) = @_;
+ my $ucb = delete($self->{user_cb}) or return;
+ $ucb->($res, $self->{uarg});
+}
+
sub ERR ($$) {
my ($self, $err) = @_;
print { $self->{out} } $err, "\n";
- my $ucb = delete($self->{user_cb});
- eval { $ucb->($err) } if $ucb;
+ eval { done($self, $err) };
die $err;
}
scalar(@ambiguous) ? \@ambiguous : undef;
}
-sub extract_diff ($$$$$) {
- my ($self, $p, $re, $ibx, $smsg) = @_;
+sub extract_diff ($$) {
+ my ($p, $arg) = @_;
+ my ($self, $diffs, $re, $ibx, $smsg) = @$arg;
my ($part) = @$p; # ignore $depth and @idx;
my $hdr_lines; # diff --git a/... b/...
my $tmp;
}
return undef unless $tmp;
close $tmp or die "close(tmp): $!";
- $di;
+ push @$diffs, $di;
}
sub path_searchable ($) { defined($_[0]) && $_[0] =~ m!\A[\w/\. \-]+\z! }
my $msgs = $srch->query($q, { relevance => 1 });
my $re = qr/\Aindex ($pre[a-f0-9]*)\.\.($post[a-f0-9]*)(?: ([0-9]+))?/;
-
- my @di;
+ my $diffs = [];
foreach my $smsg (@$msgs) {
$ibx->smsg_mime($smsg) or next;
- msg_iter(delete($smsg->{mime}), sub {
- my $di = extract_diff($self, $_[0], $re, $ibx, $smsg);
- push @di, $di if defined($di);
- });
+ my $mime = delete $smsg->{mime};
+ msg_iter($mime, \&extract_diff,
+ [$self, $diffs, $re, $ibx, $smsg]);
+ }
+ @$diffs ? $diffs : undef;
+}
+
+sub update_index_result ($$) {
+ my ($bref, $self) = @_;
+ my ($qsp, $msg) = delete @$self{qw(-qsp -msg)};
+ if (my $err = $qsp->{err}) {
+ ERR($self, "git update-index error: $err");
}
- @di ? \@di : undef;
+ dbg($self, $msg);
+ next_step($self); # onto do_git_apply
}
sub prepare_index ($) {
my $rdr = { 0 => fileno($in), -hold => $in };
my $cmd = [ qw(git update-index -z --index-info) ];
my $qsp = PublicInbox::Qspawn->new($cmd, $self->{git_env}, $rdr);
- $qsp->psgi_qx($self->{psgi_env}, undef, sub {
- my ($bref) = @_;
- if (my $err = $qsp->{err}) {
- ERR($self, "git update-index error: $err");
- }
- dbg($self, "index prepared:\n" .
- "$mode_a $oid_full\t" . git_quote($path_a));
- next_step($self); # onto do_git_apply
- });
+ $path_a = git_quote($path_a);
+ $self->{-qsp} = $qsp;
+ $self->{-msg} = "index prepared:\n$mode_a $oid_full\t$path_a";
+ $qsp->psgi_qx($self->{psgi_env}, undef, \&update_index_result, $self);
}
# pure Perl "git init"
'100644';
}
-sub do_finish ($$) {
- my ($self, $user_cb) = @_;
- my $found = $self->{found};
- my $oid_want = $self->{oid_want};
+sub do_finish ($) {
+ my ($self) = @_;
+ my ($found, $oid_want) = @$self{qw(found oid_want)};
if (my $exists = $found->{$oid_want}) {
- return $user_cb->($exists);
+ return done($self, $exists);
}
# let git disambiguate if oid_want was too short,
# but long enough to be unambiguous:
my $tmp_git = $self->{tmp_git};
if (my @res = $tmp_git->check($oid_want)) {
- return $user_cb->($found->{$res[0]});
+ return done($self, $found->{$res[0]});
}
if (my $err = $tmp_git->last_check_err) {
dbg($self, $err);
}
- $user_cb->(undef);
+ done($self, undef);
}
-sub do_step ($) {
+sub event_step ($) {
my ($self) = @_;
eval {
# step 1: resolve blobs to patches in the todo queue
# our result: (which may be undef)
# Other steps may call user_cb to terminate prematurely
# on error
- } elsif (my $user_cb = delete($self->{user_cb})) {
- do_finish($self, $user_cb);
+ } elsif (exists $self->{user_cb}) {
+ do_finish($self);
} else {
die 'about to call user_cb twice'; # Oops :x
}
if ($err) {
$err =~ s/^\s*Exception:\s*//; # bad word to show users :P
dbg($self, "E: $err");
- my $ucb = delete($self->{user_cb});
- eval { $ucb->($err) } if $ucb;
+ eval { done($self, $err) };
}
}
-sub step_cb ($) {
- my ($self) = @_;
- sub { do_step($self) };
-}
-
sub next_step ($) {
my ($self) = @_;
# if outside of public-inbox-httpd, caller is expected to be
- # looping step_cb, anyways
+ # looping event_step, anyways
my $async = $self->{psgi_env}->{'pi-httpd.async'} or return;
# PublicInbox::HTTPD::Async->new
- $async->(undef, step_cb($self));
+ $async->(undef, undef, $self);
}
sub mark_found ($$$) {
}
}
-sub parse_ls_files ($$$$) {
- my ($self, $qsp, $bref, $di) = @_;
+sub parse_ls_files ($$) {
+ my ($self, $bref) = @_;
+ my ($qsp, $di) = delete @$self{qw(-qsp -cur_di)};
if (my $err = $qsp->{err}) {
die "git ls-files error: $err";
}
next_step($self); # onto the next patch
}
-sub start_ls_files ($$) {
- my ($self, $di) = @_;
- my $cmd = [qw(git ls-files -s -z)];
- my $qsp = PublicInbox::Qspawn->new($cmd, $self->{git_env});
- $qsp->psgi_qx($self->{psgi_env}, undef, sub {
- my ($bref) = @_;
- eval { parse_ls_files($self, $qsp, $bref, $di) };
- ERR($self, $@) if $@;
- });
+sub ls_files_result {
+ my ($bref, $self) = @_;
+ eval { parse_ls_files($self, $bref) };
+ ERR($self, $@) if $@;
}
sub oids_same_ish ($$) {
}
}
+sub apply_result ($$) {
+ my ($bref, $self) = @_;
+ my ($qsp, $di) = delete @$self{qw(-qsp -cur_di)};
+ dbg($self, $$bref);
+ my $patches = $self->{patches};
+ if (my $err = $qsp->{err}) {
+ my $msg = "git apply error: $err";
+ my $nxt = $patches->[0];
+ if ($nxt && oids_same_ish($nxt->{oid_b}, $di->{oid_b})) {
+ dbg($self, $msg);
+ dbg($self, 'trying '.di_url($self, $nxt));
+ } else {
+ ERR($self, $msg);
+ }
+ } else {
+ skip_identical($self, $patches, $di->{oid_b});
+ }
+
+ my @cmd = qw(git ls-files -s -z);
+ $qsp = PublicInbox::Qspawn->new(\@cmd, $self->{git_env});
+ $self->{-cur_di} = $di;
+ $self->{-qsp} = $qsp;
+ $qsp->psgi_qx($self->{psgi_env}, undef, \&ls_files_result, $self);
+}
+
sub do_git_apply ($) {
my ($self) = @_;
my $dn = $self->{tmp}->dirname;
my $rdr = { 2 => 1 };
my $qsp = PublicInbox::Qspawn->new(\@cmd, $self->{git_env}, $rdr);
- $qsp->psgi_qx($self->{psgi_env}, undef, sub {
- my ($bref) = @_;
- dbg($self, $$bref);
- if (my $err = $qsp->{err}) {
- my $msg = "git apply error: $err";
- my $nxt = $patches->[0];
- if ($nxt && oids_same_ish($nxt->{oid_b}, $prv_oid_b)) {
- dbg($self, $msg);
- dbg($self, 'trying '.di_url($self, $nxt));
- } else {
- ERR($self, $msg);
- }
- } else {
- skip_identical($self, $patches, $di->{oid_b});
- }
- eval { start_ls_files($self, $di) };
- ERR($self, $@) if $@;
- });
+ $self->{-cur_di} = $di;
+ $self->{-qsp} = $qsp;
+ $qsp->psgi_qx($self->{psgi_env}, undef, \&apply_result, $self);
}
sub di_url ($$) {
join("\n", $found_git->pub_urls($self->{psgi_env})));
if ($cur_want eq $self->{oid_want} || $type ne 'blob') {
- eval { delete($self->{user_cb})->($existing) };
+ eval { done($self, $existing) };
die "E: $@" if $@;
return;
}
}
dbg($self, "could not find $cur_want");
- eval { delete($self->{user_cb})->(undef) }; # not found! :<
+ eval { done($self, undef) };
die "E: $@" if $@;
}
# this API is designed to avoid creating self-referential structures;
# so user_cb never references the SolverGit object
sub new {
- my ($class, $ibx, $user_cb) = @_;
+ my ($class, $ibx, $user_cb, $uarg) = @_;
bless {
gits => $ibx->{-repo_objs},
user_cb => $user_cb,
+ uarg => $uarg,
+ # -cur_di, -qsp, -msg => temporary fields for Qspawn callbacks
# TODO: config option for searching related inboxes
inboxes => [ $ibx ],
# should we even get here? Probably not, but somebody
# could be manually typing URLs:
- return (delete $self->{user_cb})->(undef) if $oid_want =~ /\A0+\z/;
+ return done($self, undef) if $oid_want =~ /\A0+\z/;
$self->{oid_want} = $oid_want;
$self->{out} = $out;
$self->{tmp} = File::Temp->newdir("solver.$oid_want-XXXXXXXX", TMPDIR => 1);
dbg($self, "solving $oid_want ...");
- my $step_cb = step_cb($self);
if (my $async = $env->{'pi-httpd.async'}) {
# PublicInbox::HTTPD::Async->new
- $async->(undef, $step_cb);
+ $async->(undef, undef, $self);
} else {
- $step_cb->() while $self->{user_cb};
+ event_step($self) while $self->{user_cb};
}
}
use constant TCHILD => '` ';
sub th_pfx ($) { $_[0] == 0 ? '' : TCHILD };
+sub msg_html_i {
+ my ($nr, $ctx) = @_;
+ my $more = $ctx->{more};
+ if ($nr == 1) {
+ # $more cannot be true w/o $smsg being defined:
+ my $upfx = $more ? '../'.mid_escape($ctx->{smsg}->mid).'/' : '';
+ $ctx->{tip} .
+ multipart_text_as_html($ctx->{mime}, $upfx, $ctx) .
+ '</pre><hr>'
+ } elsif ($more && @$more) {
+ ++$ctx->{end_nr};
+ msg_html_more($ctx, $more, $nr);
+ } elsif ($nr == $ctx->{end_nr}) {
+ # fake an EOF if generating the footer fails;
+ # we want to at least show the message if something
+ # here crashes:
+ eval {
+ my $hdr = delete($ctx->{mime})->header_obj;
+ '<pre>' . html_footer($hdr, 1, $ctx) .
+ '</pre>' . msg_reply($ctx, $hdr)
+ };
+ } else {
+ undef
+ }
+}
+
# public functions: (unstable)
sub msg_html {
my ($ctx, $mime, $more, $smsg) = @_;
- my $hdr = $mime->header_obj;
my $ibx = $ctx->{-inbox};
$ctx->{-obfs_ibx} = $ibx->{obfuscate} ? $ibx : undef;
- my $tip = _msg_html_prepare($hdr, $ctx, $more, 0);
- my $end = 2;
- PublicInbox::WwwStream->response($ctx, 200, sub {
- my ($nr, undef) = @_;
- if ($nr == 1) {
- # $more cannot be true w/o $smsg being defined:
- my $upfx = $more ? '../'.mid_escape($smsg->mid).'/' : '';
- $tip . multipart_text_as_html($mime, $upfx, $ctx) .
- '</pre><hr>'
- } elsif ($more && @$more) {
- ++$end;
- msg_html_more($ctx, $more, $nr);
- } elsif ($nr == $end) {
- # fake an EOF if generating the footer fails;
- # we want to at least show the message if something
- # here crashes:
- eval {
- '<pre>' . html_footer($hdr, 1, $ctx) .
- '</pre>' . msg_reply($ctx, $hdr)
- };
- } else {
- undef
- }
- });
+ $ctx->{tip} = _msg_html_prepare($mime->header_obj, $ctx, $more, 0);
+ $ctx->{more} = $more;
+ $ctx->{end_nr} = 2;
+ $ctx->{smsg} = $smsg;
+ $ctx->{mime} = $mime;
+ PublicInbox::WwwStream->response($ctx, 200, \&msg_html_i);
}
sub msg_page {
$rv .= "\n";
# scan through all parts, looking for displayable text
- my $ibx = $ctx->{-inbox};
- msg_iter($mime, sub { $rv .= add_text_body($mhref, $ctx, $_[0]) });
+ $ctx->{mhref} = $mhref;
+ $ctx->{rv} = \$rv;
+ msg_iter($mime, \&add_text_body, $ctx);
+ delete $ctx->{rv};
# add the footer
$rv .= "\n<a\nhref=#$id_m\nid=e$id>^</a> ".
" <a\nhref=\"${mhref}#R\">reply</a>";
my $hr;
- if (my $pct = $ctx->{pct}) { # used by SearchView.pm
- $rv .= "\t[relevance $pct->{$mid_raw}%]";
+ if (defined(my $pct = $smsg->{pct})) { # used by SearchView.pm
+ $rv .= "\t[relevance $pct%]";
$hr = 1;
} elsif ($mapping) {
my $nested = 'nested';
$beg . '<pre>' . index_entry($smsg, $ctx, 0) . '</pre>' . $end;
}
+sub stream_thread_i { # PublicInbox::WwwStream::getline callback
+ my ($nr, $ctx) = @_;
+ return unless exists($ctx->{dst});
+ my $q = $ctx->{-queue};
+ while (@$q) {
+ my $level = shift @$q;
+ my $node = shift @$q or next;
+ my $cl = $level + 1;
+ unshift @$q, map { ($cl, $_) } @{$node->{children}};
+ if (my $smsg = $ctx->{-inbox}->smsg_mime($node->{smsg})) {
+ return thread_index_entry($ctx, $level, $smsg);
+ } else {
+ return ghost_index_entry($ctx, $level, $node);
+ }
+ }
+ join('', thread_adj_level($ctx, 0)) . ${delete $ctx->{dst}}; # skel
+}
+
sub stream_thread ($$) {
my ($rootset, $ctx) = @_;
my $ibx = $ctx->{-inbox};
my @q = map { (0, $_) } @$rootset;
- my $level;
- my $smsg;
+ my ($smsg, $level);
while (@q) {
$level = shift @q;
my $node = shift @q or next;
$ctx->{-obfs_ibx} = $ibx->{obfuscate} ? $ibx : undef;
$ctx->{-title_html} = ascii_html($smsg->subject);
$ctx->{-html_tip} = thread_index_entry($ctx, $level, $smsg);
- $smsg = undef;
- PublicInbox::WwwStream->response($ctx, 200, sub {
- return unless $ctx;
- while (@q) {
- $level = shift @q;
- my $node = shift @q or next;
- my $cl = $level + 1;
- unshift @q, map { ($cl, $_) } @{$node->{children}};
- if ($smsg = $ibx->smsg_mime($node->{smsg})) {
- return thread_index_entry($ctx, $level, $smsg);
- } else {
- return ghost_index_entry($ctx, $level, $node);
- }
- }
- my $ret = join('', thread_adj_level($ctx, 0));
- $ret .= ${$ctx->{dst}}; # skel
- $ctx = undef;
- $ret;
- });
+ $ctx->{-queue} = \@q;
+ PublicInbox::WwwStream->response($ctx, 200, \&stream_thread_i);
}
sub thread_html {
return missing_thread($ctx) unless $smsg;
$ctx->{-title_html} = ascii_html($smsg->subject);
$ctx->{-html_tip} = '<pre>'.index_entry($smsg, $ctx, scalar @$msgs);
- $smsg = undef;
- PublicInbox::WwwStream->response($ctx, 200, sub {
- return unless $msgs;
- $smsg = undef;
- while (my $m = shift @$msgs) {
- $smsg = $ibx->smsg_mime($m) and last;
- }
- return index_entry($smsg, $ctx, scalar @$msgs) if $smsg;
- $msgs = undef;
- $skel;
- });
+ $ctx->{msgs} = $msgs;
+ PublicInbox::WwwStream->response($ctx, 200, \&thread_html_i);
+}
+
+sub thread_html_i { # PublicInbox::WwwStream::getline callback
+ my ($nr, $ctx) = @_;
+ my $msgs = $ctx->{msgs} or return;
+ while (my $smsg = shift @$msgs) {
+ $ctx->{-inbox}->smsg_mime($smsg) or next;
+ return index_entry($smsg, $ctx, scalar @$msgs);
+ }
+ my ($skel) = delete @$ctx{qw(dst msgs)};
+ $$skel;
}
sub multipart_text_as_html {
- my ($mime, $upfx, $ctx) = @_;
- my $rv = "";
+ my ($mime, $mhref, $ctx) = @_;
+ $ctx->{mhref} = $mhref;
+ $ctx->{rv} = \(my $rv = '');
# scan through all parts, looking for displayable text
- msg_iter($mime, sub { $rv .= add_text_body($upfx, $ctx, $_[0]) });
- $rv;
+ msg_iter($mime, \&add_text_body, $ctx);
+ ${delete $ctx->{rv}};
}
sub flush_quote {
}
sub attach_link ($$$$;$) {
- my ($upfx, $ct, $p, $fn, $err) = @_;
+ my ($ctx, $ct, $p, $fn, $err) = @_;
my ($part, $depth, @idx) = @$p;
my $nl = $idx[-1] > 1 ? "\n" : '';
my $idx = join('.', @idx);
} else {
$sfn = 'a.bin';
}
- my $ret = qq($nl<a\nhref="$upfx$idx-$sfn">);
+ my $rv = $ctx->{rv};
+ $$rv .= qq($nl<a\nhref="$ctx->{mhref}$idx-$sfn">);
if ($err) {
- $ret .=
-"[-- Warning: decoded text below may be mangled --]\n";
+ $$rv .= "[-- Warning: decoded text below may be mangled --]\n";
}
- $ret .= "[-- Attachment #$idx: ";
+ $$rv .= "[-- Attachment #$idx: ";
my $ts = "Type: $ct, Size: $size bytes";
$desc = ascii_html($desc);
- $ret .= ($desc eq '') ? "$ts --]" : "$desc --]\n[-- $ts --]";
- $ret .= "</a>\n";
+ $$rv .= ($desc eq '') ? "$ts --]" : "$desc --]\n[-- $ts --]";
+ $$rv .= "</a>\n";
+ undef;
}
-sub add_text_body {
- my ($upfx, $ctx, $p) = @_;
+sub add_text_body { # callback for msg_iter
+ my ($p, $ctx) = @_;
+ my $upfx = $ctx->{mhref};
my $ibx = $ctx->{-inbox};
- my $obfs_ibx = $ibx->{obfuscate} ? $ibx : undef;
# $p - from msg_iter: [ Email::MIME, depth, @idx ]
my ($part, $depth, @idx) = @$p;
my $ct = $part->content_type || 'text/plain';
my $fn = $part->filename;
my ($s, $err) = msg_part_text($part, $ct);
-
- return attach_link($upfx, $ct, $p, $fn) unless defined $s;
+ return attach_link($ctx, $ct, $p, $fn) unless defined $s;
# makes no difference to browsers, and don't screw up filename
# link generation in diffs with the extra '%0D'
# split off quoted and unquoted blocks:
my @sections = split(/((?:^>[^\n]*\n)+)/sm, $s);
$s = '';
+ my $rv = $ctx->{rv};
if (defined($fn) || $depth > 0 || $err) {
# badly-encoded message with $err? tell the world about it!
- $s .= attach_link($upfx, $ct, $p, $fn, $err);
- $s .= "\n";
+ attach_link($ctx, $ct, $p, $fn, $err);
+ $$rv .= "\n";
}
my $l = PublicInbox::Linkify->new;
foreach my $cur (@sections) {
if ($cur =~ /\A>/) {
- flush_quote(\$s, $l, \$cur);
+ flush_quote($rv, $l, \$cur);
} elsif ($diff) {
@$diff = split(/^/m, $cur);
$cur = undef;
- flush_diff(\$s, $ctx, $l);
+ flush_diff($rv, $ctx, $l);
} else {
# regular lines, OK
$l->linkify_1($cur);
- $s .= $l->linkify_2(ascii_html($cur));
+ $$rv .= $l->linkify_2(ascii_html($cur));
$cur = undef;
}
}
- obfuscate_addrs($obfs_ibx, $s) if $obfs_ibx;
- $s;
+ obfuscate_addrs($ibx, $$rv) if $ibx->{obfuscate};
}
sub _msg_html_prepare {
my $d = fmt_ts($smsg->{ds});
my $unmatched; # if lazy-loaded by SearchThread::Msg::visible()
- if (my $pct = $ctx->{pct}) {
- $pct = $pct->{$smsg->{mid}};
- if (defined $pct) {
+ if (exists $ctx->{searchview}) {
+ if (defined(my $pct = $smsg->{pct})) {
$d .= (sprintf(' % 2u', $pct) . '%');
} else {
$unmatched = 1;
my $mid = $node->{id};
my $d = ' [not found] ';
- $d .= ' ' if exists $ctx->{pct};
+ $d .= ' ' if exists $ctx->{searchview};
$d .= indent_for($level) . th_pfx($level);
my $upfx = $ctx->{-upfx};
my $m = PublicInbox::Hval->new_msgid($mid);
our $MAX_SIZE = 1024 * 1024; # TODO: configurable
my $BIN_DETECT = 8000; # same as git
+sub html_i { # WwwStream::getline callback
+ my ($nr, $ctx) = @_;
+ $nr == 1 ? ${delete $ctx->{rv}} : undef;
+}
+
sub html_page ($$$) {
my ($ctx, $code, $strref) = @_;
my $wcb = delete $ctx->{-wcb};
$ctx->{-upfx} = '../../'; # from "/$INBOX/$OID/s/"
- my $res = PublicInbox::WwwStream->response($ctx, $code, sub {
- my ($nr, undef) = @_;
- $nr == 1 ? $$strref : undef;
- });
+ $ctx->{rv} = $strref;
+ my $res = PublicInbox::WwwStream->response($ctx, $code, \&html_i);
$wcb ? $wcb->($res) : $res;
}
+sub stream_blob_parse_hdr { # {parse_hdr} for Qspawn
+ my ($r, $bref, $ctx) = @_;
+ my ($res, $logref) = delete @$ctx{qw(-res -logref)};
+ my ($git, $oid, $type, $size, $di) = @$res;
+ my @cl = ('Content-Length', $size);
+ if (!defined $r) { # error
+ html_page($ctx, 500, $logref);
+ } elsif (index($$bref, "\0") >= 0) {
+ [200, [qw(Content-Type application/octet-stream), @cl] ];
+ } else {
+ my $n = bytes::length($$bref);
+ if ($n >= $BIN_DETECT || $n == $size) {
+ return [200, [ 'Content-Type',
+ 'text/plain; charset=UTF-8', @cl ] ];
+ }
+ if ($r == 0) {
+ warn "premature EOF on $oid $$logref\n";
+ return html_page($ctx, 500, $logref);
+ }
+ undef; # bref keeps growing
+ }
+}
+
sub stream_large_blob ($$$$) {
my ($ctx, $res, $logref, $fn) = @_;
+ $ctx->{-logref} = $logref;
+ $ctx->{-res} = $res;
my ($git, $oid, $type, $size, $di) = @$res;
my $cmd = ['git', "--git-dir=$git->{git_dir}", 'cat-file', $type, $oid];
my $qsp = PublicInbox::Qspawn->new($cmd);
- my @cl = ('Content-Length', $size);
my $env = $ctx->{env};
- $env->{'public-inbox.tmpgit'} = $git; # for {-tmp}/File::Temp::Dir
$env->{'qspawn.wcb'} = delete $ctx->{-wcb};
- $qsp->psgi_return($env, undef, sub {
- my ($r, $bref) = @_;
- if (!defined $r) { # error
- html_page($ctx, 500, $logref);
- } elsif (index($$bref, "\0") >= 0) {
- my $ct = 'application/octet-stream';
- [200, ['Content-Type', $ct, @cl ] ];
- } else {
- my $n = bytes::length($$bref);
- if ($n >= $BIN_DETECT || $n == $size) {
- my $ct = 'text/plain; charset=UTF-8';
- return [200, ['Content-Type', $ct, @cl] ];
- }
- if ($r == 0) {
- warn "premature EOF on $oid $$logref\n";
- return html_page($ctx, 500, $logref);
- }
- undef; # bref keeps growing
- }
- });
+ $qsp->psgi_return($env, undef, \&stream_blob_parse_hdr, $ctx);
+}
+
+sub show_other_result ($$) {
+ my ($bref, $ctx) = @_;
+ my ($qsp, $logref) = delete @$ctx{qw(-qsp -logref)};
+ if (my $err = $qsp->{err}) {
+ utf8::decode($$err);
+ $$logref .= "git show error: $err";
+ return html_page($ctx, 500, $logref);
+ }
+ my $l = PublicInbox::Linkify->new;
+ utf8::decode($$bref);
+ $l->linkify_1($$bref);
+ $$bref = '<pre>'. $l->linkify_2(ascii_html($$bref));
+ $$bref .= '</pre><hr>' . $$logref;
+ html_page($ctx, 200, $bref);
}
sub show_other ($$$$) {
qw(show --encoding=UTF-8 --no-color --no-abbrev), $oid ];
my $qsp = PublicInbox::Qspawn->new($cmd);
my $env = $ctx->{env};
- $qsp->psgi_qx($env, undef, sub {
- my ($bref) = @_;
- if (my $err = $qsp->{err}) {
- utf8::decode($$err);
- $$logref .= "git show error: $err";
- return html_page($ctx, 500, $logref);
- }
- my $l = PublicInbox::Linkify->new;
- utf8::decode($$bref);
- $l->linkify_1($$bref);
- $$bref = '<pre>'. $l->linkify_2(ascii_html($$bref));
- $$bref .= '</pre><hr>' . $$logref;
- html_page($ctx, 200, $bref);
- });
+ $ctx->{-qsp} = $qsp;
+ $ctx->{-logref} = $logref;
+ $qsp->psgi_qx($env, undef, \&show_other_result, $ctx);
}
+# user_cb for SolverGit, called as: user_cb->($result_or_error, $uarg)
sub solve_result {
- my ($ctx, $res, $log, $hints, $fn) = @_;
+ my ($res, $ctx) = @_;
+ my ($log, $hints, $fn) = delete @$ctx{qw(log hints fn)};
unless (seek($log, 0, 0)) {
$ctx->{env}->{'psgi.errors'}->print("seek(log): $!\n");
sub show ($$;$) {
my ($ctx, $oid_b, $fn) = @_;
my $qp = $ctx->{qp};
- my $hints = {};
+ my $hints = $ctx->{hints} = {};
while (my ($from, $to) = each %QP_MAP) {
defined(my $v = $qp->{$from}) or next;
$hints->{$to} = $v;
}
- my $log = tmpfile("solve.$oid_b");
- my $solver = PublicInbox::SolverGit->new($ctx->{-inbox}, sub {
- solve_result($ctx, $_[0], $log, $hints, $fn);
- });
-
- # PSGI server will call this and give us a callback
+ $ctx->{'log'} = tmpfile("solve.$oid_b");
+ $ctx->{fn} = $fn;
+ my $solver = PublicInbox::SolverGit->new($ctx->{-inbox},
+ \&solve_result, $ctx);
+ # PSGI server will call this immediately and give us a callback (-wcb)
sub {
$ctx->{-wcb} = $_[0]; # HTTP write callback
- $solver->solve($ctx->{env}, $log, $oid_b, $hints);
+ $solver->solve($ctx->{env}, $ctx->{log}, $oid_b, $hints);
};
}
use strict;
use warnings;
use bytes (); # only for bytes::length
-use Plack::Util;
use PublicInbox::Config;
use PublicInbox::Hval;
use URI::Escape qw(uri_unescape);
require PublicInbox::Cgit;
PublicInbox::Cgit->new($pi_config);
} else {
+ require Plack::Util;
Plack::Util::inline_object(call => sub { r404() });
}
}
sub getline {
my ($self) = @_;
if (my $middle = $self->{cb}) {
- my $smsg = $middle->();
+ my $smsg = $middle->($self->{ctx});
return feed_entry($self, $smsg) if $smsg;
}
delete $self->{cb} ? '</feed>' : undef;
use PublicInbox::MIME;
use PublicInbox::MsgIter;
+sub get_attach_i { # msg_iter callback
+ my ($part, $depth, @idx) = @{$_[0]};
+ my $res = $_[1];
+ return if join('.', @idx) ne $res->[3]; # $idx
+ $res->[0] = 200;
+ my $ct = $part->content_type;
+ $ct = parse_content_type($ct) if $ct;
+
+ # discrete == type, we remain Debian wheezy-compatible
+ if ($ct && (($ct->{discrete} || '') eq 'text')) {
+ # display all text as text/plain:
+ my $cset = $ct->{attributes}->{charset};
+ if ($cset && ($cset =~ /\A[a-zA-Z0-9_\-]+\z/)) {
+ $res->[1]->[1] .= qq(; charset=$cset);
+ }
+ } else { # TODO: allow user to configure safe types
+ $res->[1]->[1] = 'application/octet-stream';
+ }
+ $part = $part->body;
+ push @{$res->[1]}, 'Content-Length', bytes::length($part);
+ $res->[2]->[0] = $part;
+}
+
# /$LISTNAME/$MESSAGE_ID/$IDX-$FILENAME
sub get_attach ($$$) {
my ($ctx, $idx, $fn) = @_;
my $res = [ 404, [ 'Content-Type', 'text/plain' ], [ "Not found\n" ] ];
my $mime = $ctx->{-inbox}->msg_by_mid($ctx->{mid}) or return $res;
$mime = PublicInbox::MIME->new($mime);
- msg_iter($mime, sub {
- my ($part, $depth, @idx) = @{$_[0]};
- return if join('.', @idx) ne $idx;
- $res->[0] = 200;
- my $ct = $part->content_type;
- $ct = parse_content_type($ct) if $ct;
-
- # discrete == type, we remain Debian wheezy-compatible
- if ($ct && (($ct->{discrete} || '') eq 'text')) {
- # display all text as text/plain:
- my $cset = $ct->{attributes}->{charset};
- if ($cset && ($cset =~ /\A[a-zA-Z0-9_\-]+\z/)) {
- $res->[1]->[1] .= qq(; charset=$cset);
- }
- } else { # TODO: allow user to configure safe types
- $res->[1]->[1] = 'application/octet-stream';
- }
- $part = $part->body;
- push @{$res->[1]}, 'Content-Length', bytes::length($part);
- $res->[2]->[0] = $part;
- });
- $res;
+ $res->[3] = $idx;
+ msg_iter($mime, \&get_attach_i, $res);
+ pop @$res; # cleanup before letting PSGI server see it
+ $res
}
1;
require File::Spec;
*try_cat = \&PublicInbox::Inbox::try_cat;
+sub list_all_i {
+ my ($ibx, $arg) = @_;
+ my ($list, $hide_key) = @$arg;
+ push @$list, $ibx unless $ibx->{-hide}->{$hide_key};
+}
+
sub list_all ($$$) {
my ($self, $env, $hide_key) = @_;
- my @list;
- $self->{pi_config}->each_inbox(sub {
- my ($ibx) = @_;
- push @list, $ibx unless $ibx->{-hide}->{$hide_key};
- });
- \@list;
+ my $list = [];
+ $self->{pi_config}->each_inbox(\&list_all_i, [ $list, $hide_key ]);
+ $list;
+}
+
+sub list_match_domain_i {
+ my ($ibx, $arg) = @_;
+ my ($list, $hide_key, $re) = @$arg;
+ if (!$ibx->{-hide}->{$hide_key} && $ibx->{url} =~ $re) {
+ push @$list, $ibx;
+ }
}
sub list_match_domain ($$$) {
my ($self, $env, $hide_key) = @_;
- my @list;
+ my $list = [];
my $host = $env->{HTTP_HOST} // $env->{SERVER_NAME};
$host =~ s/:[0-9]+\z//;
- my $re = qr!\A(?:https?:)?//\Q$host\E(?::[0-9]+)?/!i;
- $self->{pi_config}->each_inbox(sub {
- my ($ibx) = @_;
- if (!$ibx->{-hide}->{$hide_key} && $ibx->{url} =~ $re) {
- push @list, $ibx;
- }
- });
- \@list;
+ my $arg = [ $list, $hide_key,
+ qr!\A(?:https?:)?//\Q$host\E(?::[0-9]+)?/!i ];
+ $self->{pi_config}->each_inbox(\&list_match_domain_i, $arg);
+ $list;
}
sub list_404 ($$) { [] }
--- /dev/null
+# Copyright (C) 2016-2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+package PublicInbox::WwwStatic;
+use strict;
+use Fcntl qw(:seek);
+
+sub prepare_range {
+ my ($env, $in, $h, $beg, $end, $size) = @_;
+ my $code = 200;
+ my $len = $size;
+ if ($beg eq '') {
+ if ($end ne '') { # "bytes=-$end" => last N bytes
+ $beg = $size - $end;
+ $beg = 0 if $beg < 0;
+ $end = $size - 1;
+ $code = 206;
+ } else {
+ $code = 416;
+ }
+ } else {
+ if ($beg > $size) {
+ $code = 416;
+ } elsif ($end eq '' || $end >= $size) {
+ $end = $size - 1;
+ $code = 206;
+ } elsif ($end < $size) {
+ $code = 206;
+ } else {
+ $code = 416;
+ }
+ }
+ if ($code == 206) {
+ $len = $end - $beg + 1;
+ if ($len <= 0) {
+ $code = 416;
+ } else {
+ sysseek($in, $beg, SEEK_SET) or return [ 500, [], [] ];
+ push @$h, qw(Accept-Ranges bytes Content-Range);
+ push @$h, "bytes $beg-$end/$size";
+
+ # FIXME: Plack::Middleware::Deflater bug?
+ $env->{'psgix.no-compress'} = 1;
+ }
+ }
+ ($code, $len);
+}
+
+sub response {
+ my ($env, $h, $path, $type) = @_;
+ return unless -f $path && -r _; # just in case it's a FIFO :P
+
+ # TODO: If-Modified-Since and Last-Modified?
+ open my $in, '<', $path or return;
+ my $size = -s $in;
+ my $len = $size;
+ my $code = 200;
+ push @$h, 'Content-Type', $type;
+ if (($env->{HTTP_RANGE} || '') =~ /\bbytes=([0-9]*)-([0-9]*)\z/) {
+ ($code, $len) = prepare_range($env, $in, $h, $1, $2, $size);
+ if ($code == 416) {
+ push @$h, 'Content-Range', "bytes */$size";
+ return [ 416, $h, [] ];
+ }
+ }
+ push @$h, 'Content-Length', $len;
+ my $body = bless {
+ initial_rd => 65536,
+ len => $len,
+ in => $in,
+ path => $path,
+ env => $env,
+ }, __PACKAGE__;
+ [ $code, $h, $body ];
+}
+
+# called by PSGI servers:
+sub getline {
+ my ($self) = @_;
+ my $len = $self->{len};
+ return if $len == 0;
+ my $n = delete($self->{initial_rd}) // 8192;
+ $n = $len if $len < $n;
+ my $r = sysread($self->{in}, my $buf, $n);
+ if (!defined $r) {
+ $self->{env}->{'psgi.errors'}->print(
+ "$self->{path} read error: $!\n");
+ } elsif ($r > 0) { # success!
+ $self->{len} = $len - $r;
+ return $buf;
+ } else {
+ $self->{env}->{'psgi.errors'}->print(
+ "$self->{path} EOF with $len bytes left\n");
+ }
+
+ # drop the client on error
+ if (my $io = $self->{env}->{'psgix.io'}) {
+ $io->close; # this is PublicInbox::DS::close
+ }
+ undef;
+}
+
+sub close {} # noop, just let everything go out-of-scope
+
+1;
# Follow git commit message conventions,
# first line is the Subject/title
my ($title) = ($txt =~ /\A([^\n]*)/s);
- _do_linkify($txt);
+ $ctx->{txt} = \$txt;
$ctx->{-title_html} = ascii_html($title);
-
my $nslash = ($key =~ tr!/!/!);
$ctx->{-upfx} = '../../../' . ('../' x $nslash);
-
- PublicInbox::WwwStream->response($ctx, $code, sub {
- my ($nr, undef) = @_;
- $nr == 1 ? '<pre>'.$txt.'</pre>' : undef
- });
+ PublicInbox::WwwStream->response($ctx, $code, \&_do_linkify);
}
sub _do_linkify {
+ my ($nr, $ctx) = @_;
+ return unless $nr == 1;
my $l = PublicInbox::Linkify->new;
- $l->linkify_1($_[0]);
+ my $txt = delete $ctx->{txt};
+ $l->linkify_1($$txt);
if ($hl) {
- $hl->do_hl_text(\($_[0]));
+ $hl->do_hl_text($txt);
} else {
- $_[0] = ascii_html($_[0]);
+ $$txt = ascii_html($$txt);
}
- $_[0] = $l->linkify_2($_[0]);
+ '<pre>' . $l->linkify_2($$txt) . '</pre>';
}
sub _srch_prefix ($$) {
is(${$gcf->cat_file($f)}, $$raw, 'not broken after failures');
is(${$gcf->cat_file($f)}, $$raw, 'not broken after partial read');
+
+ my $oid = $x[0];
+ my $arg = { 'foo' => 'bar' };
+ my $res = [];
+ my $missing = [];
+ $gcf->cat_async_begin;
+ $gcf->cat_async($oid, sub {
+ my ($bref, $oid_hex, $type, $size, $arg) = @_;
+ $res = [ @_ ];
+ }, $arg);
+ $gcf->cat_async('non-existent', sub {
+ my ($bref, $oid_hex, $type, $size, $arg) = @_;
+ $missing = [ @_ ];
+ }, $arg);
+ $gcf->cat_async_wait;
+ my ($bref, $oid_hex, $type, $size, $arg_res) = @$res;
+ is_deeply([$oid_hex, $type, $size], \@x, 'got expected header');
+ is($arg_res, $arg, 'arg passed to cat_async');
+ is_deeply($raw, $bref, 'blob result matches');
+ is_deeply($missing, [ undef, undef, undef, undef, $arg],
+ 'non-existent blob gives expected result');
}
if (1) {
my $x = PublicInbox::Qspawn->new([qw(true)]);
my $run = 0;
$x->start($limiter, sub {
- my ($rpipe) = @_;
- is(0, sysread($rpipe, my $buf, 1), 'read zero bytes');
- ok(!finish_err($x), 'no error on finish');
+ my ($self) = @_;
+ is(0, sysread($self->{rpipe}, my $buf, 1), 'read zero bytes');
+ ok(!finish_err($self), 'no error on finish');
$run = 1;
});
is($run, 1, 'callback ran alright');
my $x = PublicInbox::Qspawn->new([qw(false)]);
my $run = 0;
$x->start($limiter, sub {
- my ($rpipe) = @_;
- is(0, sysread($rpipe, my $buf, 1), 'read zero bytes from false');
- ok(finish_err($x), 'error on finish');
+ my ($self) = @_;
+ is(0, sysread($self->{rpipe}, my $buf, 1),
+ 'read zero bytes from false');
+ ok(finish_err($self), 'error on finish');
$run = 1;
});
is($run, 1, 'callback ran alright');
my $s = PublicInbox::Qspawn->new($cmd);
my @run;
$s->start($limiter, sub {
- my ($rpipe) = @_;
+ my ($self) = @_;
push @run, 'sleep';
- is(0, sysread($rpipe, my $buf, 1), 'read zero bytes');
+ is(0, sysread($self->{rpipe}, my $buf, 1), 'read zero bytes');
});
my $n = 0;
my @t = map {
my $i = $n++;
my $x = PublicInbox::Qspawn->new([qw(true)]);
$x->start($limiter, sub {
- my ($rpipe) = @_;
+ my ($self) = @_;
push @run, $i;
});
[$x, $i]
close $cfgfh or die;
my $cfg = PublicInbox::Config->new($cfgpath);
my $www = PublicInbox::WWW->new($cfg);
+ my $non_existent = 'ee5e32211bf62ab6531bdf39b84b6920d0b6775a';
my $client = sub {
my ($cb) = @_;
my $res = $cb->(GET("/$name/3435775/s/"));
$res = $cb->(GET("/$name/".('0'x40).'/s/'));
is($res->code, 404, 'failure with null OID');
+ $res = $cb->(GET("/$name/$non_existent/s/"));
+ is($res->code, 404, 'failure with null OID');
+
$res = $cb->(GET("/$name/$v1_0_0_tag/s/"));
is($res->code, 200, 'shows commit');
while (my ($label, $size) = each %bin) {