lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiDedupe.pm
lib/PublicInbox/LeiExternal.pm
+lib/PublicInbox/LeiImport.pm
lib/PublicInbox/LeiOverview.pm
lib/PublicInbox/LeiQuery.pm
lib/PublicInbox/LeiSearch.pm
# starts a worker if Sereal or Storable is installed
sub ipc_worker_spawn {
- my ($self, $ident, $oldset) = @_;
+ my ($self, $ident, $oldset, $fields) = @_;
return unless $enc; # no Sereal or Storable
return if ($self->{-ipc_ppid} // -1) == $$; # idempotent
delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
# ensure we properly exit even if warn() dies:
my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
eval {
+ $fields //= {};
+ local @$self{keys %$fields} = values(%$fields);
my $on_destroy = $self->ipc_atfork_child;
local %SIG = %SIG;
ipc_worker_loop($self, $r_req, $w_res);
'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch',
qw(prune) ],
-'import' => [ 'URL_OR_PATHNAME|--stdin',
- 'one-shot import/update from URL or filesystem',
- qw(stdin| offset=i recursive|r exclude=s include=s !flags),
+'import' => [ 'URLS_OR_PATHNAMES...|--stdin',
+ 'one-time import/update from URL or filesystem',
+ qw(stdin| offset=i recursive|r exclude=s include|I=s
+ format|f=s flags!),
],
'config' => [ '[...]', sub {
# $spec => [@ALLOWED_VALUES (default is first), $description],
# $spec => $description
# "$SUB_COMMAND TAB $spec" => as above
-my $stdin_formats = [ 'IN|auto|raw|mboxrd|mboxcl2|mboxcl|mboxo',
- 'specify message input format' ];
+my $stdin_formats = [ 'MAIL_FORMAT|eml|mboxrd|mboxcl2|mboxcl|mboxo',
+ 'specify message input format' ];
my $ls_format = [ 'OUT|plain|json|null', 'listing output format' ];
my %OPTDESC = (
'q jobs=s' => [ '[SEARCH_JOBS][,WRITER_JOBS]',
'control number of search and writer jobs' ],
+'import format|f=s' => $stdin_formats,
+
'ls-query format|f=s' => $ls_format,
'ls-external format|f=s' => $ls_format,
sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) }
+# Shared failure path (used as the '!' PktOp callback): kill and reap any
+# worker queues still attached to $lei, then exit through x_it.
+#   $lei  - the lei object
+#   $code - optional wait status in $? form; defaults to exit code 1
+#   $io   - optional handle to close before exiting
+sub fail_handler ($;$$) {
+	my ($lei, $code, $io) = @_;
+	for my $f (qw(imp lxs l2m)) {
+		my $wq = delete $lei->{$f} or next;
+		$wq->wq_wait_old($lei) if $wq->wq_kill_old; # lei-daemon
+	}
+	close($io) if $io; # needed to avoid warnings on SIGPIPE
+	# exit code 1 in $? form is (1 << 8); (1 >> 8) is 0 and would
+	# make an unspecified failure look like success
+	$lei->x_it($code // (1 << 8));
+}
+
+# handles SIGPIPE from l2m/lxs workers: detach slot 1 from $lei and
+# hand it to fail_handler together with status 13
+sub sigpipe_handler {
+	my ($lei) = @_;
+	my $out = delete $lei->{1};
+	fail_handler($lei, 13, $out);
+}
+
sub fail ($$;$) {
my ($self, $buf, $exit_code) = @_;
err($self, $buf) if defined $buf;
sub puts ($;@) { out(shift, map { "$_\n" } @_) }
sub child_error { # passes non-fatal curl exit codes to user
- my ($self, $child_error) = @_; # child_error is $?
+ my ($self, $child_error, $msg) = @_; # child_error is $?
+ $self->err($msg) if $msg;
if (my $s = $self->{pkt_op_p} // $self->{sock}) {
# send to the parent lei-daemon or to lei(1) client
send($s, "child_error $child_error", MSG_EOR);
}
sub lei_atfork_child {
- my ($self) = @_;
+ my ($self, $persist) = @_;
# we need to explicitly close things which are on stack
- delete $self->{0};
+ if ($persist) {
+ my @io = delete @$self{0,1,2};
+ unless ($self->{oneshot}) {
+ close($_) for @io;
+ }
+ } else {
+ delete $self->{0};
+ }
for (delete @$self{qw(3 sock old_1 au_done)}) {
close($_) if defined($_);
}
%PATH2CFG = ();
undef $errors_log;
$quit = \&CORE::exit;
- $current_lei = $self; # for SIG{__WARN__}
+ $current_lei = $persist ? undef : $self; # for SIG{__WARN__}
}
sub _help ($;$) {
x_it($self, $?) if $?;
}
+# "lei import" entry point: loads PublicInbox::LeiImport on demand and
+# forwards all arguments to its call() class method
+sub lei_import {
+	my @args = @_;
+	require PublicInbox::LeiImport;
+	PublicInbox::LeiImport->call(@args);
+}
+
sub lei_init {
my ($self, $dir) = @_;
my $cfg = _lei_cfg($self, 1);
--- /dev/null
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# front-end for the "lei import" sub-command
+package PublicInbox::LeiImport;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+use PublicInbox::MboxReader;
+use PublicInbox::Eml;
+
+# MboxReader callback: forwards one message to the store via 'set_eml',
+# appending whatever $sto->mbox_keywords returns when $set_kw is true
+sub _import_eml {
+	my ($eml, $sto, $set_kw) = @_;
+	my @kw = $set_kw ? $sto->mbox_keywords($eml) : ();
+	$sto->ipc_do('set_eml', $eml, @kw);
+}
+
+sub import_done { # EOF callback for main daemon
+ # registered under the '' key of the PktOp table built in call()
+ my ($lei) = @_;
+ # detach the import worker queue (if still present) and wait on it
+ my $imp = delete $lei->{imp};
+ $imp->wq_wait_old($lei) if $imp;
+ # NOTE(review): $wait is never read; assigning it calls ipc_do in
+ # scalar rather than void context, which presumably makes the call
+ # wait for the store to finish -- confirm against PublicInbox::IPC
+ # before removing the assignment
+ my $wait = $lei->{sto}->ipc_do('done');
+ $lei->dclose;
+}
+
+sub call { # the main "lei import" method
+ # $cls: this class, $lei: the lei object, @argv: paths (or URLs) to import
+ my ($cls, $lei, @argv) = @_;
+ # the store worker is prepared before any option validation below
+ my $sto = $lei->_lei_store(1);
+ $sto->write_prepare($lei);
+ # flags default to on unless the caller already set the option
+ $lei->{opt}->{flags} //= 1;
+ my $fmt = $lei->{opt}->{'format'};
+ my $self = $lei->{imp} = bless {}, $cls;
+ # there is no format auto-detection here, so --format is mandatory
+ return $lei->fail('--format unspecified') if !$fmt;
+ # only hold on to slot 0 of $lei when --stdin was requested
+ $self->{0} = $lei->{0} if $lei->{opt}->{stdin};
+ # op name => [ callback, first argument ]; '' is the EOF callback
+ my $ops = {
+ '!' => [ $lei->can('fail_handler'), $lei ],
+ 'x_it' => [ $lei->can('x_it'), $lei ],
+ 'child_error' => [ $lei->can('child_error'), $lei ],
+ '' => [ \&import_done, $lei ],
+ };
+ # NOTE(review): PublicInbox::PktOp is not among this file's "use"
+ # lines; presumably another module has loaded it already -- confirm
+ ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+ # default to one job per argument (at least 1), capped at detect_nproc
+ my $j = $lei->{opt}->{jobs} // scalar(@argv) || 1;
+ my $nproc = $self->detect_nproc;
+ $j = $nproc if $j > $nproc;
+ $self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei});
+ # workers are started; drop both PktOp ends from $lei, keeping
+ # only the pkt_op_c object in $op for the oneshot loop below
+ my $op = delete $lei->{pkt_op_c};
+ delete $lei->{pkt_op_p};
+ # queue stdin first (if requested), then one job per argument
+ $self->wq_do('import_stdin', []) if $self->{0};
+ for my $x (@argv) {
+ $self->wq_do('import_path_url', [], $x);
+ }
+ $self->wq_close(1);
+ $lei->event_step_init; # wait for shutdowns
+ # oneshot mode: step $op by hand until its sock is gone
+ if ($lei->{oneshot}) {
+ while ($op->{sock}) { $op->event_step }
+ }
+}
+
+# post-fork hook for import workers: run the lei object's own
+# lei_atfork_child first, then defer to the parent class
+sub ipc_atfork_child {
+	my ($self) = @_;
+	my $lei = $self->{lei};
+	$lei->lei_atfork_child;
+	return $self->SUPER::ipc_atfork_child;
+}
+
+# Import every message readable from $fh into $lei->{sto}.
+#   $lei - the lei object; reads {opt}->{flags} and {opt}->{format}
+#   $fh  - open read handle
+#   $x   - name of the input, used only in error messages
+# Failures are reported through $lei->child_error with exit code 1 in
+# $? form (1 << 8); exceptions from the readers are caught and
+# reported the same way.
+sub _import_fh {
+	my ($lei, $fh, $x) = @_;
+	my $set_kw = $lei->{opt}->{flags};
+	my $fmt = $lei->{opt}->{'format'};
+	eval { # n.b. "return" below only leaves this eval block
+		if ($fmt eq 'eml') {
+			# a single message: slurp the whole handle
+			my $buf = do { local $/; <$fh> } //
+				return $lei->child_error(1 << 8, <<"");
+ error reading $x: $!
+
+			my $eml = PublicInbox::Eml->new(\$buf);
+			_import_eml($eml, $lei->{sto}, $set_kw);
+		} else { # some mbox
+			# mbox readers are looked up by format name
+			my $cb = PublicInbox::MboxReader->can($fmt);
+			$cb or return $lei->child_error(1 << 8, <<"");
+ --format $fmt unsupported for $x
+
+			$cb->(undef, $fh, \&_import_eml, $lei->{sto}, $set_kw);
+		}
+	};
+	# name the actual input rather than always blaming <stdin>
+	$lei->child_error(1 << 8, "$x: $@") if $@;
+}
+
+# Worker job: import a single command-line argument $x.
+# Only regular files are handled so far; anything else fails with a
+# TODO message.  An open failure is reported through child_error with
+# exit code 1 in $? form (1 << 8).
+sub import_path_url {
+	my ($self, $x) = @_;
+	my $lei = $self->{lei};
+	# TODO auto-detect?
+	if (-f $x) {
+		open my $fh, '<', $x or return $lei->child_error(1 << 8, <<"");
+unable to open $x: $!
+
+		_import_fh($lei, $fh, $x);
+	} else {
+		$lei->fail("$x unsupported (TODO)");
+	}
+}
+
+# Worker job: import from the handle stored in slot 0 of $self,
+# labelled '<stdin>' in error messages
+sub import_stdin {
+	my ($self) = @_;
+	my ($lei, $in) = @$self{qw(lei 0)};
+	_import_fh($lei, $in, '<stdin>');
+}
+
+1;
use PublicInbox::ContentHash qw(content_hash content_digest);
use PublicInbox::MID qw(mids mids_in);
use PublicInbox::LeiSearch;
+use PublicInbox::MDA;
use List::Util qw(max);
sub new {
die $err if $err;
}
+# post-fork hook for the store worker: detach the lei object (when one
+# was supplied) and run its lei_atfork_child with a true argument,
+# then defer to the parent class
+sub ipc_atfork_child {
+	my ($self) = @_;
+	if (my $lei = delete $self->{lei}) {
+		$lei->lei_atfork_child(1);
+	}
+	$self->SUPER::ipc_atfork_child;
+}
+
+sub write_prepare {
+ # spawns the 'lei_store' worker for $lei and records this store
+ # object as $lei->{sto}
+ my ($self, $lei) = @_;
+ $self->ipc_lock_init;
+ # Mail imported into lei is private, so the headers -mda filters
+ # out of public mail should not be filtered here.  "local" keeps
+ # the list empty only until this sub returns, which covers the
+ # ipc_worker_spawn call below.
+ local @PublicInbox::MDA::BAD_HEADERS = ();
+ $self->ipc_worker_spawn('lei_store', $lei->oldset, { lei => $lei });
+ $lei->{sto} = $self;
+}
+
1;
pkt_do($lei->{pkt_op_p}, '.') == 1 or die "do_post_augment trigger: $!"
}
-sub fail_handler ($;$$) {
- my ($lei, $code, $io) = @_;
- for my $f (qw(lxs l2m)) {
- my $wq = delete $lei->{$f} or next;
- $wq->wq_wait_old($lei) if $wq->wq_kill_old; # lei-daemon
- }
- close($io) if $io; # needed to avoid warnings on SIGPIPE
- $lei->x_it($code // (1 >> 8));
-}
-
-sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
- fail_handler($_[0], 13, delete $_[0]->{1});
-}
-
sub do_query {
my ($self, $lei) = @_;
my $ops = {
- '|' => [ \&sigpipe_handler, $lei ],
- '!' => [ \&fail_handler, $lei ],
+ '|' => [ $lei->can('sigpipe_handler'), $lei ],
+ '!' => [ $lei->can('fail_handler'), $lei ],
'.' => [ \&do_post_augment, $lei ],
'' => [ \&query_done, $lei ],
'mset_progress' => [ \&mset_progress, $lei ],
}; # /SKIP
};
+# exercises "lei import -f eml" from stdin and from a path
+my $test_import = sub {
+ $cleanup->();
+ # the search must succeed but match nothing before the import
+ ok($lei->(qw(q s:boolean)), 'search miss before import');
+ unlike($out, qr/boolean/i, 'no results, yet');
+ open my $fh, '<', 't/data/0001.patch' or BAIL_OUT $!;
+ # feed the patch file to the command as fd 0
+ ok($lei->([qw(import -f eml -)], undef, { %$opt, 0 => $fh }),
+ 'import single file from stdin');
+ close $fh;
+ # NOTE(review): only the command's success is checked here, $out
+ # is not matched against /boolean/ after the import
+ ok($lei->(qw(q s:boolean)), 'search hit after import');
+ ok($lei->(qw(import -f eml), 't/data/message_embed.eml'),
+ 'import single file by path');
+ $cleanup->();
+};
+
my $test_lei_common = sub {
$test_help->();
$test_config->();
$test_external->();
$test_completion->();
$test_fail->();
+ $test_import->();
};
if ($ENV{TEST_LEI_ONESHOT}) {