]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei convert: mail format conversion sub-command
authorEric Wong <e@80x24.org>
Thu, 18 Feb 2021 20:22:22 +0000 (23:22 +0300)
committerEric Wong <e@80x24.org>
Fri, 19 Feb 2021 00:02:17 +0000 (20:02 -0400)
This will make testing IMAP support for other commands easier, as
it doesn't write to lei/store at all.  Like the pager and MUA,
"git credential" is always spawned by script/lei (and not
lei-daemon) so it has a controlling terminal for password
prompts.

v2: fix missing requires, correct test ordering
v3: ensure config exists for IMAP auth

13 files changed:
MANIFEST
lib/PublicInbox/GitCredential.pm
lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiAuth.pm [new file with mode: 0644]
lib/PublicInbox/LeiConvert.pm [new file with mode: 0644]
lib/PublicInbox/LeiDedupe.pm
lib/PublicInbox/LeiOverview.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/MdirReader.pm
lib/PublicInbox/NetReader.pm
lib/PublicInbox/TestCommon.pm
t/lei-convert.t [new file with mode: 0644]
t/net_reader-imap.t [new file with mode: 0644]

index 820689002f5fef511b545da7cbb7e760667cedf5..4f14677102eed212dcad970d7df74ae934c03738 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -178,6 +178,8 @@ lib/PublicInbox/InputPipe.pm
 lib/PublicInbox/Isearch.pm
 lib/PublicInbox/KQNotify.pm
 lib/PublicInbox/LEI.pm
+lib/PublicInbox/LeiAuth.pm
+lib/PublicInbox/LeiConvert.pm
 lib/PublicInbox/LeiCurl.pm
 lib/PublicInbox/LeiDedupe.pm
 lib/PublicInbox/LeiExternal.pm
@@ -360,6 +362,7 @@ t/init.t
 t/ipc.t
 t/iso-2202-jp.eml
 t/kqnotify.t
+t/lei-convert.t
 t/lei-daemon.t
 t/lei-externals.t
 t/lei-import-maildir.t
@@ -388,6 +391,7 @@ t/msg_iter.t
 t/msgmap.t
 t/msgtime.t
 t/multi-mid.t
+t/net_reader-imap.t
 t/nntp.t
 t/nntpd-tls.t
 t/nntpd-v2.t
index 9e193029b877b91b781e61088937e74bf5351bb8..2d81817c8d7cff1c8f996bdd0312ba09c2f498c2 100644 (file)
@@ -4,11 +4,17 @@ package PublicInbox::GitCredential;
 use strict;
 use PublicInbox::Spawn qw(popen_rd);
 
