lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiDedupe.pm
lib/PublicInbox/LeiExternal.pm
+lib/PublicInbox/LeiOverview.pm
lib/PublicInbox/LeiQuery.pm
lib/PublicInbox/LeiSearch.pm
lib/PublicInbox/LeiStore.pm
--- /dev/null
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# per-mitem/smsg iterators for search results
+# "ovv" => "Overview viewer"
+package PublicInbox::LeiOverview;
+use strict;
+use v5.10.1;
+use POSIX qw(strftime);
+use File::Spec;
+use PublicInbox::MID qw($MID_EXTRACT);
+use PublicInbox::Address qw(pairs);
+use PublicInbox::Config;
+use PublicInbox::Search qw(get_pct);
+
+# cf. https://en.wikipedia.org/wiki/JSON_streaming
+my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing
+
+sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
+
+sub new {
+ my ($class, $lei) = @_;
+ my $opt = $lei->{opt};
+ my $out = $opt->{output} // '-';
+ $out = '/dev/stdout' if $out eq '-';
+
+ my $fmt = $opt->{'format'};
+ $fmt = lc($fmt) if defined $fmt;
+ if ($out =~ s/\A([a-z]+)://is) { # e.g. Maildir:/home/user/Mail/
+ my $ofmt = lc $1;
+ $fmt //= $ofmt;
+ return $lei->fail(<<"") if $fmt ne $ofmt;
+--format=$fmt and --output=$ofmt conflict
+
+ }
+ $fmt //= 'json' if $out eq '/dev/stdout';
+ $fmt //= 'maildir'; # TODO
+
+ if (index($out, '://') < 0) { # not a URL, so assume path
+ $out = File::Spec->canonpath($out);
+ } # else URL
+
+ my $self = bless { fmt => $fmt, out => $out }, $class;
+ my $json;
+ if ($fmt =~ /\A($JSONL|(?:concat)?json)\z/) {
+ $json = $self->{json} = ref(PublicInbox::Config->json);
+ }
+ my ($isatty, $seekable);
+ if ($out eq '/dev/stdout') {
+ $isatty = -t $lei->{1};
+ $lei->start_pager if $isatty;
+ $opt->{pretty} //= $isatty;
+ } elsif ($json) {
+ return $lei->fail('JSON formats only output to stdout');
+ }
+ $self;
+}
+
+# called once by parent
+sub ovv_begin {
+ my ($self, $lei) = @_;
+ if ($self->{fmt} eq 'json') {
+ print { $lei->{1} } '[';
+ } # TODO HTML/Atom/...
+}
+
+# called once by parent (via PublicInbox::EOFpipe)
+sub ovv_end {
+ my ($self, $lei) = @_;
+ if ($self->{fmt} eq 'json') {
+ # JSON doesn't allow trailing commas, and preventing
+ # trailing commas is a PITA when parallelizing outputs
+ print { $lei->{1} } "null]\n";
+ } elsif ($self->{fmt} eq 'concatjson') {
+ print { $lei->{1} } "\n";
+ }
+}
+
+sub ovv_atfork_child {
+ my ($self) = @_;
+ # reopen dedupe here
+}
+
+# prepares an smsg for JSON
+sub _unbless_smsg {
+ my ($smsg, $mitem) = @_;
+
+ delete @$smsg{qw(lines bytes num tid)};
+ $smsg->{rcvd} = _iso8601(delete $smsg->{ts}); # JMAP receivedAt
+ $smsg->{dt} = _iso8601(delete $smsg->{ds}); # JMAP UTCDate
+ $smsg->{relevance} = get_pct($mitem) if $mitem;
+
+ if (my $r = delete $smsg->{references}) {
+ $smsg->{references} = [
+ map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ];
+ }
+ if (my $m = delete($smsg->{mid})) {
+ $smsg->{'m'} = "<$m>";
+ }
+ for my $f (qw(from to cc)) {
+ my $v = delete $smsg->{$f} or next;
+ $smsg->{substr($f, 0, 1)} = pairs($v);
+ }
+ $smsg->{'s'} = delete $smsg->{subject};
+ # can we be bothered to parse From/To/Cc into arrays?
+ scalar { %$smsg }; # unbless
+}
+
+sub ovv_atexit_child {
+ my ($self, $lei) = @_;
+ my $bref = delete $lei->{ovv_buf} or return;
+ print { $lei->{1} } $$bref;
+}
+
+# JSON module ->pretty output wastes too much vertical white space,
+# this (IMHO) provides better use of screen real-estate while not
+# being excessively compact:
+sub _json_pretty {
+ my ($json, $k, $v) = @_;
+ if (ref $v eq 'ARRAY') {
+ if (@$v) {
+ my $sep = ",\n" . (' ' x (length($k) + 7));
+ if (ref($v->[0])) { # f/t/c
+ $v = '[' . join($sep, map {
+ my $pair = $json->encode($_);
+ $pair =~ s/(null|"),"/$1, "/g;
+ $pair;
+ } @$v) . ']';
+ } else { # references
+ $v = '[' . join($sep, map {
+ substr($json->encode([$_]), 1, -1);
+ } @$v) . ']';
+ }
+ } else {
+ $v = '[]';
+ }
+ }
+ qq{ "$k": }.$v;
+}
+
+sub ovv_each_smsg_cb {
+ my ($self, $lei) = @_;
+ $lei->{ovv_buf} = \(my $buf = '');
+ my $json = $self->{json}->new;
+ if ($json) {
+ $json->utf8->canonical;
+ $json->ascii(1) if $lei->{opt}->{ascii};
+ }
+ if ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
+ my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
+ sub { # DIY prettiness :P
+ my ($smsg, $mitem) = @_;
+ $smsg = _unbless_smsg($smsg, $mitem);
+ $buf .= "{\n";
+ $buf .= join(",\n", map {
+ my $v = $smsg->{$_};
+ if (ref($v)) {
+ _json_pretty($json, $_, $v);
+ } else {
+ $v = $json->encode([$v]);
+ qq{ "$_": }.substr($v, 1, -1);
+ }
+ } sort keys %$smsg);
+ $buf .= $EOR;
+ if (length($buf) > 65536) {
+ print { $lei->{1} } $buf;
+ $buf = '';
+ }
+ }
+ } elsif ($json) {
+ my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL
+ sub {
+ my ($smsg, $mitem) = @_;
+ delete @$smsg{qw(tid num)};
+ $buf .= $json->encode(_unbless_smsg(@_)) . $ORS;
+ if (length($buf) > 65536) {
+ print { $lei->{1} } $buf;
+ $buf = '';
+ }
+ }
+ } elsif ($self->{fmt} eq 'oid') {
+ sub {
+ my ($smsg, $mitem) = @_;
+ }
+ } # else { ...
+}
+
+1;
package PublicInbox::LeiQuery;
use strict;
use v5.10.1;
-use PublicInbox::MID qw($MID_EXTRACT);
-use POSIX qw(strftime);
-use PublicInbox::Address qw(pairs);
use PublicInbox::DS qw(dwaitpid);
-sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
-
-# prepares an smsg for JSON
-sub _smsg_unbless ($) {
- my ($smsg) = @_;
-
- delete @$smsg{qw(lines bytes)};
- $smsg->{rcvd} = _iso8601(delete $smsg->{ts}); # JMAP receivedAt
- $smsg->{dt} = _iso8601(delete $smsg->{ds}); # JMAP UTCDate
-
- if (my $r = delete $smsg->{references}) {
- $smsg->{references} = [
- map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ];
- }
- if (my $m = delete($smsg->{mid})) {
- $smsg->{'m'} = "<$m>";
- }
- # XXX breaking to/cc, into structured arrays or tables which
- # distinguish "$phrase <$address>" causes pretty printing JSON
- # to take up too much vertical space. I can't get either
- # Cpanel::JSON::XS or JSON::XS or jq(1) only indent when
- # wrapping is necessary, rather than blindly indenting and
- # adding vertical space everywhere.
- for my $f (qw(from to cc)) {
- my $v = delete $smsg->{$f} or next;
- $smsg->{substr($f, 0, 1)} = $v;
- }
- $smsg->{'s'} = delete $smsg->{subject};
- # can we be bothered to parse From/To/Cc into arrays?
- scalar { %$smsg }; # unbless
-}
-
sub _vivify_external { # _externals_each callback
my ($src, $dir) = @_;
if (-f "$dir/ei.lock") {
# src: LeiXSearch || LeiSearch || Inbox
my @srcs;
require PublicInbox::LeiXSearch;
+ require PublicInbox::LeiOverview;
my $lxs = PublicInbox::LeiXSearch->new;
# --external is enabled by default, but allow --no-external
// $lxs->wq_workers($j);
}
unshift(@srcs, $sto->search) if $opt->{'local'};
- my $out = $opt->{output} // '-';
- $out = 'json:/dev/stdout' if $out eq '-';
- my $isatty = -t $self->{1};
# no forking workers after this
- $self->start_pager if $isatty;
- my $json = substr($out, 0, 5) eq 'json:' ?
- ref(PublicInbox::Config->json)->new : undef;
- if ($json) {
- if ($opt->{pretty} //= $isatty) {
- $json->pretty(1)->space_before(0);
- $json->indent_length($opt->{indent} // 2);
- }
- $json->utf8; # avoid Wide character in print warnings
- $json->ascii(1) if $opt->{ascii}; # for "\uXXXX"
- $json->canonical;
- }
-
+ require PublicInbox::LeiOverview;
+ $self->{ovv} = PublicInbox::LeiOverview->new($self);
my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
$mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
$mset_opt{qstr} = join(' ', map {;
$mset_opt{relevance} //= -2 if $opt->{thread};
# my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
$self->{mset_opt} = \%mset_opt;
- $lxs->do_query($self, \@srcs);
+ $self->{ovv}->ovv_begin($self);
+ pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
+ require PublicInbox::EOFpipe;
+ my $eof = PublicInbox::EOFpipe->new($eof_wait, \&query_done, $self);
+ $lxs->do_query($self, $qry_done, \@srcs);
+ $eof->event_step unless $self->{sock};
+}
+
+sub query_done { # PublicInbox::EOFpipe callback
+ my ($self) = @_;
+ $self->{ovv}->ovv_end($self);
}
1;
use strict;
use v5.10.1;
use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
-use PublicInbox::Search qw(get_pct);
use Sys::Syslog qw(syslog);
sub new {
}
my $mo = { %{$lei->{mset_opt}} };
my $mset;
+ my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
do {
$mset = $srch->mset($mo->{qstr}, $mo);
my $ids = $srch->mset_to_artnums($mset, $mo);
my $ctx = { ids => $ids };
my $i = 0;
- my %n2p = map { ($ids->[$i++], get_pct($_)) } $mset->items;
+ my %n2item = map { ($ids->[$i++], $_) } $mset->items;
while ($over->expand_thread($ctx)) {
for my $n (@{$ctx->{xids}}) {
my $smsg = $over->get_art($n) or next;
# next if $dd->is_smsg_dup($smsg); TODO
- if (my $p = delete $n2p{$smsg->{num}}) {
- $smsg->{relevance} = $p;
- }
- print { $self->{1} } Dumper($smsg);
+ my $mitem = delete $n2item{$smsg->{num}};
+ $each_smsg->($smsg, $mitem);
# $self->out($buf .= $ORS);
# $emit_cb->($smsg);
}
@{$ctx->{xids}} = ();
}
} while (_mset_more($mset, $mo));
+ $lei->{ovv}->ovv_atexit_child($lei);
}
sub query_mset { # non-parallel for non-"--thread" users
my $mset;
local %SIG = (%SIG, $lei->atfork_child_wq($self));
$self->attach_external($_) for @$srcs;
+ my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
do {
$mset = $self->mset($mo->{qstr}, $mo);
for my $it ($mset->items) {
my $smsg = smsg_for($self, $it) or next;
# next if $dd->is_smsg_dup($smsg);
- $smsg->{relevance} = get_pct($it);
- use Data::Dumper;
- print { $self->{1} } Dumper($smsg);
+ $each_smsg->($smsg, $it);
# $self->out($buf .= $ORS) if defined $buf;
#$emit_cb->($smsg);
}
} while (_mset_more($mset, $mo));
+ $lei->{ovv}->ovv_atexit_child($lei);
}
sub do_query {
- my ($self, $lei_orig, $srcs) = @_;
+ my ($self, $lei_orig, $qry_done, $srcs) = @_;
my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+ $io[0] = $qry_done; # don't need stdin
$io[1]->autoflush(1);
$io[2]->autoflush(1);
if ($lei->{opt}->{thread}) {
for my $rmt (@{$self->{remotes} // []}) {
$self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
}
+
+ # sent off to children, they will drop remaining references to it
+ close $qry_done;
}
sub ipc_atfork_child {
sub ipc_atfork_prepare {
my ($self) = @_;
- $self->wq_set_recv_modes(qw[<&= >&= >&= +<&=]);
+ $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]);
$self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
}