]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/LeiImport.pm
68cab12c819323787f4341cfc02de9d9d8538b46
[public-inbox.git] / lib / PublicInbox / LeiImport.pm
1 # Copyright (C) 2021 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # front-end for the "lei import" sub-command
5 package PublicInbox::LeiImport;
6 use strict;
7 use v5.10.1;
8 use parent qw(PublicInbox::IPC);
9 use PublicInbox::Eml;
10 use PublicInbox::InboxWritable qw(eml_from_path);
11
12 sub _import_eml { # MboxReader callback
13         my ($eml, $sto, $set_kw) = @_;
14         $sto->ipc_do('set_eml', $eml, $set_kw ? $sto->mbox_keywords($eml) : ());
15 }
16
17 sub import_done_wait { # dwaitpid callback
18         my ($arg, $pid) = @_;
19         my ($imp, $lei) = @$arg;
20         $lei->child_error($?, 'non-fatal errors during import') if $?;
21         my $ign = $lei->{sto}->ipc_do('done'); # PublicInbox::LeiStore::done
22         $lei->dclose;
23 }
24
25 sub import_done { # EOF callback for main daemon
26         my ($lei) = @_;
27         my $imp = delete $lei->{imp} or return;
28         $imp->wq_wait_old(\&import_done_wait, $lei);
29 }
30
31 sub import_start {
32         my ($lei) = @_;
33         my $self = $lei->{imp};
34         my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
35         if (my $nrd = $lei->{nrd}) {
36                 # $j = $nrd->net_concurrency($j); TODO
37         } else {
38                 my $nproc = $self->detect_nproc;
39                 $j = $nproc if $j > $nproc;
40         }
41         my $op = $lei->workers_start($self, 'lei_import', $j, {
42                 '' => [ \&import_done, $lei ],
43         });
44         $self->wq_io_do('import_stdin', []) if $self->{0};
45         for my $input (@{$self->{inputs}}) {
46                 $self->wq_io_do('import_path_url', [], $input);
47         }
48         $self->wq_close(1);
49         while ($op && $op->{sock}) { $op->event_step }
50 }
51
52 sub call { # the main "lei import" method
53         my ($cls, $lei, @inputs) = @_;
54         my $sto = $lei->_lei_store(1);
55         $sto->write_prepare($lei);
56         my ($nrd, @f, @d);
57         $lei->{opt}->{kw} //= 1;
58         my $self = $lei->{imp} = bless { inputs => \@inputs }, $cls;
59         if ($lei->{opt}->{stdin}) {
60                 @inputs and return $lei->fail("--stdin and @inputs do not mix");
61                 $lei->check_input_format or return;
62                 $self->{0} = $lei->{0};
63         }
64
65         # TODO: do we need --format for non-stdin?
66         my $fmt = $lei->{opt}->{'format'};
67         # e.g. Maildir:/home/user/Mail/ or imaps://example.com/INBOX
68         for my $input (@inputs) {
69                 my $input_path = $input;
70                 if ($input =~ m!\A(?:imap|nntp)s?://!i) {
71                         require PublicInbox::NetReader;
72                         $nrd //= PublicInbox::NetReader->new;
73                         $nrd->add_url($input);
74                 } elsif ($input_path =~ s/\A([a-z0-9]+)://is) {
75                         my $ifmt = lc $1;
76                         if (($fmt // $ifmt) ne $ifmt) {
77                                 return $lei->fail(<<"");
78 --format=$fmt and `$ifmt:' conflict
79
80                         }
81                         if (-f $input_path) {
82                                 require PublicInbox::MboxReader;
83                                 PublicInbox::MboxReader->can($ifmt) or return
84                                         $lei->fail("$ifmt not supported");
85                         } elsif (-d _) {
86                                 require PublicInbox::MdirReader;
87                                 $ifmt eq 'maildir' or return
88                                         $lei->fail("$ifmt not supported");
89                         } else {
90                                 return $lei->fail("Unable to handle $input");
91                         }
92                 } elsif (-f $input) { push @f, $input
93                 } elsif (-d _) { push @d, $input
94                 } else { return $lei->fail("Unable to handle $input") }
95         }
96         if (@f) { $lei->check_input_format(\@f) or return }
97         if (@d) { # TODO: check for MH vs Maildir, here
98                 require PublicInbox::MdirReader;
99         }
100         $self->{inputs} = \@inputs;
101         return import_start($lei) if !$nrd;
102
103         if (my $err = $nrd->errors) {
104                 return $lei->fail($err);
105         }
106         $nrd->{quiet} = $lei->{opt}->{quiet};
107         $lei->{nrd} = $nrd;
108         require PublicInbox::LeiAuth;
109         my $auth = $lei->{auth} = PublicInbox::LeiAuth->new($nrd);
110         $auth->auth_start($lei, \&import_start, $lei);
111 }
112
113 sub ipc_atfork_child {
114         my ($self) = @_;
115         delete $self->{lei}->{imp}; # drop circular ref
116         $self->{lei}->lei_atfork_child;
117         $self->SUPER::ipc_atfork_child;
118 }
119
120 sub _import_fh {
121         my ($lei, $fh, $input, $ifmt) = @_;
122         my $set_kw = $lei->{opt}->{kw};
123         eval {
124                 if ($ifmt eq 'eml') {
125                         my $buf = do { local $/; <$fh> } //
126                                 return $lei->child_error(1 << 8, <<"");
127 error reading $input: $!
128
129                         my $eml = PublicInbox::Eml->new(\$buf);
130                         _import_eml($eml, $lei->{sto}, $set_kw);
131                 } else { # some mbox (->can already checked in call);
132                         my $cb = PublicInbox::MboxReader->can($ifmt) //
133                                 die "BUG: bad fmt=$ifmt";
134                         $cb->(undef, $fh, \&_import_eml, $lei->{sto}, $set_kw);
135                 }
136         };
137         $lei->child_error(1 << 8, "<stdin>: $@") if $@;
138 }
139
140 sub _import_maildir { # maildir_each_file cb
141         my ($f, $sto, $set_kw) = @_;
142         $sto->ipc_do('set_eml_from_maildir', $f, $set_kw);
143 }
144
145 sub _import_imap { # imap_each cb
146         my ($url, $uid, $kw, $eml, $sto, $set_kw) = @_;
147         warn "$url $uid";
148         $sto->ipc_do('set_eml', $eml, $set_kw ? @$kw : ());
149 }
150
151 sub import_path_url {
152         my ($self, $input) = @_;
153         my $lei = $self->{lei};
154         my $ifmt = lc($lei->{opt}->{'format'} // '');
155         # TODO auto-detect?
156         if ($input =~ m!\A(imap|nntp)s?://!i) {
157                 $lei->{nrd}->imap_each($input, \&_import_imap, $lei->{sto},
158                                         $lei->{opt}->{kw});
159                 return;
160         } elsif ($input =~ s!\A([a-z0-9]+):!!i) {
161                 $ifmt = lc $1;
162         }
163         if (-f $input) {
164                 open my $fh, '<', $input or return $lei->child_error(1 << 8, <<"");
165 unable to open $input: $!
166
167                 _import_fh($lei, $fh, $input, $ifmt);
168         } elsif (-d _ && (-d "$input/cur" || -d "$input/new")) {
169                 return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
170 $input appears to a be a maildir, not $ifmt
171 EOM
172                 PublicInbox::MdirReader::maildir_each_file($input,
173                                         \&_import_maildir,
174                                         $lei->{sto}, $lei->{opt}->{kw});
175         } else {
176                 $lei->fail("$input unsupported (TODO)");
177         }
178 }
179
180 sub import_stdin {
181         my ($self) = @_;
182         my $lei = $self->{lei};
183         _import_fh($lei, delete $self->{0}, '<stdin>', $lei->{opt}->{'format'});
184 }
185
186 1;