lib/PublicInbox/ExtMsg.pm
lib/PublicInbox/ExtSearch.pm
lib/PublicInbox/ExtSearchIdx.pm
+lib/PublicInbox/FakeImport.pm
lib/PublicInbox/FakeInotify.pm
lib/PublicInbox/Feed.pm
lib/PublicInbox/Filter/Base.pm
lib/PublicInbox/LeiForgetSearch.pm
lib/PublicInbox/LeiHelp.pm
lib/PublicInbox/LeiImport.pm
+lib/PublicInbox/LeiIndex.pm
lib/PublicInbox/LeiInit.pm
lib/PublicInbox/LeiInput.pm
lib/PublicInbox/LeiInspect.pm
t/lei-import-maildir.t
t/lei-import-nntp.t
t/lei-import.t
+t/lei-index.t
t/lei-lcat.t
t/lei-mirror.t
t/lei-p2q.t
--- /dev/null
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# pretend to do PublicInbox::Import::add for "lei index"
+package PublicInbox::FakeImport;
+use strict;
+use PublicInbox::ContentHash qw(git_sha);
+
+# Builds the stand-in importer object.  bytes_added starts at zero and
+# nothing in this package changes it.
+sub new {
+	my $self = { bytes_added => 0 };
+	bless $self, __PACKAGE__;
+}
+
+# Fills in $smsg from $eml without writing anything to git: the blob OID
+# is computed locally with git_sha() over the serialized message.
+# $check_cb is accepted but not used here.
+# Returns 1 when the caller should go on to index the message, or
+# nothing (bare return) when $smsg carried an -oidx whose vivify_xvmd()
+# returned false.
+sub add {
+	my ($self, $eml, $check_cb, $smsg) = @_;
+	$smsg->populate($eml);
+	my $raw = $eml->as_string;
+	$smsg->{blob} = git_sha(1, \$raw)->hexdigest;
+	$smsg->set_bytes($raw, length($raw));
+
+	# -oidx is always removed from $smsg, whether or not it was set
+	my $oidx = delete $smsg->{-oidx}; # used by LeiStore
+	return 1 unless $oidx;
+	return unless $oidx->vivify_xvmd($smsg);
+	1;
+}
+
+1;
$smsg->{blob} = $self->get_mark(":$blob");
$smsg->set_bytes($raw_email, $n);
if (my $oidx = delete $smsg->{-oidx}) { # used by LeiStore
- my @docids = $oidx->blob_exists($smsg->{blob});
- my @vivify_xvmd;
- for my $id (@docids) {
- if (my $cur = $oidx->get_art($id)) {
- # already imported if bytes > 0
- return if $cur->{bytes} > 0;
- push @vivify_xvmd, $id;
- } else {
- warn "W: $smsg->{blob} ",
- "#$id gone (bug?)\n";
- }
- }
- $smsg->{-vivify_xvmd} = \@vivify_xvmd;
+ my $eidx_git = delete $smsg->{-eidx_git};
+
+ # we need this sharedkv to dedupe blobs added in the
+ # same fast-import transaction
+ my $u = $self->{uniq_skv} //= do {
+ require PublicInbox::SharedKV;
+ my $x = PublicInbox::SharedKV->new;
+ $x->dbh;
+ $x;
+ };
+ return if !$u->set_maybe(pack('H*', $smsg->{blob}), 1);
+ return if (!$oidx->vivify_xvmd($smsg) &&
+ $eidx_git->check($smsg->{blob}));
}
}
my $ref = $self->{ref};
$lei->out($part->body);
}
+# Parses the raw message referenced by $bref and walks every MIME part
+# with cat_attach_i.  If $lei->{-attach_idx} is still defined once the
+# walk is over, the requested attachment is reported as missing from
+# $blob via $lei->fail.  The -attach_idx key is always removed from $lei.
+sub extract_attach ($$$) {
+	my ($lei, $blob, $bref) = @_;
+	PublicInbox::Eml->new($bref)->each_part(\&cat_attach_i, $lei, 1);
+	my $idx = delete $lei->{-attach_idx};
+	return $lei->fail(<<EOM) if defined($idx);
+E: attachment $idx not found in $blob
+EOM
+}
+
sub lei_blob {
my ($lei, $blob) = @_;
$lei->start_pager if -t $lei->{1};
}
my $rdr = {};
if ($opt->{mail}) {
- $rdr->{2} = $lei->{2};
+ open $rdr->{2}, '+>', undef or die "open: $!";
} else {
open $rdr->{2}, '>', '/dev/null' or die "open: $!";
}
if (defined $lei->{-attach_idx}) {
my $fh = popen_rd($cmd, $lei->{env}, $rdr);
require PublicInbox::Eml;
- my $str = do { local $/; <$fh> };
- if (close $fh) {
- my $eml = PublicInbox::Eml->new(\$str);
- $eml->each_part(\&cat_attach_i, $lei, 1);
- my $idx = delete $lei->{-attach_idx};
- defined($idx) and return $lei->fail(<<EOM);
-E: attachment $idx not found in $blob
-EOM
- }
+ my $buf = do { local $/; <$fh> };
+ return extract_attach($lei, $blob, \$buf) if close($fh);
} else {
$rdr->{1} = $lei->{1};
waitpid(spawn($cmd, $lei->{env}, $rdr), 0);
}
- return if $? == 0;
- return $lei->child_error($?) if $opt->{mail};
+ my $ce = $?;
+ return if $ce == 0;
+ my $sto = $lei->_lei_store;
+ my $lms = $sto ? $sto->search->lms : undef;
+ if (my $bref = $lms ? $lms->local_blob($blob, 1) : undef) {
+ defined($lei->{-attach_idx}) and
+ return extract_attach($lei, $blob, $bref);
+ return $lei->out($$bref);
+ } elsif ($opt->{mail}) {
+ my $eh = $rdr->{2};
+ seek($eh, 0, 0);
+ return $lei->child_error($ce, do { local $/; <$eh> });
+ } # else: fall through to solver below
}
# maybe it's a non-email (code) blob from a coderepo
warn "E: $f was not from a Maildir?\n";
}
}
- input_eml_cb($self, $eml, $vmd);
+ $self->input_eml_cb($eml, $vmd);
}
sub input_net_cb { # imap_each / nntp_each
my ($url, $uid, $kw, $eml, $self) = @_;
my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
$vmd->{sync_info} = [ $url, $uid ] if $self->{-mail_sync};
- input_eml_cb($self, $eml, $vmd);
+ $self->input_eml_cb($eml, $vmd);
}
-sub lei_import { # the main "lei import" method
- my ($lei, @inputs) = @_;
+sub do_import_index ($$@) {
+ my ($self, $lei, @inputs) = @_;
my $sto = $lei->_lei_store(1);
$sto->write_prepare($lei);
- my $self = bless {}, __PACKAGE__;
$self->{-import_kw} = $lei->{opt}->{kw} // 1;
my $vmd_mod = $self->vmd_mod_extract(\@inputs);
return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
$op_c->op_wait_event($ops);
}
+# Entry point for "lei import": creates a fresh object of this package
+# and hands it, with the lei object and the inputs, to do_import_index.
+sub lei_import { # the main "lei import" method
+	my ($lei, @inputs) = @_;
+	do_import_index(bless({}, __PACKAGE__), $lei, @inputs);
+}
+
sub _complete_import {
my ($lei, @argv) = @_;
my $sto = $lei->_lei_store or return;
--- /dev/null
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# front-end for the "lei index" sub-command, this is similar to
+# "lei import" but doesn't put a git blob into ~/.local/share/lei/store
+package PublicInbox::LeiIndex;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
+use PublicInbox::LeiImport;
+
+# /^input_/ subs are used by (or override) PublicInbox::LeiInput superclass
+
+# Per-message callback: looks up the external OIDs for $eml, merges any
+# $self->{all_vmd} entries into $vmd, and asks the store process to run
+# index_eml_only on the message.
+sub input_eml_cb { # used by input_maildir_cb and input_net_cb
+	my ($self, $eml, $vmd) = @_;
+	my $lei = $self->{lei};
+	my $xoids = $lei->{ale}->xoids_for($eml);
+	my $all_vmd = $self->{all_vmd};
+	@$vmd{keys %$all_vmd} = values %$all_vmd if $all_vmd;
+	$lei->{sto}->ipc_do('index_eml_only', $eml, $vmd, $xoids);
+}
+
+# Inputs read via a file handle are rejected: this reports a child
+# error (status 1<<8) that points the user at `lei import'.
+sub input_fh { # overrides PublicInbox::LeiInput::input_fh
+	my ($self, $ifmt, $fh, $input, @args) = @_;
+	my $lei = $self->{lei};
+	$lei->child_error(1<<8, <<EOM);
+$input ($ifmt) not yet supported, try `lei import'
+EOM
+}
+
+# Entry point for "lei index": forces the mail-sync option on and then
+# reuses the LeiImport driver with an object of this package.
+sub lei_index {
+	my ($lei, @argv) = @_;
+	my $self = bless {}, __PACKAGE__;
+	$lei->{opt}->{'mail-sync'} = 1;
+	PublicInbox::LeiImport::do_import_index($self, $lei, @argv);
+}
+
+no warnings 'once';
+no strict 'refs';
+
+# borrow the Maildir and network callbacks from LeiImport as-is; they
+# call back into input_eml_cb above through $self
+for my $name (qw(input_maildir_cb input_net_cb)) {
+	*{$name} = PublicInbox::LeiImport->can($name);
+}
+
+*_complete_import = \&PublicInbox::LeiImport::_complete_import;
+*ipc_atfork_child = \&PublicInbox::LeiInput::input_only_atfork_child;
+*net_merge_all_done = \&PublicInbox::LeiInput::input_only_net_merge_all_done;
+
+# the following works even when LeiAuth is lazy-loaded
+*net_merge_all = \&PublicInbox::LeiAuth::net_merge_all;
+1;
# Copyright (C) 2021 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-# parent class for LeiImport, LeiConvert
+# parent class for LeiImport, LeiConvert, LeiIndex
package PublicInbox::LeiInput;
use strict;
use v5.10.1;
my ($fh, $pid) = popen_rd($cmd, undef, $rdr);
grep(/\A--compressed\z/, @$curl) or
$fh = IO::Uncompress::Gunzip->new($fh, MultiStream => 1);
- eval {
- PublicInbox::MboxReader->mboxrd($fh,
- $self->can('input_mbox_cb'),
- $self, @args);
- };
+ eval { $self->input_fh('mboxrd', $fh, $url, @args) };
my $err = $@;
waitpid($pid, 0);
$? || $err and
require PublicInbox::NetReader;
$net //= PublicInbox::NetReader->new;
$net->add_url($input);
- if ($sync) {
- if ($input =~ m!\Aimaps?://!) {
- push @{$sync->{ok}}, $input;
- } else {
- push @{$sync->{no}}, $input;
- }
- }
- } elsif ($input_path =~ m!\Ahttps?://!i) {
+ push @{$sync->{ok}}, $input if $sync;
+ } elsif ($input_path =~ m!\Ahttps?://!i) { # mboxrd.gz
# TODO: how would we detect r/w JMAP?
push @{$sync->{no}}, $input if $sync;
prepare_http_input($self, $lei, $input_path) or return;
--in-format=$in_fmt and `$ifmt:' conflict
}
- if ($sync) {
- if ($ifmt =~ /\A(?:maildir|mh)\z/i) {
- push @{$sync->{ok}}, $input;
- } else {
- push @{$sync->{no}}, $input;
- }
+ if ($ifmt =~ /\A(?:maildir|mh)\z/i) {
+ push @{$sync->{ok}}, $input if $sync;
+ } else {
+ push @{$sync->{no}}, $input if $sync;
}
my $devfd = $lei->path_to_fd($input_path) // return;
if ($devfd >= 0 || (-f $input_path || -p _)) {
} else {
return $lei->fail("Unable to handle $input");
}
- } elsif ($input =~ /\.(eml|patch)\z/i && -f $input) {
+ } elsif ($input =~ /\.(?:eml|patch)\z/i && -f $input) {
lc($in_fmt//'eml') eq 'eml' or return $lei->fail(<<"");
$input is `eml', not --in-format=$in_fmt
use strict;
use v5.10.1;
use DBI;
+use PublicInbox::ContentHash qw(git_sha);
sub dbh_new {
my ($self, $rw) = @_;
map { $_->[0] } @{$dbh->selectall_arrayref($sql, undef, @pfx)};
}
+# Looks for a local Maildir file recorded for the blob $oidhex and
+# returns a scalar ref to its raw contents, or undef if none is usable.
+#   $oidhex - hex object ID of the wanted blob
+#   $vrfy   - if true, only return contents whose git_sha(1, ...) hex
+#             digest equals $oidhex; mismatches are warned about and
+#             skipped
+# Names ending in ":2,<letters>" are looked up under cur/, all other
+# names under new/.  Unreadable and empty files are skipped.
+sub local_blob {
+	my ($self, $oidhex, $vrfy) = @_;
+	my $dbh = $self->{dbh} //= dbh_new($self);
+	my $b2n = $dbh->prepare(<<'');
+SELECT f.loc,b.name FROM blob2name b
+LEFT JOIN folders f ON b.fid = f.fid
+WHERE b.oidbin = ?
+
+	$b2n->execute(pack('H*', $oidhex));
+	while (my ($d, $n) = $b2n->fetchrow_array) {
+		# the LEFT JOIN yields an undef loc when no folders row
+		# matches, and only a "maildir:" location can be turned
+		# into a directory path; never chop an unrelated prefix
+		next unless defined($d) && $d =~ s/\Amaildir://i;
+		my $f = "$d/" . ($n =~ /:2,[a-zA-Z]*\z/ ? "cur/$n" : "new/$n");
+		open my $fh, '<', $f or next;
+		next unless -s $fh; # nothing to return from an empty file
+		local $/;
+		my $raw = <$fh>;
+		if ($vrfy && git_sha(1, \$raw)->hexdigest ne $oidhex) {
+			warn "$f changed $oidhex\n";
+			next;
+		}
+		return \$raw;
+	}
+	undef;
+}
+
1;
sub add_eml {
my ($self, $eml, $vmd, $xoids) = @_;
- my $im = $self->importer; # may create new epoch
+ my $im = $self->{-fake_im} // $self->importer; # may create new epoch
my ($eidx, $tl) = eidx_init($self);
my $oidx = $eidx->{oidx}; # PublicInbox::Import::add checks this
my $smsg = bless { -oidx => $oidx }, 'PublicInbox::Smsg';
+ $smsg->{-eidx_git} = $eidx->git if !$self->{-fake_im};
my $im_mark = $im->add($eml, undef, $smsg);
if ($vmd && $vmd->{sync_info}) {
set_sync_info($self, $smsg->{blob}, @{$vmd->{sync_info}});
set_eml_vmd($self, $eml, $vmd);
}
+# Runs set_eml with a PublicInbox::FakeImport object installed as
+# $self->{-fake_im} for the duration of the call; `local' restores the
+# previous value on return.
+sub index_eml_only {
+	my ($self, $eml, $vmd, $xoids) = @_;
+	require PublicInbox::FakeImport;
+	my $fake_im = PublicInbox::FakeImport->new;
+	local $self->{-fake_im} = $fake_im;
+	set_eml($self, $eml, $vmd, $xoids);
+}
+
sub _external_only ($$$) {
my ($self, $xoids, $eml) = @_;
my $eidx = $self->{priv_eidx};
sub git_to_mail { # git->cat_async callback
my ($bref, $oid, $type, $size, $arg) = @_;
+ my ($write_cb, $smsg) = @$arg;
+ if ($type eq 'missing' && $smsg->{-lms_ro}) {
+ if ($bref = $smsg->{-lms_ro}->local_blob($oid, 1)) {
+ $type = 'blob';
+ $size = length($$bref);
+ }
+ }
return warn("W: $oid is $type (!= blob)\n") if $type ne 'blob';
return warn("E: $oid is empty\n") unless $size;
- my ($write_cb, $smsg) = @$arg;
die "BUG: expected=$smsg->{blob} got=$oid" if $smsg->{blob} ne $oid;
$write_cb->($bref, $smsg);
}
my ($self) = @_;
my $lei = $self->{lei};
$lei->_lei_atfork_child;
+ $self->{-lms_ro} = $lei->{lse}->lms if $lei->{lse};
$lei->{auth}->do_auth_atfork($self) if $lei->{auth};
$SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
$self->SUPER::ipc_atfork_child;
sub write_mail { # via ->wq_io_do
my ($self, $smsg, $eml) = @_;
return $self->{wcb}->(undef, $smsg, $eml) if $eml;
+ $smsg->{-lms_ro} = $self->{-lms_ro};
$self->{lei}->{ale}->git->cat_async($smsg->{blob}, \&git_to_mail,
[$self->{wcb}, $smsg]);
}
}
+# Collects the docids already recorded for $smsg->{blob} whose stored
+# article has no bytes yet and saves them in $smsg->{-vivify_xvmd}.
+# Returns that array ref (always true, even when it is empty because
+# the blob has no docids at all), or nothing (bare return) as soon as
+# one existing article already has bytes > 0.
+sub vivify_xvmd {
+	my ($self, $smsg) = @_;
+	my @vivify_xvmd;
+	for my $id ($self->blob_exists($smsg->{blob})) {
+		my $cur = $self->get_art($id);
+		unless ($cur) {
+			warn "W: $smsg->{blob} #$id gone (bug?)\n";
+			next;
+		}
+		# already indexed if bytes > 0
+		return if $cur->{bytes} > 0;
+		push @vivify_xvmd, $id;
+	}
+	$smsg->{-vivify_xvmd} = \@vivify_xvmd;
+}
+
+
1;
--- /dev/null
+#!perl -w
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict; use v5.10.1; use PublicInbox::TestCommon;
+use File::Spec;
+require_mods(qw(lei -nntpd));
+my ($ro_home, $cfg_path) = setup_public_inboxes;
+my ($tmpdir, $for_destroy) = tmpdir;
+my $env = { PI_CONFIG => $cfg_path };
+
+# start -nntpd on its own listen socket, logging under $tmpdir
+my $sock = tcp_server;
+my $cmd = [ '-nntpd', '-W0', "--stdout=$tmpdir/n1", "--stderr=$tmpdir/n2" ];
+my $nntpd = start_script($cmd, $env, { 3 => $sock }) or BAIL_OUT("-nntpd $?");
+my $nntp_host_port = tcp_host_port($sock);
+
+# likewise for -imapd
+$sock = tcp_server;
+$cmd = [ '-imapd', '-W0', "--stdout=$tmpdir/i1", "--stderr=$tmpdir/i2" ];
+my $imapd = start_script($cmd, $env, { 3 => $sock }) or BAIL_OUT("-imapd $?");
+my $imap_host_port = tcp_host_port($sock);
+undef $sock;
+
+# build a Maildir ($tmpdir/md with cur/ and new/) holding one message
+for ('', qw(cur new)) {
+	mkdir "$tmpdir/md/$_" or xbail "mkdir: $!";
+}
+# bail out right away if the fixture cannot be created, rather than
+# letting "lei index" run against an empty Maildir
+symlink(File::Spec->rel2abs('t/plack-qp.eml'), "$tmpdir/md/cur/x:2,") or
+	xbail "symlink: $!";
+my $expect = do {
+	open my $fh, '<', 't/plack-qp.eml' or xbail $!;
+	local $/;
+	<$fh>;
+};
+test_lei({ tmpdir => $tmpdir }, sub {
+	my $store_path = "$ENV{HOME}/.local/share/lei/store/";
+
+	# index-only: the message must be searchable and retrievable
+	# even though no git blob was stored for it
+	lei_ok('index', "$tmpdir/md");
+	lei_ok(qw(q mid:qp@example.com));
+	my $res_a = json_utf8->decode($lei_out);
+	my $blob = $res_a->[0]->{'blob'};
+	like($blob, qr/\A[0-9a-f]{40,}\z/, 'got blob from qp@example');
+	lei_ok('blob', $blob);
+	is($lei_out, $expect, 'got expected blob via Maildir');
+	lei_ok(qw(q mid:qp@example.com -f text));
+	like($lei_out, qr/^hi = bye/sm, 'lei2mail fallback');
+
+	# a real import afterwards adds the git object but no new
+	# search results
+	my $all_obj = ['git', "--git-dir=$store_path/ALL.git",
+			qw(cat-file --batch-check --batch-all-objects)];
+	is_deeply([xqx($all_obj)], [], 'no git objects');
+	lei_ok('import', 't/plack-qp.eml');
+	ok(grep(/\A$blob blob /, my @objs = xqx($all_obj)),
+		'imported blob');
+	lei_ok(qw(q z:0.. --dedupe=none));
+	my $res_b = json_utf8->decode($lei_out);
+	is_deeply($res_b, $res_a, 'no extra DB entries');
+
+	# indexing from the network must not create git objects either
+	lei_ok('index', "nntp://$nntp_host_port/t.v2");
+	lei_ok('index', "imap://$imap_host_port/t.v2.0");
+	is_deeply([xqx($all_obj)], \@objs, 'no new objects from NNTP+IMAP');
+});
+
+done_testing;