use strict;
use v5.10.1;
use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
-use PublicInbox::Eml;
-use PublicInbox::PktOp qw(pkt_do);
-sub eml_cb { # used by PublicInbox::LeiInput::input_fh
+# /^input_/ subs are used by (or override) PublicInbox::LeiInput superclass
+
+sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh
my ($self, $eml, $vmd) = @_;
my $xoids = $self->{lei}->{ale}->xoids_for($eml);
+ if (my $all_vmd = $self->{all_vmd}) {
+ @$vmd{keys %$all_vmd} = values %$all_vmd;
+ }
$self->{lei}->{sto}->ipc_do('set_eml', $eml, $vmd, $xoids);
}
-sub mbox_cb { # MboxReader callback used by PublicInbox::LeiInput::input_fh
+sub input_mbox_cb { # MboxReader callback
my ($eml, $self) = @_;
my $vmd;
if ($self->{-import_kw}) {
my $kw = PublicInbox::MboxReader::mbox_keywords($eml);
$vmd = { kw => $kw } if scalar(@$kw);
}
- eml_cb($self, $eml, $vmd);
+ input_eml_cb($self, $eml, $vmd);
}
-sub import_done_wait { # dwaitpid callback
- my ($arg, $pid) = @_;
- my ($imp, $lei) = @$arg;
- $lei->child_error($?, 'non-fatal errors during import') if $?;
- my $sto = delete $lei->{sto};
- my $wait = $sto->ipc_do('done') if $sto; # PublicInbox::LeiStore::done
- $lei->dclose;
+sub input_maildir_cb { # maildir_each_eml cb
+ my ($f, $kw, $eml, $self) = @_;
+ my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
+ if ($self->{-mail_sync}) {
+ if ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) { # ugh...
+ $vmd->{sync_info} = [ "maildir:$1", \(my $n = $2) ];
+ } else {
+ warn "E: $f was not from a Maildir?\n";
+ }
+ }
+ input_eml_cb($self, $eml, $vmd);
}
-sub import_done { # EOF callback for main daemon
- my ($lei) = @_;
- my $imp = delete $lei->{imp} or return;
- $imp->wq_wait_old(\&import_done_wait, $lei);
+sub input_net_cb { # imap_each / nntp_each
+ my ($url, $uid, $kw, $eml, $self) = @_;
+ my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
+ $vmd->{sync_info} = [ $url, $uid ] if $self->{-mail_sync};
+ input_eml_cb($self, $eml, $vmd);
}
sub net_merge_complete { # callback used by LeiAuth
my ($self) = @_;
- for my $input (@{$self->{inputs}}) {
- $self->wq_io_do('import_path_url', [], $input);
- }
+ $self->wq_io_do('process_inputs');
$self->wq_close(1);
}
-sub import_start {
- my ($lei) = @_;
- my $self = $lei->{imp};
+sub lei_import { # the main "lei import" method
+ my ($lei, @inputs) = @_;
+ my $sto = $lei->_lei_store(1);
+ $sto->write_prepare($lei);
+ my $self = bless {}, __PACKAGE__;
+ $self->{-import_kw} = $lei->{opt}->{kw} // 1;
+ my $vmd_mod = $self->vmd_mod_extract(\@inputs);
+ return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
+ $self->{all_vmd} = $vmd_mod if scalar keys %$vmd_mod;
+ $self->prepare_inputs($lei, \@inputs) or return;
+ $self->{-mail_sync} = $lei->{opt}->{'mail-sync'} // 1;
+
$lei->ale; # initialize for workers to read
my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
if (my $net = $lei->{net}) {
# $j = $net->net_concurrency($j); TODO
+ if ($lei->{opt}->{incremental} // 1) {
+ $net->{incremental} = 1;
+ $net->{-lms_ro} = $lei->_lei_store->search->lms // 0;
+ }
} else {
my $nproc = $self->detect_nproc;
$j = $nproc if $j > $nproc;
}
- my $ops = { '' => [ \&import_done, $lei ] };
+ my $ops = {};
$lei->{auth}->op_merge($ops, $self) if $lei->{auth};
$self->{-wq_nr_workers} = $j // 1; # locked
- my $op = $lei->workers_start($self, 'lei_import', undef, $ops);
- $self->wq_io_do('import_stdin', []) if $self->{0};
+ $lei->{-eml_noisy} = 1;
+ (my $op_c, $ops) = $lei->workers_start($self, 'lei-import', $j, $ops);
+ $lei->{wq1} = $self;
+ $lei->{-err_type} = 'non-fatal';
net_merge_complete($self) unless $lei->{auth};
- while ($op && $op->{sock}) { $op->event_step }
+ $op_c->op_wait_event($ops);
}
-sub lei_import { # the main "lei import" method
- my ($lei, @inputs) = @_;
- my $sto = $lei->_lei_store(1);
- $sto->write_prepare($lei);
- my $self = $lei->{imp} = bless {}, __PACKAGE__;
- $self->{-import_kw} = $lei->{opt}->{kw} // 1;
- $self->prepare_inputs($lei, \@inputs) or return;
- import_start($lei);
-}
-
-sub _import_maildir { # maildir_each_eml cb
- my ($f, $kw, $eml, $sto, $set_kw) = @_;
- $sto->ipc_do('set_eml', $eml, $set_kw ? { kw => $kw }: ());
-}
-
-sub _import_net { # imap_each, nntp_each cb
- my ($url, $uid, $kw, $eml, $sto, $set_kw) = @_;
- $sto->ipc_do('set_eml', $eml, $set_kw ? { kw => $kw } : ());
-}
-
-sub import_path_url {
- my ($self, $input) = @_;
- my $lei = $self->{lei};
- my $ifmt = lc($lei->{opt}->{'in-format'} // '');
- # TODO auto-detect?
- if ($input =~ m!\Aimaps?://!i) {
- $lei->{net}->imap_each($input, \&_import_net, $lei->{sto},
- $self->{-import_kw});
- return;
- } elsif ($input =~ m!\A(?:nntps?|s?news)://!i) {
- $lei->{net}->nntp_each($input, \&_import_net, $lei->{sto}, 0);
- return;
- } elsif ($input =~ s!\A([a-z0-9]+):!!i) {
- $ifmt = lc $1;
- }
- if (-f $input) {
- my $m = $lei->{opt}->{'lock'} // ($ifmt eq 'eml' ? ['none'] :
- PublicInbox::MboxLock->defaults);
- my $mbl = PublicInbox::MboxLock->acq($input, 0, $m);
- $self->input_fh($ifmt, $mbl->{fh}, $input);
- } elsif (-d _ && (-d "$input/cur" || -d "$input/new")) {
- return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
-$input appears to a be a maildir, not $ifmt
-EOM
- PublicInbox::MdirReader::maildir_each_eml($input,
- \&_import_maildir,
- $lei->{sto}, $self->{-import_kw});
- } else {
- $lei->fail("$input unsupported (TODO)");
- }
-}
-
-sub import_stdin {
- my ($self) = @_;
- my $lei = $self->{lei};
- my $in = delete $self->{0};
- $self->input_fh($lei->{opt}->{'in-format'}, $in, '<stdin>');
+sub _complete_import {
+ my ($lei, @argv) = @_;
+ my $sto = $lei->_lei_store or return;
+ my $lms = $sto->search->lms or return;
+ my $match_cb = $lei->complete_url_prepare(\@argv);
+ map { $match_cb->($_) } $lms->folders;
}
no warnings 'once';