]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei import: initial implementation
authorEric Wong <e@80x24.org>
Thu, 4 Feb 2021 09:59:30 +0000 (00:59 -0900)
committerEric Wong <e@80x24.org>
Fri, 5 Feb 2021 00:16:35 +0000 (00:16 +0000)
Only tested with .eml files so far, but Maildir + IMAP
will be supported.

MANIFEST
lib/PublicInbox/IPC.pm
lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiImport.pm [new file with mode: 0644]
lib/PublicInbox/LeiStore.pm
lib/PublicInbox/LeiXSearch.pm
t/lei.t

index 6922f9b1c8460be5360065f342d59ebe0394de30..a11d410699d7fcd0a5613de7917fa00e46a43fdf 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -179,6 +179,7 @@ lib/PublicInbox/KQNotify.pm
 lib/PublicInbox/LEI.pm
 lib/PublicInbox/LeiDedupe.pm
 lib/PublicInbox/LeiExternal.pm
+lib/PublicInbox/LeiImport.pm
 lib/PublicInbox/LeiOverview.pm
 lib/PublicInbox/LeiQuery.pm
 lib/PublicInbox/LeiSearch.pm
index 7f5a3f6fc4a6c6d7552438a3f7d426613384f94a..a0e6bfee238466ce83839c4e445c71b1219e55ea 100644 (file)
@@ -101,7 +101,7 @@ sub ipc_worker_loop ($$$) {
 
 # starts a worker if Sereal or Storable is installed
 sub ipc_worker_spawn {
-       my ($self, $ident, $oldset) = @_;
+       my ($self, $ident, $oldset, $fields) = @_;
        return unless $enc; # no Sereal or Storable
        return if ($self->{-ipc_ppid} // -1) == $$; # idempotent
        delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
@@ -123,6 +123,8 @@ sub ipc_worker_spawn {
                # ensure we properly exit even if warn() dies:
                my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
                eval {
+                       $fields //= {};
+                       local @$self{keys %$fields} = values(%$fields);
                        my $on_destroy = $self->ipc_atfork_child;
                        local %SIG = %SIG;
                        ipc_worker_loop($self, $r_req, $w_res);
index 24efb49489c39332eb3110025543b41a818121ba..682d1bd1846e1561c489adb259909948c6737473 100644 (file)
@@ -160,9 +160,10 @@ our %CMD = ( # sorted in order of importance/use:
 'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch',
        qw(prune) ],
 
-'import' => [ 'URL_OR_PATHNAME|--stdin',
-       'one-shot import/update from URL or filesystem',
-       qw(stdin| offset=i recursive|r exclude=s include=s !flags),
+'import' => [ 'URLS_OR_PATHNAMES...|--stdin',
+       'one-time import/update from URL or filesystem',
+       qw(stdin| offset=i recursive|r exclude=s include|I=s
+       format|f=s flags!),
        ],
 
 'config' => [ '[...]', sub {
@@ -194,8 +195,8 @@ our %CMD = ( # sorted in order of importance/use:
 # $spec => [@ALLOWED_VALUES (default is first), $description],
 # $spec => $description
 # "$SUB_COMMAND TAB $spec" => as above
-my $stdin_formats = [ 'IN|auto|raw|mboxrd|mboxcl2|mboxcl|mboxo',
-               'specify message input format' ];
+my $stdin_formats = [ 'MAIL_FORMAT|eml|mboxrd|mboxcl2|mboxcl|mboxo',
+                       'specify message input format' ];
 my $ls_format = [ 'OUT|plain|json|null', 'listing output format' ];
 
 my %OPTDESC = (
@@ -240,6 +241,8 @@ my %OPTDESC = (
 'q     jobs=s' => [ '[SEARCH_JOBS][,WRITER_JOBS]',
                'control number of search and writer jobs' ],
 
+'import format|f=s' => $stdin_formats,
+
 'ls-query      format|f=s' => $ls_format,
 'ls-external   format|f=s' => $ls_format,
 
@@ -319,6 +322,20 @@ sub err ($;@) {
 
 sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) }
 
+sub fail_handler ($;$$) {
+       my ($lei, $code, $io) = @_;
+       for my $f (qw(imp lxs l2m)) {
+               my $wq = delete $lei->{$f} or next;
+               $wq->wq_wait_old($lei) if $wq->wq_kill_old; # lei-daemon
+       }
+       close($io) if $io; # needed to avoid warnings on SIGPIPE
+       $lei->x_it($code // (1 >> 8));
+}
+
+sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
+       fail_handler($_[0], 13, delete $_[0]->{1});
+}
+
 sub fail ($$;$) {
        my ($self, $buf, $exit_code) = @_;
        err($self, $buf) if defined $buf;
@@ -340,7 +357,8 @@ sub out ($;@) {
 sub puts ($;@) { out(shift, map { "$_\n" } @_) }
 
 sub child_error { # passes non-fatal curl exit codes to user
-       my ($self, $child_error) = @_; # child_error is $?
+       my ($self, $child_error, $msg) = @_; # child_error is $?
+       $self->err($msg) if $msg;
        if (my $s = $self->{pkt_op_p} // $self->{sock}) {
                # send to the parent lei-daemon or to lei(1) client
                send($s, "child_error $child_error", MSG_EOR);
@@ -357,9 +375,16 @@ sub note_sigpipe { # triggers sigpipe_handler
 }
 
 sub lei_atfork_child {
-       my ($self) = @_;
+       my ($self, $persist) = @_;
        # we need to explicitly close things which are on stack
-       delete $self->{0};
+       if ($persist) {
+               my @io = delete @$self{0,1,2};
+               unless ($self->{oneshot}) {
+                       close($_) for @io;
+               }
+       } else {
+               delete $self->{0};
+       }
        for (delete @$self{qw(3 sock old_1 au_done)}) {
                close($_) if defined($_);
        }
@@ -374,7 +399,7 @@ sub lei_atfork_child {
        %PATH2CFG = ();
        undef $errors_log;
        $quit = \&CORE::exit;
-       $current_lei = $self; # for SIG{__WARN__}
+       $current_lei = $persist ? undef : $self; # for SIG{__WARN__}
 }
 
 sub _help ($;$) {
@@ -606,6 +631,11 @@ sub lei_config {
        x_it($self, $?) if $?;
 }
 
+sub lei_import {
+       require PublicInbox::LeiImport;
+       PublicInbox::LeiImport->call(@_);
+}
+
 sub lei_init {
        my ($self, $dir) = @_;
        my $cfg = _lei_cfg($self, 1);
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
new file mode 100644 (file)
index 0000000..4a9af8a
--- /dev/null
@@ -0,0 +1,106 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# front-end for the "lei import" sub-command
+package PublicInbox::LeiImport;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+use PublicInbox::MboxReader;
+use PublicInbox::Eml;
+
+sub _import_eml { # MboxReader callback
+       my ($eml, $sto, $set_kw) = @_;
+       $sto->ipc_do('set_eml', $eml, $set_kw ? $sto->mbox_keywords($eml) : ());
+}
+
+sub import_done { # EOF callback for main daemon
+       my ($lei) = @_;
+       my $imp = delete $lei->{imp};
+       $imp->wq_wait_old($lei) if $imp;
+       my $wait = $lei->{sto}->ipc_do('done');
+       $lei->dclose;
+}
+
+sub call { # the main "lei import" method
+       my ($cls, $lei, @argv) = @_;
+       my $sto = $lei->_lei_store(1);
+       $sto->write_prepare($lei);
+       $lei->{opt}->{flags} //= 1;
+       my $fmt = $lei->{opt}->{'format'};
+       my $self = $lei->{imp} = bless {}, $cls;
+       return $lei->fail('--format unspecified') if !$fmt;
+       $self->{0} = $lei->{0} if $lei->{opt}->{stdin};
+       my $ops = {
+               '!' => [ $lei->can('fail_handler'), $lei ],
+               'x_it' => [ $lei->can('x_it'), $lei ],
+               'child_error' => [ $lei->can('child_error'), $lei ],
+               '' => [ \&import_done, $lei ],
+       };
+       ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+       my $j = $lei->{opt}->{jobs} // scalar(@argv) || 1;
+       my $nproc = $self->detect_nproc;
+       $j = $nproc if $j > $nproc;
+       $self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei});
+       my $op = delete $lei->{pkt_op_c};
+       delete $lei->{pkt_op_p};
+       $self->wq_do('import_stdin', []) if $self->{0};
+       for my $x (@argv) {
+               $self->wq_do('import_path_url', [], $x);
+       }
+       $self->wq_close(1);
+       $lei->event_step_init; # wait for shutdowns
+       if ($lei->{oneshot}) {
+               while ($op->{sock}) { $op->event_step }
+       }
+}
+
+sub ipc_atfork_child {
+       my ($self) = @_;
+       $self->{lei}->lei_atfork_child;
+       $self->SUPER::ipc_atfork_child;
+}
+
+sub _import_fh {
+       my ($lei, $fh, $x) = @_;
+       my $set_kw = $lei->{opt}->{flags};
+       my $fmt = $lei->{opt}->{'format'};
+       eval {
+               if ($fmt eq 'eml') {
+                       my $buf = do { local $/; <$fh> } //
+                               return $lei->child_error(1 >> 8, <<"");
+               error reading $x: $!
+
+                       my $eml = PublicInbox::Eml->new(\$buf);
+                       _import_eml($eml, $lei->{sto}, $set_kw);
+               } else { # some mbox
+                       my $cb = PublicInbox::MboxReader->can($fmt);
+                       $cb or return $lei->child_error(1 >> 8, <<"");
+       --format $fmt unsupported for $x
+
+                       $cb->(undef, $fh, \&_import_eml, $lei->{sto}, $set_kw);
+               }
+       };
+       $lei->child_error(1 >> 8, "<stdin>: $@") if $@;
+}
+
+sub import_path_url {
+       my ($self, $x) = @_;
+       my $lei = $self->{lei};
+       # TODO auto-detect?
+       if (-f $x) {
+               open my $fh, '<', $x or return $lei->child_error(1 >> 8, <<"");
+unable to open $x: $!
+
+               _import_fh($lei, $fh, $x);
+       } else {
+               $lei->fail("$x unsupported (TODO)");
+       }
+}
+
+sub import_stdin {
+       my ($self) = @_;
+       _import_fh($self->{lei}, $self->{0}, '<stdin>');
+}
+
+1;
index a7d7d95326a15d624fcedfb682594ef73fef7743..3a215973a4f369ab2776a1f1fab64822c0551d0e 100644 (file)
@@ -17,6 +17,7 @@ use PublicInbox::V2Writable;
 use PublicInbox::ContentHash qw(content_hash content_digest);
 use PublicInbox::MID qw(mids mids_in);
 use PublicInbox::LeiSearch;
+use PublicInbox::MDA;
 use List::Util qw(max);
 
 sub new {
@@ -237,4 +238,21 @@ sub done {
        die $err if $err;
 }
 
+sub ipc_atfork_child {
+       my ($self) = @_;
+       my $lei = delete $self->{lei};
+       $lei->lei_atfork_child(1) if $lei;
+       $self->SUPER::ipc_atfork_child;
+}
+
+sub write_prepare {
+       my ($self, $lei) = @_;
+       $self->ipc_lock_init;
+       # Mail we import into lei are private, so headers filtered out
+       # by -mda for public mail are not appropriate
+       local @PublicInbox::MDA::BAD_HEADERS = ();
+       $self->ipc_worker_spawn('lei_store', $lei->oldset, { lei => $lei });
+       $lei->{sto} = $self;
+}
+
 1;
index daf42098a9bc4d066a98d99e1df31c63e1d406db..f8068362070f3d58b2717543859c1a10a16cd60f 100644 (file)
@@ -392,25 +392,11 @@ sub query_prepare { # called by wq_do
        pkt_do($lei->{pkt_op_p}, '.') == 1 or die "do_post_augment trigger: $!"
 }
 
-sub fail_handler ($;$$) {
-       my ($lei, $code, $io) = @_;
-       for my $f (qw(lxs l2m)) {
-               my $wq = delete $lei->{$f} or next;
-               $wq->wq_wait_old($lei) if $wq->wq_kill_old; # lei-daemon
-       }
-       close($io) if $io; # needed to avoid warnings on SIGPIPE
-       $lei->x_it($code // (1 >> 8));
-}
-
-sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
-       fail_handler($_[0], 13, delete $_[0]->{1});
-}
-
 sub do_query {
        my ($self, $lei) = @_;
        my $ops = {
-               '|' => [ \&sigpipe_handler, $lei ],
-               '!' => [ \&fail_handler, $lei ],
+               '|' => [ $lei->can('sigpipe_handler'), $lei ],
+               '!' => [ $lei->can('fail_handler'), $lei ],
                '.' => [ \&do_post_augment, $lei ],
                '' => [ \&query_done, $lei ],
                'mset_progress' => [ \&mset_progress, $lei ],
diff --git a/t/lei.t b/t/lei.t
index a08a6d0d6ecc53817bbbe264cef3bc3854e96be4..eb824a307f6df70558203a3d4c8c2c44656ab26e 100644 (file)
--- a/t/lei.t
+++ b/t/lei.t
@@ -389,6 +389,20 @@ SKIP: {
 }; # /SKIP
 };
 
+my $test_import = sub {
+       $cleanup->();
+       ok($lei->(qw(q s:boolean)), 'search miss before import');
+       unlike($out, qr/boolean/i, 'no results, yet');
+       open my $fh, '<', 't/data/0001.patch' or BAIL_OUT $!;
+       ok($lei->([qw(import -f eml -)], undef, { %$opt, 0 => $fh }),
+               'import single file from stdin');
+       close $fh;
+       ok($lei->(qw(q s:boolean)), 'search hit after import');
+       ok($lei->(qw(import -f eml), 't/data/message_embed.eml'),
+               'import single file by path');
+       $cleanup->();
+};
+
 my $test_lei_common = sub {
        $test_help->();
        $test_config->();
@@ -396,6 +410,7 @@ my $test_lei_common = sub {
        $test_external->();
        $test_completion->();
        $test_fail->();
+       $test_import->();
 };
 
 if ($ENV{TEST_LEI_ONESHOT}) {