]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei_input: support compressed mboxes
authorEric Wong <e@80x24.org>
Mon, 29 Mar 2021 17:49:42 +0000 (17:49 +0000)
committerEric Wong <e@80x24.org>
Mon, 29 Mar 2021 23:57:37 +0000 (19:57 -0400)
Since "lei q" and "lei convert" already support writing these
compressed inboxes, it makes sense that all mbox readers support
them, as well.

Using compression is one reliable way to know an mboxrd or mboxo
hasn't been unexpectedly truncated.

lib/PublicInbox/LeiConvert.pm
lib/PublicInbox/LeiInput.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/MboxReader.pm
t/lei-convert.t
t/lei_to_mail.t

index da3b50ccfefd0a4ffbcec9307e113ddb4c4d31f2..fb7a2f3bf99c298288fb9e148fc0c00cf8cf4415 100644 (file)
@@ -34,6 +34,7 @@ sub input_maildir_cb {
 
 sub do_convert { # via wq_do
        my ($self) = @_;
+       $PublicInbox::DS::in_loop = 0; # force synchronous dwaitpid
        for my $input (@{$self->{inputs}}) {
                $self->input_path_url($input);
        }
index c04fc2f84d1566bc1a609fe5b771d27ca3fb8761..505b73ff6df0a8c27ef8c12ed581252e90f93b6d 100644 (file)
@@ -75,6 +75,13 @@ sub input_path_url {
                my $m = $lei->{opt}->{'lock'} //
                        PublicInbox::MboxLock->defaults;
                my $mbl = PublicInbox::MboxLock->acq($input, 0, $m);
+               my $zsfx = PublicInbox::MboxReader::zsfx($input);
+               if ($zsfx) {
+                       my $in = delete $mbl->{fh};
+                       $mbl->{fh} =
+                            PublicInbox::MboxReader::zsfxcat($in, $zsfx, $lei);
+               }
+               local $PublicInbox::DS::in_loop = 0 if $zsfx; # dwaitpid
                $self->input_fh($ifmt, $mbl->{fh}, $input, @args);
        } elsif (-d _ && (-d "$input/cur" || -d "$input/new")) {
                return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
index 88468c341898829cb57b68070d8d8e98450761fe..847e5cb9700f6fe0fd37a34199dbb20e74151b4a 100644 (file)
@@ -9,7 +9,7 @@ use parent qw(PublicInbox::IPC);
 use PublicInbox::Eml;
 use PublicInbox::Lock;
 use PublicInbox::ProcessPipe;
-use PublicInbox::Spawn qw(which spawn popen_rd);
+use PublicInbox::Spawn qw(spawn);
 use PublicInbox::LeiDedupe;
 use PublicInbox::Git;
 use PublicInbox::GitAsyncCat;
@@ -167,56 +167,10 @@ sub reap_compress { # dwaitpid callback
        $lei->fail("@$cmd failed", $? >> 8);
 }
 
-# all of these support -c for stdout and -d for decompression,
-# mutt is commonly distributed with hooks for gz, bz2 and xz, at least
-# { foo => '' } means "--foo" is passed to the command-line,
-# otherwise { foo => '--bar' } passes "--bar"
-our %zsfx2cmd = (
-       gz => [ qw(GZIP pigz gzip), { rsyncable => '' } ],
-       bz2 => [ 'bzip2', {} ],
-       xz => [ 'xz', {} ],
-       # XXX does anybody care for these?  I prefer zstd on entire FSes,
-       # so it's probably not necessary on a per-file basis
-       # zst => [ 'zstd', { -default => [ qw(-q) ], # it's noisy by default
-       #       rsyncable => '', threads => '-T' } ],
-       # zz => [ 'pigz', { -default => [ '--zlib' ],
-       #       rsyncable => '', threads => '-p' }],
-       # lzo => [ 'lzop', {} ],
-       # lzma => [ 'lzma', {} ],
-);
-
-sub zsfx2cmd ($$$) {
-       my ($zsfx, $decompress, $lei) = @_;
-       my $x = $zsfx2cmd{$zsfx} // die "no support for suffix=.$zsfx";
-       my @info = @$x;
-       my $cmd_opt = pop @info;
-       my @cmd = (undef, $decompress ? qw(-dc) : qw(-c));
-       for my $exe (@info) {
-               # I think respecting client's ENV{GZIP} is OK, not sure
-               # about ENV overrides for other, less-common compressors
-               if ($exe eq uc($exe)) {
-                       $exe = $lei->{env}->{$exe} or next;
-               }
-               $cmd[0] = which($exe) and last;
-       }
-       $cmd[0] // die join(' or ', @info)." missing for .$zsfx";
-       # push @cmd, @{$cmd_opt->{-default}} if $cmd_opt->{-default};
-       for my $bool (qw(rsyncable)) {
-               my $switch = $cmd_opt->{rsyncable} // next;
-               push @cmd, '--'.($switch || $bool);
-       }
-       for my $key (qw(rsyncable)) { # support compression level?
-               my $switch = $cmd_opt->{$key} // next;
-               my $val = $lei->{opt}->{$key} // next;
-               push @cmd, $switch, $val;
-       }
-       \@cmd;
-}
-
 sub _post_augment_mbox { # open a compressor process
        my ($self, $lei) = @_;
        my $zsfx = $self->{zsfx} or return;
-       my $cmd = zsfx2cmd($zsfx, undef, $lei);
+       my $cmd = PublicInbox::MboxReader::zsfx2cmd($zsfx, undef, $lei);
        my ($r, $w) = @{delete $lei->{zpipe}};
        my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} };
        my $pid = spawn($cmd, undef, $rdr);
@@ -227,12 +181,6 @@ sub _post_augment_mbox { # open a compressor process
        $lei->{1} = $pp;
 }
 
-sub decompress_src ($$$) {
-       my ($in, $zsfx, $lei) = @_;
-       my $cmd = zsfx2cmd($zsfx, 1, $lei);
-       popen_rd($cmd, undef, { 0 => $in, 2 => $lei->{2} });
-}
-
 sub dup_src ($) {
        my ($in) = @_;
        open my $dup, '+>>&', $in or die "dup: $!";
@@ -533,8 +481,7 @@ sub _pre_augment_mbox {
                die "--augment specified but $dst is not seekable\n" if
                        $lei->{opt}->{augment};
        }
-       state $zsfx_allow = join('|', keys %zsfx2cmd);
-       if (($self->{zsfx}) = ($dst =~ /\.($zsfx_allow)\z/)) {
+       if ($self->{zsfx} = PublicInbox::MboxReader::zsfx($dst)) {
                pipe(my ($r, $w)) or die "pipe: $!";
                $lei->{zpipe} = [ $r, $w ];
                $lei->{ovv}->{lock_path} and
@@ -559,7 +506,8 @@ sub _do_augment_mbox {
                return;
        }
        my $zsfx = $self->{zsfx};
-       my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) : dup_src($out);
+       my $rd = $zsfx ? PublicInbox::MboxReader::zsfxcat($out, $zsfx, $lei)
+                       : dup_src($out);
        my $dedupe;
        if ($opt->{augment}) {
                $dedupe = $lei->{dedupe};
index 6302d1fa540dfada479eb12d4c884512b9137568..9291f00b67145b84c5de7d43dc7504e9be59d56b 100644 (file)
@@ -138,4 +138,56 @@ sub reads {
        $ifmt =~ /\Ambox(?:rd|cl|cl2|o)\z/ ? __PACKAGE__->can($ifmt) : undef
 }
 
+# all of these support -c for stdout and -d for decompression,
+# mutt is commonly distributed with hooks for gz, bz2 and xz, at least
+# { foo => '' } means "--foo" is passed to the command-line,
+# otherwise { foo => '--bar' } passes "--bar"
+my %zsfx2cmd = (
+       gz => [ qw(GZIP pigz gzip), { rsyncable => '' } ],
+       bz2 => [ 'bzip2', {} ],
+       xz => [ 'xz', {} ],
+       # don't add new entries here unless MUA support is widely available
+);
+
+sub zsfx ($) {
+       my ($pathname) = @_;
+       my $allow = join('|', keys %zsfx2cmd);
+       $pathname =~ /\.($allow)\z/ ? $1 : undef;
+}
+
+sub zsfx2cmd ($$$) {
+       my ($zsfx, $decompress, $lei) = @_;
+       my $x = $zsfx2cmd{$zsfx} // die "BUG: no support for suffix=.$zsfx";
+       my @info = @$x;
+       my $cmd_opt = pop @info;
+       my @cmd = (undef, $decompress ? qw(-dc) : qw(-c));
+       require PublicInbox::Spawn;
+       for my $exe (@info) {
+               # I think respecting client's ENV{GZIP} is OK, not sure
+               # about ENV overrides for other, less-common compressors
+               if ($exe eq uc($exe)) {
+                       $exe = $lei->{env}->{$exe} or next;
+               }
+               $cmd[0] = PublicInbox::Spawn::which($exe) and last;
+       }
+       $cmd[0] // die join(' or ', @info)." missing for .$zsfx";
+       # push @cmd, @{$cmd_opt->{-default}} if $cmd_opt->{-default};
+       for my $bool (qw(rsyncable)) {
+               my $switch = $cmd_opt->{rsyncable} // next;
+               push @cmd, '--'.($switch || $bool);
+       }
+       for my $key (qw(rsyncable)) { # support compression level?
+               my $switch = $cmd_opt->{$key} // next;
+               my $val = $lei->{opt}->{$key} // next;
+               push @cmd, $switch, $val;
+       }
+       \@cmd;
+}
+
+sub zsfxcat ($$$) {
+       my ($in, $zsfx, $lei) = @_;
+       my $cmd = zsfx2cmd($zsfx, 1, $lei);
+       PublicInbox::Spawn::popen_rd($cmd, undef, { 0 => $in, 2 => $lei->{2} });
+}
+
 1;
index 7ff628f9faa3a978084193462480b652752e16f6..dc53b82cb510a7c2f89dc03a3f61cd5efed9f1ad 100644 (file)
@@ -6,6 +6,7 @@ use PublicInbox::MboxReader;
 use PublicInbox::MdirReader;
 use PublicInbox::NetReader;
 use PublicInbox::Eml;
+use IO::Uncompress::Gunzip;
 require_mods(qw(lei -imapd -nntpd Mail::IMAPClient Net::NNTP));
 my ($tmpdir, $for_destroy) = tmpdir;
 my $sock = tcp_server;
@@ -91,13 +92,28 @@ test_lei({ tmpdir => $tmpdir }, sub {
        @bar = ();
        PublicInbox::MboxReader->mboxcl2($fh, sub {
                my $eml = shift;
-               for my $h (qw(Status Content-Length Lines)) {
+               for my $h (qw(Content-Length Lines)) {
                        ok(defined($eml->header_raw($h)),
                                "$h defined for mboxcl2");
                        $eml->header_set($h);
                }
                push @bar, $eml;
        });
-       is_deeply(\@bar, [ eml_load('t/plack-qp.eml') ], 'eml => mboxcl2');
+       my $qp_eml = eml_load('t/plack-qp.eml');
+       $qp_eml->header_set('Status', 'O');
+       is_deeply(\@bar, [ $qp_eml ], 'eml => mboxcl2');
+
+       lei_ok qw(convert t/plack-qp.eml -o), "mboxrd:$d/qp.gz";
+       open $fh, '<', "$d/qp.gz" or xbail $!;
+       ok(-s $fh, 'not empty');
+       $fh = IO::Uncompress::Gunzip->new($fh, MultiStream => 1);
+       @bar = ();
+       PublicInbox::MboxReader->mboxrd($fh, sub { push @bar, shift });
+       is_deeply(\@bar, [ $qp_eml ], 'wrote gzipped mboxrd');
+       lei_ok qw(convert -o mboxrd:/dev/stdout), "mboxrd:$d/qp.gz";
+       open $fh, '<', \$lei_out or xbail;
+       @bar = ();
+       PublicInbox::MboxReader->mboxrd($fh, sub { push @bar, shift });
+       is_deeply(\@bar, [ $qp_eml ], 'readed gzipped mboxrd');
 });
 done_testing;
index 626bdab3f8f62fd65c065fa5895d601479f4689e..75314add027d33ebe8e1ced8e87a46d60c8ae71a 100644 (file)
@@ -149,8 +149,8 @@ test_lei(sub {
        is($res->[1], undef, 'only one result');
 });
 
-for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
-       my $zsfx2cmd = PublicInbox::LeiToMail->can('zsfx2cmd');
+my $zsfx2cmd = PublicInbox::MboxReader->can('zsfx2cmd');
+for my $zsfx (qw(gz bz2 xz)) {
        SKIP: {
                my $cmd = eval { $zsfx2cmd->($zsfx, 0, $lei) };
                skip $@, 3 if $@;