]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/LeiImport.pm
lei_input: common filehandle reader for eml + mbox
[public-inbox.git] / lib / PublicInbox / LeiImport.pm
1 # Copyright (C) 2021 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # front-end for the "lei import" sub-command
5 package PublicInbox::LeiImport;
6 use strict;
7 use v5.10.1;
8 use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
9 use PublicInbox::Eml;
10 use PublicInbox::PktOp qw(pkt_do);
11
12 sub eml_cb { # used by PublicInbox::LeiInput::input_fh
13         my ($self, $eml) = @_;
14         my $vmd;
15         if ($self->{-import_kw}) { # FIXME
16                 my $kw = PublicInbox::MboxReader::mbox_keywords($eml);
17                 $vmd = { kw => $kw } if scalar(@$kw);
18         }
19         my $xoids = $self->{lei}->{ale}->xoids_for($eml);
20         $self->{lei}->{sto}->ipc_do('set_eml', $eml, $vmd, $xoids);
21 }
22
23 sub mbox_cb { # MboxReader callback used by PublicInbox::LeiInput::input_fh
24         my ($eml, $self) = @_;
25         eml_cb($self, $eml);
26 }
27
28 sub import_done_wait { # dwaitpid callback
29         my ($arg, $pid) = @_;
30         my ($imp, $lei) = @$arg;
31         $lei->child_error($?, 'non-fatal errors during import') if $?;
32         my $sto = delete $lei->{sto};
33         my $wait = $sto->ipc_do('done') if $sto; # PublicInbox::LeiStore::done
34         $lei->dclose;
35 }
36
37 sub import_done { # EOF callback for main daemon
38         my ($lei) = @_;
39         my $imp = delete $lei->{imp} or return;
40         $imp->wq_wait_old(\&import_done_wait, $lei);
41 }
42
43 sub net_merge_complete { # callback used by LeiAuth
44         my ($self) = @_;
45         for my $input (@{$self->{inputs}}) {
46                 $self->wq_io_do('import_path_url', [], $input);
47         }
48         $self->wq_close(1);
49 }
50
51 sub import_start {
52         my ($lei) = @_;
53         my $self = $lei->{imp};
54         $lei->ale; # initialize for workers to read
55         my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
56         if (my $net = $lei->{net}) {
57                 # $j = $net->net_concurrency($j); TODO
58         } else {
59                 my $nproc = $self->detect_nproc;
60                 $j = $nproc if $j > $nproc;
61         }
62         my $ops = { '' => [ \&import_done, $lei ] };
63         $lei->{auth}->op_merge($ops, $self) if $lei->{auth};
64         $self->{-wq_nr_workers} = $j // 1; # locked
65         my $op = $lei->workers_start($self, 'lei_import', undef, $ops);
66         $self->wq_io_do('import_stdin', []) if $self->{0};
67         net_merge_complete($self) unless $lei->{auth};
68         while ($op && $op->{sock}) { $op->event_step }
69 }
70
71 sub lei_import { # the main "lei import" method
72         my ($lei, @inputs) = @_;
73         my $sto = $lei->_lei_store(1);
74         $sto->write_prepare($lei);
75         my $self = $lei->{imp} = bless {}, __PACKAGE__;
76         $self->{-import_kw} = $lei->{opt}->{kw} // 1;
77         $self->prepare_inputs($lei, \@inputs) or return;
78         import_start($lei);
79 }
80
81 sub ipc_atfork_child {
82         my ($self) = @_;
83         my $lei = $self->{lei};
84         delete $lei->{imp}; # drop circular ref
85         $lei->lei_atfork_child;
86         $self->SUPER::ipc_atfork_child;
87         $lei->{auth}->do_auth_atfork($self) if $lei->{auth};
88         undef;
89 }
90
91 sub _import_maildir { # maildir_each_eml cb
92         my ($f, $kw, $eml, $sto, $set_kw) = @_;
93         $sto->ipc_do('set_eml', $eml, $set_kw ? { kw => $kw }: ());
94 }
95
96 sub _import_net { # imap_each, nntp_each cb
97         my ($url, $uid, $kw, $eml, $sto, $set_kw) = @_;
98         $sto->ipc_do('set_eml', $eml, $set_kw ? { kw => $kw } : ());
99 }
100
101 sub import_path_url {
102         my ($self, $input) = @_;
103         my $lei = $self->{lei};
104         my $ifmt = lc($lei->{opt}->{'in-format'} // '');
105         # TODO auto-detect?
106         if ($input =~ m!\Aimaps?://!i) {
107                 $lei->{net}->imap_each($input, \&_import_net, $lei->{sto},
108                                         $self->{-import_kw});
109                 return;
110         } elsif ($input =~ m!\A(?:nntps?|s?news)://!i) {
111                 $lei->{net}->nntp_each($input, \&_import_net, $lei->{sto}, 0);
112                 return;
113         } elsif ($input =~ s!\A([a-z0-9]+):!!i) {
114                 $ifmt = lc $1;
115         }
116         if (-f $input) {
117                 my $m = $lei->{opt}->{'lock'} // ($ifmt eq 'eml' ? ['none'] :
118                                 PublicInbox::MboxLock->defaults);
119                 my $mbl = PublicInbox::MboxLock->acq($input, 0, $m);
120                 $self->input_fh($ifmt, $mbl->{fh}, $input);
121         } elsif (-d _ && (-d "$input/cur" || -d "$input/new")) {
122                 return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
123 $input appears to a be a maildir, not $ifmt
124 EOM
125                 PublicInbox::MdirReader::maildir_each_eml($input,
126                                         \&_import_maildir,
127                                         $lei->{sto}, $self->{-import_kw});
128         } else {
129                 $lei->fail("$input unsupported (TODO)");
130         }
131 }
132
133 sub import_stdin {
134         my ($self) = @_;
135         my $lei = $self->{lei};
136         my $in = delete $self->{0};
137         $self->input_fh($lei->{opt}->{'in-format'}, $in, '<stdin>');
138 }
139
140 no warnings 'once'; # the following works even when LeiAuth is lazy-loaded
141 *net_merge_all = \&PublicInbox::LeiAuth::net_merge_all;
142 1;