-sub run ($$) {
-       my ($self, $op) = @_;
-       my ($in_r, $in_w);
+sub run ($$;$) {
+       my ($self, $op, $lei) = @_;
+       my ($in_r, $in_w, $out_r);
+       my $cmd = [ qw(git credential), $op ];
        pipe($in_r, $in_w) or die "pipe: $!";
-       my $out_r = popen_rd([qw(git credential), $op], undef, { 0 => $in_r });
+       if ($lei && !$lei->{oneshot}) { # we'll die if disconnected:
+               pipe($out_r, my $out_w) or die "pipe: $!";
+               $lei->send_exec_cmd([ $in_r, $out_w ], $cmd, {});
+       } else {
+               $out_r = popen_rd($cmd, undef, { 0 => $in_r });
+       }
        close $in_r or die "close in_r: $!";
 
        my $out = '';
@@ -41,8 +47,8 @@ sub check_netrc ($) {
 }
 
 sub fill {
-       my ($self) = @_;
-       my $out_r = run($self, 'fill');
+       my ($self, $lei) = @_;
+       my $out_r = run($self, 'fill', $lei);
        while (<$out_r>) {
                chomp;
                return if $_ eq '';
index 1fa9f7516633918a800418e775152c550c7b6356..1e4c36d021712f26e4faebe803061f30c7d7da3a 100644 (file)
@@ -173,7 +173,11 @@ our %CMD = ( # sorted in order of importance/use:
        qw(stdin| offset=i recursive|r exclude=s include|I=s
        format|f=s kw|keywords|flags!),
        ],
-
+'convert' => [ 'LOCATION...|--stdin',
+       'one-time conversion from URL or filesystem to another format',
+       qw(stdin| in-format|F=s out-format|f=s output|mfolder|o=s quiet|q
+       kw|keywords|flags!),
+       ],
 'config' => [ '[...]', sub {
                'git-config(1) wrapper for '._config_path($_[0]);
        }, qw(config-file|system|global|file|f=s), # for conflict detection
@@ -320,7 +324,7 @@ my %CONFIG_KEYS = (
        'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m imp mrr); # internal workers
+my @WQ_KEYS = qw(lxs l2m imp mrr cnv auth); # internal workers
 
 # pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
 sub x_it ($$) {
@@ -391,18 +395,19 @@ sub fail ($$;$) {
        undef;
 }
 
-sub check_input_format ($;$) {
-       my ($self, $files) = @_;
-       my $fmt = $self->{opt}->{'format'};
+sub check_input_format ($;$$) {
+       my ($self, $files, $opt_key) = @_;
+       $opt_key //= 'format';
+       my $fmt = $self->{opt}->{$opt_key};
        if (!$fmt) {
                my $err = $files ? "regular file(s):\n@$files" : '--stdin';
-               return fail($self, "--format unset for $err");
+               return fail($self, "--$opt_key unset for $err");
        }
        return 1 if $fmt eq 'eml';
        # XXX: should this handle {gz,bz2,xz}? that's currently in LeiToMail
        require PublicInbox::MboxReader;
        PublicInbox::MboxReader->can($fmt) ||
-                               fail($self, "--format=$fmt unrecognized");
+                               fail($self, "--$opt_key=$fmt unrecognized");
 }
 
 sub out ($;@) {
@@ -445,6 +450,7 @@ sub lei_atfork_child {
        } else {
                delete $self->{0};
        }
+       delete @$self{qw(cnv)};
        for (delete @$self{qw(3 sock old_1 au_done)}) {
                close($_) if defined($_);
        }
@@ -626,6 +632,11 @@ sub lei_import {
        PublicInbox::LeiImport->call(@_);
 }
 
+sub lei_convert {
+       require PublicInbox::LeiConvert;
+       PublicInbox::LeiConvert->call(@_);
+}
+
 sub lei_init {
        my ($self, $dir) = @_;
        my $cfg = _lei_cfg($self, 1);
@@ -770,6 +781,13 @@ sub start_mua {
        delete $self->{opt}->{verbose};
 }
 
+sub send_exec_cmd { # tell script/lei to execute a command
+       my ($self, $io, $cmd, $env) = @_;
+       my $sock = $self->{sock} // die 'lei client gone';
+       my $fds = [ map { fileno($_) } @$io ];
+       $send_cmd->($sock, $fds, exec_buf($cmd, $env), MSG_EOR);
+}
+
 sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
        my ($self) = @_;
        my $alerts = $self->{opt}->{alert} // return;
@@ -813,10 +831,9 @@ sub start_pager {
        pipe(my ($r, $wpager)) or return warn "pipe: $!";
        my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} };
        my $pgr = [ undef, @$rdr{1, 2} ];
-       if (my $sock = $self->{sock}) { # lei(1) process runs it
+       if ($self->{sock}) { # lei(1) process runs it
                delete @$new_env{keys %$env}; # only set iff unset
-               my $fds = [ map { fileno($_) } @$rdr{0..2} ];
-               $send_cmd->($sock, $fds, exec_buf([$pager], $new_env), MSG_EOR);
+               send_exec_cmd($self, [ @$rdr{0..2} ], [$pager], $new_env);
        } elsif ($self->{oneshot}) {
                my $cmd = [$pager];
                $self->{"pid.$self.$$"}->{spawn($cmd, $new_env, $rdr)} = $cmd;
@@ -920,6 +937,7 @@ sub event_step {
 
 sub event_step_init {
        my ($self) = @_;
+       return if $self->{-event_init_done}++;
        if (my $sock = $self->{sock}) { # using DS->EventLoop
                $self->SUPER::new($sock, EPOLLIN|EPOLLET);
        }
diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm
new file mode 100644 (file)
index 0000000..6593ba5
--- /dev/null
@@ -0,0 +1,81 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Authentication worker for anything that needs auth for read/write IMAP
+# (eventually for read-only NNTP access)
+package PublicInbox::LeiAuth;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+use PublicInbox::PktOp qw(pkt_do);
+use PublicInbox::NetReader;
+
+sub nrd_merge {
+       my ($lei, $nrd_new) = @_;
+       if ($lei->{pkt_op_p}) { # from lei_convert worker
+               pkt_do($lei->{pkt_op_p}, 'nrd_merge', $nrd_new);
+       } else { # single lei-daemon consumer
+               my $self = $lei->{auth} or return; # client disconnected
+               my $nrd = $self->{nrd};
+               %$nrd = (%$nrd, %$nrd_new);
+       }
+}
+
+sub do_auth { # called via wq_io_do
+       my ($self) = @_;
+       my ($lei, $nrd) = @$self{qw(lei nrd)};
+       $nrd->imap_common_init($lei);
+       nrd_merge($lei, $nrd); # tell lei-daemon updated auth info
+}
+
+sub do_finish_auth { # dwaitpid callback
+       my ($arg, $pid) = @_;
+       my ($self, $lei, $post_auth_cb, @args) = @$arg;
+       $? ? $lei->dclose : $post_auth_cb->(@args);
+}
+
+sub auth_eof {
+       my ($lei, $post_auth_cb, @args) = @_;
+       my $self = delete $lei->{auth} or return;
+       $self->wq_wait_old(\&do_finish_auth, $lei, $post_auth_cb, @args);
+}
+
+sub auth_start {
+       my ($self, $lei, $post_auth_cb, @args) = @_;
+       $lei->_lei_cfg(1); # workers may need to read config
+       my $ops = {
+               '!' => [ $lei->can('fail_handler'), $lei ],
+               '|' => [ $lei->can('sigpipe_handler'), $lei ],
+               'x_it' => [ $lei->can('x_it'), $lei ],
+               'child_error' => [ $lei->can('child_error'), $lei ],
+               'nrd_merge' => [ \&nrd_merge, $lei ],
+               '' => [ \&auth_eof, $lei, $post_auth_cb, @args ],
+       };
+       ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+       $self->wq_workers_start('lei_auth', 1, $lei->oldset, {lei => $lei});
+       my $op = delete $lei->{pkt_op_c};
+       delete $lei->{pkt_op_p};
+       $self->wq_io_do('do_auth', []);
+       $self->wq_close(1);
+       $lei->event_step_init; # wait for shutdowns
+       if ($lei->{oneshot}) {
+               while ($op->{sock}) { $op->event_step }
+       }
+}
+
+sub ipc_atfork_child {
+       my ($self) = @_;
+       # prevent {sock} from being closed in lei_atfork_child:
+       my $s = delete $self->{lei}->{sock};
+       delete $self->{lei}->{auth}; # drop circular ref
+       $self->{lei}->lei_atfork_child;
+       $self->{lei}->{sock} = $s if $s;
+       $self->SUPER::ipc_atfork_child;
+}
+
+sub new {
+       my ($cls, $nrd) = @_;
+       bless { nrd => $nrd }, $cls;
+}
+
+1;
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
new file mode 100644 (file)
index 0000000..78fd5e1
--- /dev/null
@@ -0,0 +1,160 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# front-end for the "lei convert" sub-command
+package PublicInbox::LeiConvert;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+use PublicInbox::Eml;
+use PublicInbox::InboxWritable qw(eml_from_path);
+use PublicInbox::PktOp;
+use PublicInbox::LeiStore;
+use PublicInbox::LeiOverview;
+
+sub mbox_cb {
+       my ($eml, $self) = @_;
+       my @kw = PublicInbox::LeiStore::mbox_keywords($eml);
+       $eml->header_set($_) for qw(Status X-Status);
+       $self->{wcb}->(undef, { kw => \@kw }, $eml);
+}
+
+sub imap_cb { # ->imap_each
+       my ($url, $uid, $kw, $eml, $self) = @_;
+       $self->{wcb}->(undef, { kw => $kw }, $eml);
+}
+
+sub mdir_cb {
+       my ($kw, $eml, $self) = @_;
+       $self->{wcb}->(undef, { kw => $kw }, $eml);
+}
+
+sub do_convert { # via wq_do
+       my ($self) = @_;
+       my $lei = $self->{lei};
+       my $in_fmt = $lei->{opt}->{'in-format'};
+       if (my $stdin = delete $self->{0}) {
+               PublicInbox::MboxReader->$in_fmt($stdin, \&mbox_cb, $self);
+       }
+       for my $input (@{$self->{inputs}}) {
+               my $ifmt = lc($in_fmt // '');
+               if ($input =~ m!\A(?:imap|nntp)s?://!) { # TODO: nntp
+                       $lei->{nrd}->imap_each($input, \&imap_cb, $self);
+                       next;
+               } elsif ($input =~ s!\A([a-z0-9]+):!!i) {
+                       $ifmt = lc $1;
+               }
+               if (-f $input) {
+                       open my $fh, '<', $input or
+                                       return $lei->fail("open $input: $!");
+                       PublicInbox::MboxReader->$ifmt($fh, \&mbox_cb, $self);
+               } elsif (-d _) {
+                       PublicInbox::MdirReader::maildir_each_eml($input,
+                                                       \&mdir_cb, $self);
+               } else {
+                       die "BUG: $input unhandled"; # should've failed earlier
+               }
+       }
+       delete $lei->{1};
+       delete $self->{wcb}; # commit
+}
+
+sub convert_start {
+       my ($lei) = @_;
+       my $ops = {
+               '!' => [ $lei->can('fail_handler'), $lei ],
+               '|' => [ $lei->can('sigpipe_handler'), $lei ],
+               'x_it' => [ $lei->can('x_it'), $lei ],
+               'child_error' => [ $lei->can('child_error'), $lei ],
+               '' => [ $lei->can('dclose'), $lei ],
+       };
+       ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+       my $self = $lei->{cnv};
+       $self->wq_workers_start('lei_convert', 1, $lei->oldset, {lei => $lei});
+       my $op = delete $lei->{pkt_op_c};
+       delete $lei->{pkt_op_p};
+       $self->wq_io_do('do_convert', []);
+       $self->wq_close(1);
+       $lei->event_step_init; # wait for shutdowns
+       if ($lei->{oneshot}) {
+               while ($op->{sock}) { $op->event_step }
+       }
+}
+
+sub call { # the main "lei convert" method
+       my ($cls, $lei, @inputs) = @_;
+       my $opt = $lei->{opt};
+       $opt->{kw} //= 1;
+       my $self = $lei->{cnv} = bless {}, $cls;
+       my $in_fmt = $opt->{'in-format'};
+       my ($nrd, @f, @d);
+       $opt->{dedupe} //= 'none';
+       my $ovv = PublicInbox::LeiOverview->new($lei, 'out-format');
+       $lei->{l2m} or return
+               $lei->fail("output not specified or is not a mail destination");
+       $opt->{augment} = 1 unless $ovv->{dst} eq '/dev/stdout';
+       if ($opt->{stdin}) {
+               @inputs and return $lei->fail("--stdin and @inputs do not mix");
+               $lei->check_input_format(undef, 'in-format') or return;
+               $self->{0} = $lei->{0};
+       }
+       # e.g. Maildir:/home/user/Mail/ or imaps://example.com/INBOX
+       for my $input (@inputs) {
+               my $input_path = $input;
+               if ($input =~ m!\A(?:imap|nntp)s?://!i) {
+                       require PublicInbox::NetReader;
+                       $nrd //= PublicInbox::NetReader->new;
+                       $nrd->add_url($input);
+               } elsif ($input_path =~ s/\A([a-z0-9]+)://is) {
+                       my $ifmt = lc $1;
+                       if (($in_fmt // $ifmt) ne $ifmt) {
+                               return $lei->fail(<<"");
+--in-format=$in_fmt and `$ifmt:' conflict
+
+                       }
+                       if (-f $input_path) {
+                               require PublicInbox::MboxReader;
+                               PublicInbox::MboxReader->can($ifmt) or return
+                                       $lei->fail("$ifmt not supported");
+                       } elsif (-d _) {
+                               require PublicInbox::MdirReader;
+                               $ifmt eq 'maildir' or return
+                                       $lei->fail("$ifmt not supported");
+                       } else {
+                               return $lei->fail("Unable to handle $input");
+                       }
+               } elsif (-f $input) { push @f, $input }
+               elsif (-d _) { push @d, $input }
+               else { return $lei->fail("Unable to handle $input") }
+       }
+       if (@f) { $lei->check_input_format(\@f, 'in-format') or return }
+       if (@d) { # TODO: check for MH vs Maildir, here
+               require PublicInbox::MdirReader;
+       }
+       $self->{inputs} = \@inputs;
+       return convert_start($lei) if !$nrd;
+
+       if (my $err = $nrd->errors) {
+               return $lei->fail($err);
+       }
+       $nrd->{quiet} = $opt->{quiet};
+       $lei->{nrd} = $nrd;
+       require PublicInbox::LeiAuth;
+       my $auth = $lei->{auth} = PublicInbox::LeiAuth->new($nrd);
+       $auth->auth_start($lei, \&convert_start, $lei);
+}
+
+sub ipc_atfork_child {
+       my ($self) = @_;
+       my $lei = $self->{lei};
+       $lei->lei_atfork_child;
+       my $l2m = delete $lei->{l2m};
+       $l2m->pre_augment($lei);
+       $l2m->do_augment($lei);
+       $l2m->post_augment($lei);
+       $self->{wcb} = $l2m->write_cb($lei);
+       $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
+       $self->SUPER::ipc_atfork_child;
+}
+
+1;
index 2114c0e8629bb75822c9b7d5ce663d259542cbb4..5fec9384f1e305cae739e17562377f6377504f61 100644 (file)
@@ -127,7 +127,7 @@ sub prepare_dedupe {
 
 sub pause_dedupe {
        my ($self) = @_;
-       my $skv = $self->[0];
+       my $skv = $self->[0] or return;
        $skv->dbh_release;
        delete($skv->{dbh}) if $skv;
 }
index c820f0d7c5262edea2619a54a8ba74715eac9b1c..3169bae62569a464146185b08fe23e691aec1406 100644 (file)
@@ -51,18 +51,19 @@ sub detect_fmt ($$) {
 }
 
 sub new {
-       my ($class, $lei) = @_;
+       my ($class, $lei, $ofmt_key) = @_;
        my $opt = $lei->{opt};
        my $dst = $opt->{output} // '-';
        $dst = '/dev/stdout' if $dst eq '-';
+       $ofmt_key //= 'format';
 
-       my $fmt = $opt->{'format'};
+       my $fmt = $opt->{$ofmt_key};
        $fmt = lc($fmt) if defined $fmt;
        if ($dst =~ s/\A([a-z0-9]+)://is) { # e.g. Maildir:/home/user/Mail/
                my $ofmt = lc $1;
                $fmt //= $ofmt;
                return $lei->fail(<<"") if $fmt ne $ofmt;
---format=$fmt and --output=$ofmt conflict
+--$ofmt_key=$fmt and --output=$ofmt conflict
 
        }
        $fmt //= 'json' if $dst eq '/dev/stdout';
index e3e512beccbb1be60d63a82e9d5def8fcb4eb06b..f0adc44f23e6088b13e40d43b01fec9fa1705c7d 100644 (file)
@@ -437,7 +437,7 @@ sub _do_augment_mbox {
        $dedupe->pause_dedupe if $dedupe;
 }
 
-sub pre_augment { # fast (1 disk seek), runs in main daemon
+sub pre_augment { # fast (1 disk seek), runs in same process as post_augment
        my ($self, $lei) = @_;
        # _pre_augment_maildir, _pre_augment_mbox
        my $m = "_pre_augment_$self->{base_type}";
@@ -451,7 +451,8 @@ sub do_augment { # slow, runs in wq worker
        $self->$m($lei);
 }
 
-sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon
+# fast (spawn compressor or mkdir), runs in same process as pre_augment
+sub post_augment {
        my ($self, $lei, @args) = @_;
        # _post_augment_maildir, _post_augment_mbox
        my $m = "_post_augment_$self->{base_type}";
index e0ff676deb19eb656a37e0e30fc187a29732405d..5fa534f58cbb139913155b3529d8b16e2e26cd83 100644 (file)
@@ -7,6 +7,7 @@
 package PublicInbox::MdirReader;
 use strict;
 use v5.10.1;
+use PublicInbox::InboxWritable qw(eml_from_path);
 
 # returns Maildir flags from a basename ('' for no flags, undef for invalid)
 sub maildir_basename_flags {
@@ -36,4 +37,29 @@ sub maildir_each_file ($$;@) {
        }
 }
 
+my %c2kw = ('D' => 'draft', F => 'flagged', R => 'answered', S => 'seen');
+
+sub maildir_each_eml ($$;@) {
+       my ($dir, $cb, @arg) = @_;
+       $dir .= '/' unless substr($dir, -1) eq '/';
+       my $pfx = "$dir/new/";
+       if (opendir(my $dh, $pfx)) {
+               while (defined(my $bn = readdir($dh))) {
+                       next if substr($bn, 0, 1) eq '.';
+                       my @f = split(/:/, $bn, -1);
+                       next if scalar(@f) != 1;
+                       my $eml = eml_from_path($pfx.$bn) or next;
+                       $cb->([], $eml, @arg);
+               }
+       }
+       $pfx = "$dir/cur/";
+       opendir my $dh, $pfx or return;
+       while (defined(my $bn = readdir($dh))) {
+               my $fl = maildir_basename_flags($bn) // next;
+               my $eml = eml_from_path($pfx.$bn) or next;
+               my @kw = sort(map { $c2kw{$_} // () } split(//, $fl));
+               $cb->(\@kw, $eml, @arg);
+       }
+}
+
 1;
index 1d0534254381d7a1b8e86c8fab476b6dbeff97c1..ad8c18d0df132c4687ec167c1224d67ddd55d1e8 100644 (file)
@@ -5,7 +5,8 @@
 package PublicInbox::NetReader;
 use strict;
 use v5.10.1;
-use parent qw(Exporter);
+use parent qw(Exporter PublicInbox::IPC);
+use PublicInbox::Eml;
 
 # TODO: trim this down, this is huge
 our @EXPORT = qw(uri_new uri_scheme uri_section
@@ -33,7 +34,7 @@ sub uri_section ($) {
 sub auth_anon_cb { '' }; # for Mail::IMAPClient::Authcallback
 
 sub mic_for { # mic = Mail::IMAPClient
-       my ($self, $url, $mic_args) = @_;
+       my ($self, $url, $mic_args, $lei) = @_;
        require PublicInbox::URIimap;
        my $uri = PublicInbox::URIimap->new($url);
        require PublicInbox::GitCredential;
@@ -74,21 +75,26 @@ sub mic_for { # mic = Mail::IMAPClient
        }
        if ($cred) {
                $cred->check_netrc unless defined $cred->{password};
-               $cred->fill; # may prompt user here
+               $cred->fill($lei); # may prompt user here
                $mic->User($mic_arg->{User} = $cred->{username});
                $mic->Password($mic_arg->{Password} = $cred->{password});
        } else { # AUTH=ANONYMOUS
                $mic->Authmechanism($mic_arg->{Authmechanism} = 'ANONYMOUS');
-               $mic->Authcallback($mic_arg->{Authcallback} = \&auth_anon_cb);
+               $mic_arg->{Authcallback} = 'auth_anon_cb';
+               $mic->Authcallback(\&auth_anon_cb);
        }
+       my $err;
        if ($mic->login && $mic->IsAuthenticated) {
                # success! keep IMAPClient->new arg in case we get disconnected
                $self->{mic_arg}->{uri_section($uri)} = $mic_arg;
        } else {
-               warn "E: <$url> LOGIN: $@\n";
+               $err = "E: <$url> LOGIN: $@\n";
                $mic = undef;
        }
        $cred->run($mic ? 'approve' : 'reject') if $cred;
+       if ($err) {
+               $lei ? $lei->fail($err) : warn($err);
+       }
        $mic;
 }
 
@@ -139,8 +145,8 @@ E: <$url> STARTTLS requested and failed
        $nn;
 }
 
-sub nn_for ($$$) { # nn = Net::NNTP
-       my ($self, $url, $nn_args) = @_;
+sub nn_for ($$$;$) { # nn = Net::NNTP
+       my ($self, $url, $nn_args, $lei) = @_;
        my $uri = uri_new($url);
        my $sec = uri_section($uri);
        my $nntp_opt = $self->{nntp_opt}->{$sec} //= {};
@@ -170,7 +176,7 @@ sub nn_for ($$$) { # nn = Net::NNTP
        my $nn = nn_new($nn_arg, $nntp_opt, $url);
 
        if ($cred) {
-               $cred->fill; # may prompt user here
+               $cred->fill($lei); # may prompt user here
                if ($nn->authinfo($u, $p)) {
                        push @{$nntp_opt->{-postconn}}, [ 'authinfo', $u, $p ];
                } else {
@@ -240,14 +246,15 @@ sub cfg_bool ($$$) {
 }
 
 # flesh out common IMAP-specific data structures
-sub imap_common_init ($) {
-       my ($self) = @_;
+sub imap_common_init ($;$) {
+       my ($self, $lei) = @_;
+       $self->{quiet} = 1 if $lei && $lei->{opt}->{quiet};
        eval { require PublicInbox::IMAPClient } or
                die "Mail::IMAPClient is required for IMAP:\n$@\n";
        eval { require PublicInbox::IMAPTracker } or
                die "DBD::SQLite is required for IMAP\n:$@\n";
        require PublicInbox::URIimap;
-       my $cfg = $self->{pi_cfg};
+       my $cfg = $self->{pi_cfg} // $lei->_lei_cfg;
        my $mic_args = {}; # scheme://authority => Mail:IMAPClient arg
        for my $url (@{$self->{imap_order}}) {
                my $uri = PublicInbox::URIimap->new($url);
@@ -275,7 +282,8 @@ sub imap_common_init ($) {
        my $mics = {}; # schema://authority => IMAPClient obj
        for my $url (@{$self->{imap_order}}) {
                my $uri = PublicInbox::URIimap->new($url);
-               $mics->{uri_section($uri)} //= mic_for($self, $url, $mic_args);
+               my $sec = uri_section($uri);
+               $mics->{$sec} //= mic_for($self, $url, $mic_args, $lei);
        }
        $mics;
 }
@@ -294,9 +302,140 @@ sub errors {
        if (my $u = $self->{unsupported_url}) {
                return "Unsupported URL(s): @$u";
        }
+       if ($self->{imap_order}) {
+               eval { require PublicInbox::IMAPClient } or
+                       die "Mail::IMAPClient is required for IMAP:\n$@\n";
+       }
        undef;
 }
 
+my %IMAPflags2kw = (
+       '\Seen' => 'seen',
+       '\Answered' => 'answered',
+       '\Flagged' => 'flagged',
+       '\Draft' => 'draft',
+);
+
+sub _imap_do_msg ($$$$$) {
+       my ($self, $url, $uid, $raw, $flags) = @_;
+       # our target audience expects LF-only, save storage
+       $$raw =~ s/\r\n/\n/sg;
+       my $kw = [];
+       for my $f (split(/ /, $flags)) {
+               my $k = $IMAPflags2kw{$f} // next; # TODO: X-Label?
+               push @$kw, $k;
+       }
+       my ($eml_cb, @args) = @{$self->{eml_each}};
+       $eml_cb->($url, $uid, $kw, PublicInbox::Eml->new($raw), @args);
+}
+
+sub _imap_fetch_all ($$$) {
+       my ($self, $mic, $url) = @_;
+       my $uri = PublicInbox::URIimap->new($url);
+       my $sec = uri_section($uri);
+       my $mbx = $uri->mailbox;
+       $mic->Clear(1); # trim results history
+       $mic->examine($mbx) or return "E: EXAMINE $mbx ($sec) failed: $!";
+       my ($r_uidval, $r_uidnext);
+       for ($mic->Results) {
+               /^\* OK \[UIDVALIDITY ([0-9]+)\].*/ and $r_uidval = $1;
+               /^\* OK \[UIDNEXT ([0-9]+)\].*/ and $r_uidnext = $1;
+               last if $r_uidval && $r_uidnext;
+       }
+       $r_uidval //= $mic->uidvalidity($mbx) //
+               return "E: $url cannot get UIDVALIDITY";
+       $r_uidnext //= $mic->uidnext($mbx) //
+               return "E: $url cannot get UIDNEXT";
+       my $itrk = $self->{incremental} ?
+                       PublicInbox::IMAPTracker->new($url) : 0;
+       my ($l_uidval, $l_uid) = $itrk ? $itrk->get_last : ();
+       $l_uidval //= $r_uidval; # first time
+       $l_uid //= 1;
+       if ($l_uidval != $r_uidval) {
+               return "E: $url UIDVALIDITY mismatch\n".
+                       "E: local=$l_uidval != remote=$r_uidval";
+       }
+       my $r_uid = $r_uidnext - 1;
+       if ($l_uid != 1 && $l_uid > $r_uid) {
+               return "E: $url local UID exceeds remote ($l_uid > $r_uid)\n".
+                       "E: $url strangely, UIDVALIDLITY matches ($l_uidval)\n";
+       }
+       return if $l_uid >= $r_uid; # nothing to do
+
+       warn "# $url fetching UID $l_uid:$r_uid\n" unless $self->{quiet};
+       $mic->Uid(1); # the default, we hope
+       my $bs = $self->{imap_opt}->{$sec}->{batch_size} // 1;
+       my $req = $mic->imap4rev1 ? 'BODY.PEEK[]' : 'RFC822.PEEK';
+       my $key = $req;
+       $key =~ s/\.PEEK//;
+       my ($uids, $batch);
+       my $err;
+       do {
+               # I wish "UID FETCH $START:*" could work, but:
+               # 1) servers do not need to return results in any order
+               # 2) Mail::IMAPClient doesn't offer a streaming API
+               $uids = $mic->search("UID $l_uid:*") or
+                       return "E: $url UID SEARCH $l_uid:* error: $!";
+               return if scalar(@$uids) == 0;
+
+               # RFC 3501 doesn't seem to indicate order of UID SEARCH
+               # responses, so sort it ourselves.  Order matters so
+               # IMAPTracker can store the newest UID.
+               @$uids = sort { $a <=> $b } @$uids;
+
+               # Did we actually get new messages?
+               return if $uids->[0] < $l_uid;
+
+               $l_uid = $uids->[-1] + 1; # for next search
+               my $last_uid;
+               my $n = $self->{max_batch};
+               while (scalar @$uids) {
+                       my @batch = splice(@$uids, 0, $bs);
+                       $batch = join(',', @batch);
+                       local $0 = "UID:$batch $mbx $sec";
+                       my $r = $mic->fetch_hash($batch, $req, 'FLAGS');
+                       unless ($r) { # network error?
+                               $err = "E: $url UID FETCH $batch error: $!";
+                               last;
+                       }
+                       for my $uid (@batch) {
+                               # messages get deleted, so holes appear
+                               my $per_uid = delete $r->{$uid} // next;
+                               my $raw = delete($per_uid->{$key}) // next;
+                               _imap_do_msg($self, $url, $uid, \$raw,
+                                               $per_uid->{FLAGS});
+                               $last_uid = $uid;
+                               last if $self->{quit};
+                       }
+                       last if $self->{quit};
+               }
+               $itrk->update_last($r_uidval, $last_uid) if $itrk;
+       } until ($err || $self->{quit});
+       $err;
+}
+
+sub imap_each {
+       my ($self, $url, $eml_cb, @args) = @_;
+       my $uri = PublicInbox::URIimap->new($url);
+       my $sec = uri_section($uri);
+       my $mic_arg = $self->{mic_arg}->{$sec} or
+                       die "BUG: no Mail::IMAPClient->new arg for $sec";
+       local $0 = $uri->mailbox." $sec";
+       my $cb_name = $mic_arg->{Authcallback};
+       if (ref($cb_name) ne 'CODE') {
+               $mic_arg->{Authcallback} = $self->can($cb_name);
+       }
+       my $mic = PublicInbox::IMAPClient->new(%$mic_arg, Debug => 0);
+       my $err;
+       if ($mic && $mic->IsConnected) {
+               local $self->{eml_each} = [ $eml_cb, @args ];
+               $err = _imap_fetch_all($self, $mic, $url);
+       } else {
+               $err = "E: not connected: $!";
+       }
+       $mic;
+}
+
 sub new { bless {}, shift };
 
 1;
index c5070cfd43b020f32927dac557e8d521aba0bfe7..3eb08e9f9f55ee954776b3a7570cf3c9c5a6a7c3 100644 (file)
@@ -462,10 +462,15 @@ our $lei = sub {
 sub lei (@) { $lei->(@_) }
 
 sub lei_ok (@) {
-       my $msg = ref($_[-1]) ? pop(@_) : undef;
+       my $msg = ref($_[-1]) eq 'SCALAR' ? pop(@_) : undef;
+       my $tmpdir = quotemeta(File::Spec->tmpdir);
        # filter out anything that looks like a path name for consistent logs
-       my @msg = grep(!m!\A/!, @_);
-       ok($lei->(@_), "lei @msg". ($msg ? " ($$msg)" : ''));
+       my @msg = ref($_[0]) eq 'ARRAY' ? @{$_[0]} : @_;
+       for (@msg) {
+               s!\A([a-z0-9]+://)[^/]+/!$1\$HOST_PORT/! ||
+                       s!$tmpdir\b/(?:[^/]+/)?!\$TMPDIR/!;
+       }
+       ok(lei(@_), "lei @msg". ($msg ? " ($$msg)" : '')) or diag $lei_err;
 }
 
 sub json_utf8 () {
diff --git a/t/lei-convert.t b/t/lei-convert.t
new file mode 100644 (file)
index 0000000..f58a0a8
--- /dev/null
@@ -0,0 +1,71 @@
+#!perl -w
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict; use v5.10.1; use PublicInbox::TestCommon;
+use PublicInbox::MboxReader;
+use PublicInbox::MdirReader;
+use PublicInbox::NetReader;
+require_git 2.6;
+require_mods(qw(DBD::SQLite Search::Xapian));
+my ($tmpdir, $for_destroy) = tmpdir;
+my $sock = tcp_server;
+my $cmd = [ '-imapd', '-W0', "--stdout=$tmpdir/1", "--stderr=$tmpdir/2" ];
+my ($ro_home, $cfg_path) = setup_public_inboxes;
+my $env = { PI_CONFIG => $cfg_path };
+my $td = start_script($cmd, $env, { 3 => $sock }) or BAIL_OUT("-imapd: $?");
+my $host_port = tcp_host_port($sock);
+undef $sock;
+test_lei({ tmpdir => $tmpdir }, sub {
+       my $d = $ENV{HOME};
+       my $dig = Digest::SHA->new(256);
+       lei_ok('convert', '-o', "mboxrd:$d/foo.mboxrd",
+               "imap://$host_port/t.v2.0");
+       ok(-f "$d/foo.mboxrd", 'mboxrd created');
+       my (@mboxrd, @mboxcl2);
+       open my $fh, '<', "$d/foo.mboxrd" or BAIL_OUT $!;
+       PublicInbox::MboxReader->mboxrd($fh, sub { push @mboxrd, shift });
+       ok(scalar(@mboxrd) > 1, 'got multiple messages');
+
+       lei_ok('convert', '-o', "mboxcl2:$d/cl2", "mboxrd:$d/foo.mboxrd");
+       ok(-s "$d/cl2", 'mboxcl2 non-empty') or diag $lei_err;
+       open $fh, '<', "$d/cl2" or BAIL_OUT $!;
+       PublicInbox::MboxReader->mboxcl2($fh, sub {
+               my $eml = shift;
+               $eml->header_set($_) for (qw(Content-Length Lines));
+               push @mboxcl2, $eml;
+       });
+       is_deeply(\@mboxcl2, \@mboxrd, 'mboxrd and mboxcl2 have same mail');
+
+       lei_ok('convert', '-o', "$d/md", "mboxrd:$d/foo.mboxrd");
+       ok(-d "$d/md", 'Maildir created');
+       my @md;
+       PublicInbox::MdirReader::maildir_each_eml("$d/md", sub {
+               push @md, $_[1];
+       });
+       is(scalar(@md), scalar(@mboxrd), 'got expected emails in Maildir');
+       @md = sort { ${$a->{bdy}} cmp ${$b->{bdy}} } @md;
+       @mboxrd = sort { ${$a->{bdy}} cmp ${$b->{bdy}} } @mboxrd;
+       my @rd_nostatus = map {
+               my $eml = PublicInbox::Eml->new(\($_->as_string));
+               $eml->header_set('Status');
+               $eml;
+       } @mboxrd;
+       is_deeply(\@md, \@rd_nostatus, 'Maildir output matches mboxrd');
+
+       my @bar;
+       lei_ok('convert', '-o', "mboxrd:$d/bar.mboxrd", "$d/md");
+       open $fh, '<', "$d/bar.mboxrd" or BAIL_OUT $!;
+       PublicInbox::MboxReader->mboxrd($fh, sub { push @bar, shift });
+       @bar = sort { ${$a->{bdy}} cmp ${$b->{bdy}} } @bar;
+       is_deeply(\@mboxrd, \@bar,
+                       'mboxrd round-tripped through Maildir w/ flags');
+
+       open my $in, '<', "$d/foo.mboxrd" or BAIL_OUT;
+       my $rdr = { 0 => $in, 1 => \(my $out), 2 => \$lei_err };
+       lei_ok([qw(convert --stdin -F mboxrd -o mboxrd:/dev/stdout)],
+               undef, $rdr);
+       open $fh, '<', "$d/foo.mboxrd" or BAIL_OUT;
+       my $exp = do { local $/; <$fh> };
+       is($out, $exp, 'stdin => stdout');
+});
+done_testing;
diff --git a/t/net_reader-imap.t b/t/net_reader-imap.t
new file mode 100644 (file)
index 0000000..eea8b0f
--- /dev/null
@@ -0,0 +1,40 @@
+#!perl -w
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict; use v5.10.1; use PublicInbox::TestCommon;
+require_git 2.6;
+require_mods(qw(DBD::SQLite Search::Xapian));
+my ($tmpdir, $for_destroy) = tmpdir;
+my ($ro_home, $cfg_path) = setup_public_inboxes;
+my $cmd = [ '-imapd', '-W0', "--stdout=$tmpdir/1", "--stderr=$tmpdir/2" ];
+my $sock = tcp_server;
+my $env = { PI_CONFIG => $cfg_path };
+my $td = start_script($cmd, $env, { 3 => $sock }) or BAIL_OUT "-imapd: $?";
+my ($host, $port) = tcp_host_port $sock;
+require_ok 'PublicInbox::NetReader';
+my $nrd = PublicInbox::NetReader->new;
+$nrd->add_url(my $url = "imap://$host:$port/t.v2.0");
+is($nrd->errors, undef, 'no errors');
+$nrd->{pi_cfg} = PublicInbox::Config->new($cfg_path);
+$nrd->imap_common_init;
+$nrd->{quiet} = 1;
+my (%eml, %urls, %args, $nr, @w);
+local $SIG{__WARN__} = sub { push(@w, @_) };
+$nrd->imap_each($url, sub {
+       my ($u, $uid, $kw, $eml, $arg) = @_;
+       ++$urls{$u};
+       ++$args{$arg};
+       like($uid, qr/\A[0-9]+\z/, 'got digit UID '.$uid);
+       ++$eml{ref($eml)};
+       ++$nr;
+}, 'blah');
+is(scalar(@w), 0, 'no warnings');
+ok($nr, 'got some emails');
+is($eml{'PublicInbox::Eml'}, $nr, 'got expected Eml objects');
+is(scalar keys %eml, 1, 'only got Eml objects');
+is($urls{$url}, $nr, 'one URL expected number of times');
+is(scalar keys %urls, 1, 'only got one URL');
+is($args{blah}, $nr, 'got arg expected number of times');
+is(scalar keys %args, 1, 'only got one arg');
+
+done_testing;