]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei: maildir: move shard support to MdirReader
authorEric Wong <e@80x24.org>
Mon, 5 Apr 2021 10:27:50 +0000 (10:27 +0000)
committerEric Wong <e@80x24.org>
Mon, 5 Apr 2021 19:10:14 +0000 (19:10 +0000)
We'll eventually want lei_input users like "lei import" and
"lei tag" to support parallel reads.

lib/PublicInbox/InboxWritable.pm
lib/PublicInbox/LeiInput.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/MdirReader.pm
t/lei-convert.t
t/lei_to_mail.t

index eeebc485156025f1aa1ff38f4b3ba1ebfde31a25..45d8cdc757356a0d9656abbf88f18c4624e6bc51 100644 (file)
@@ -154,8 +154,8 @@ sub import_maildir {
        my $im = $self->importer(1);
        my @self = $self->filter($im) ? ($self) : ();
        require PublicInbox::MdirReader;
-       PublicInbox::MdirReader::maildir_each_file(\&_each_maildir_fn,
-                                               $im, @self);
+       PublicInbox::MdirReader->new->maildir_each_file(\&_each_maildir_fn,
+                                                       $im, @self);
        $im->done;
 }
 
index 40d71f9eb6a8fe60d87c80f9a7aebda0d15e95d2..e416d3ed6e1777b428e3db9039c689fffc7ad7ac 100644 (file)
@@ -88,7 +88,7 @@ sub input_path_url {
                return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
 $input appears to a be a maildir, not $ifmt
 EOM
-               PublicInbox::MdirReader::maildir_each_eml($input,
+               PublicInbox::MdirReader->new->maildir_each_eml($input,
                                        $self->can('input_maildir_cb'),
                                        $self, @args);
        } else {
index 76a11b0efcf5d1009eda9e883b3acd51a09f777a..2e736070ed7e23880dd7b2c38112e270174ce5f1 100644 (file)
@@ -14,7 +14,6 @@ use PublicInbox::PktOp qw(pkt_do);
 use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
-use Digest::SHA qw(sha256_hex);
 
 my %kw2char = ( # Maildir characters
        draft => 'D',
@@ -234,17 +233,9 @@ sub update_kw_maybe ($$$$) {
        }
 }
 
-sub _augment_or_unlink { # maildir_each_eml cb
-       my ($f, $kw, $eml, $lei, $lse, $mod, $shard, $unlink) = @_;
-       if ($mod) {
-               # can't get dirent.d_ino w/ pure Perl readdir, so we extract
-               # the OID if it looks like one instead of doing stat(2)
-               my $hex = $f =~ m!\b([a-f0-9]{40,})[^/]*\z! ?
-                               $1 : sha256_hex($f);
-               my $recno = hex(substr($hex, 0, 8));
-               return if ($recno % $mod) != $shard;
-               update_kw_maybe($lei, $lse, $eml, $kw);
-       }
+sub _md_update { # maildir_each_eml cb
+       my ($f, $kw, $eml, $lei, $lse, $unlink) = @_;
+       update_kw_maybe($lei, $lse, $eml, $kw);
        $unlink ? unlink($f) : _augment($eml, $lei);
 }
 
@@ -392,21 +383,19 @@ sub _do_augment_maildir {
        my ($self, $lei) = @_;
        my $dst = $lei->{ovv}->{dst};
        my $lse = $lei->{opt}->{'import-before'} ? $lei->{lse} : undef;
-       my ($mod, $shard) = @{$self->{shard_info} // []};
+       my $mdr = PublicInbox::MdirReader->new;
        if ($lei->{opt}->{augment}) {
                my $dedupe = $lei->{dedupe};
                if ($dedupe && $dedupe->prepare_dedupe) {
-                       PublicInbox::MdirReader::maildir_each_eml($dst,
-                                               \&_augment_or_unlink,
-                                               $lei, $lse, $mod, $shard);
+                       $mdr->{shard_info} = $self->{shard_info};
+                       $mdr->maildir_each_eml($dst, \&_md_update, $lei, $lse);
                        $dedupe->pause_dedupe;
                }
        } elsif ($lse) {
-               PublicInbox::MdirReader::maildir_each_eml($dst,
-                                       \&_augment_or_unlink,
-                                       $lei, $lse, $mod, $shard, 1);
+               $mdr->{shard_info} = $self->{shard_info};
+               $mdr->maildir_each_eml($dst, \&_md_update, $lei, $lse, 1);
        } else {# clobber existing Maildir
-               PublicInbox::MdirReader::maildir_each_file($dst, \&_unlink);
+               $mdr->maildir_each_file($dst, \&_unlink);
        }
 }
 
index 1685e4d8ee2883f283ef606a478957fd85dc1ab2..b49c8ceb86eec87d1f56da3e20b5fe3f858d34c2 100644 (file)
@@ -8,6 +8,7 @@ package PublicInbox::MdirReader;
 use strict;
 use v5.10.1;
 use PublicInbox::InboxWritable qw(eml_from_path);
+use Digest::SHA qw(sha256_hex);
 
 # returns Maildir flags from a basename ('' for no flags, undef for invalid)
 sub maildir_basename_flags {
@@ -24,14 +25,25 @@ sub maildir_path_flags {
        $i >= 0 ? maildir_basename_flags(substr($f, $i + 1)) : undef;
 }
 
-sub maildir_each_file ($$;@) {
-       my ($dir, $cb, @arg) = @_;
+sub shard_ok ($$$) {
+       my ($bn, $mod, $shard) = @_;
+       # can't get dirent.d_ino w/ pure Perl readdir, so we extract
+       # the OID if it looks like one instead of doing stat(2)
+       my $hex = $bn =~ m!\A([a-f0-9]{40,})! ? $1 : sha256_hex($bn);
+       my $recno = hex(substr($hex, 0, 8));
+       ($recno % $mod) == $shard;
+}
+
+sub maildir_each_file {
+       my ($self, $dir, $cb, @arg) = @_;
        $dir .= '/' unless substr($dir, -1) eq '/';
+       my ($mod, $shard) = @{$self->{shard_info} // []};
        for my $d (qw(new/ cur/)) {
                my $pfx = $dir.$d;
                opendir my $dh, $pfx or next;
                while (defined(my $bn = readdir($dh))) {
                        maildir_basename_flags($bn) // next;
+                       next if defined($mod) && !shard_ok($bn, $mod, $shard);
                        $cb->($pfx.$bn, @arg);
                }
        }
@@ -40,15 +52,17 @@ sub maildir_each_file ($$;@) {
 my %c2kw = ('D' => 'draft', F => 'flagged', P => 'forwarded',
        R => 'answered', S => 'seen');
 
-sub maildir_each_eml ($$;@) {
-       my ($dir, $cb, @arg) = @_;
+sub maildir_each_eml {
+       my ($self, $dir, $cb, @arg) = @_;
        $dir .= '/' unless substr($dir, -1) eq '/';
+       my ($mod, $shard) = @{$self->{shard_info} // []};
        my $pfx = $dir . 'new/';
        if (opendir(my $dh, $pfx)) {
                while (defined(my $bn = readdir($dh))) {
                        next if substr($bn, 0, 1) eq '.';
                        my @f = split(/:/, $bn, -1);
                        next if scalar(@f) != 1;
+                       next if defined($mod) && !shard_ok($bn, $mod, $shard);
                        my $f = $pfx.$bn;
                        my $eml = eml_from_path($f) or next;
                        $cb->($f, [], $eml, @arg);
@@ -59,6 +73,7 @@ sub maildir_each_eml ($$;@) {
        while (defined(my $bn = readdir($dh))) {
                my $fl = maildir_basename_flags($bn) // next;
                next if index($fl, 'T') >= 0;
+               next if defined($mod) && !shard_ok($bn, $mod, $shard);
                my $f = $pfx.$bn;
                my $eml = eml_from_path($f) or next;
                my @kw = sort(map { $c2kw{$_} // () } split(//, $fl));
@@ -66,4 +81,6 @@ sub maildir_each_eml ($$;@) {
        }
 }
 
+sub new { bless {}, __PACKAGE__ }
+
 1;
index dc53b82cb510a7c2f89dc03a3f61cd5efed9f1ad..0ea860c8218939d8fe22d4887ca6730dc8f17f84 100644 (file)
@@ -57,7 +57,7 @@ test_lei({ tmpdir => $tmpdir }, sub {
        lei_ok('convert', '-o', "$d/md", "mboxrd:$d/foo.mboxrd");
        ok(-d "$d/md", 'Maildir created');
        my @md;
-       PublicInbox::MdirReader::maildir_each_eml("$d/md", sub {
+       PublicInbox::MdirReader->new->maildir_each_eml("$d/md", sub {
                push @md, $_[2];
        });
        is(scalar(@md), scalar(@mboxrd), 'got expected emails in Maildir') or
index 75314add027d33ebe8e1ced8e87a46d60c8ae71a..513572579274fef5f49e7e1cfe14b687f28023b6 100644 (file)
@@ -253,7 +253,7 @@ SKIP: { # FIFO support
 }
 
 { # Maildir support
-       my $each_file = PublicInbox::MdirReader->can('maildir_each_file');
+       my $mdr = PublicInbox::MdirReader->new;
        my $md = "$tmpdir/maildir/";
        my $wcb = $wcb_get->('maildir', $md);
        is(ref($wcb), 'CODE', 'got Maildir callback');
@@ -261,7 +261,7 @@ SKIP: { # FIFO support
        $wcb->(\(my $x = $buf), $b4dc0ffee);
 
        my @f;
-       $each_file->($md, sub { push @f, shift });
+       $mdr->maildir_each_file($md, sub { push @f, shift });
        open my $fh, $f[0] or BAIL_OUT $!;
        is(do { local $/; <$fh> }, $buf, 'wrote to Maildir');
 
@@ -270,7 +270,7 @@ SKIP: { # FIFO support
        $wcb->(\($x = $buf."\nx\n"), $deadcafe);
 
        my @x = ();
-       $each_file->($md, sub { push @x, shift });
+       $mdr->maildir_each_file($md, sub { push @x, shift });
        is(scalar(@x), 1, 'wrote one new file');
        ok(!-f $f[0], 'old file clobbered');
        open $fh, $x[0] or BAIL_OUT $!;
@@ -281,7 +281,7 @@ SKIP: { # FIFO support
        $wcb->(\($x = $buf."\ny\n"), $deadcafe);
        $wcb->(\($x = $buf."\ny\n"), $b4dc0ffee); # skipped by dedupe
        @f = ();
-       $each_file->($md, sub { push @f, shift });
+       $mdr->maildir_each_file($md, sub { push @f, shift });
        is(scalar grep(/\A\Q$x[0]\E\z/, @f), 1, 'old file still there');
        my @new = grep(!/\A\Q$x[0]\E\z/, @f);
        is(scalar @new, 1, '1 new file written (b4dc0ffee skipped)');