]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei import: speed up repeated Maildir imports
authorEric Wong <e@80x24.org>
Tue, 8 Jun 2021 09:50:21 +0000 (09:50 +0000)
committerEric Wong <e@80x24.org>
Tue, 8 Jun 2021 16:50:47 +0000 (16:50 +0000)
On a 4-core CPU, this speeds up "lei import" on a largish
Maildir inbox with 75K messages from ~8 minutes down to ~40s.

Parallelizing alone did not bring any improvement and may
even hurt performance slightly, depending on CPU availability.
However, creating the index on the "fid" and "name" columns in
blob2name yields us the same speedup we got.

Parallelizing IMAP makes more sense due to the fact most IMAP
stores are non-local and subject to network latency.

Followup-to: bdecd7ed8e0dcf0b45491b947cd737ba8cfe38a3 ("lei import: speed up kw updates for old IMAP messages")
MANIFEST
lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiImport.pm
lib/PublicInbox/LeiIndex.pm
lib/PublicInbox/LeiInput.pm
lib/PublicInbox/LeiMailSync.pm
lib/PublicInbox/LeiPmdir.pm [new file with mode: 0644]
lib/PublicInbox/MdirReader.pm
t/lei-import-maildir.t

index 5a70a144ebb97ee5433c01aa00877eb4b81cd487..7bdbf25200cdc0b894aeac35e7ab150d5e77d1ff 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -221,6 +221,7 @@ lib/PublicInbox/LeiMailSync.pm
 lib/PublicInbox/LeiMirror.pm
 lib/PublicInbox/LeiOverview.pm
 lib/PublicInbox/LeiP2q.pm
+lib/PublicInbox/LeiPmdir.pm
 lib/PublicInbox/LeiQuery.pm
 lib/PublicInbox/LeiRediff.pm
 lib/PublicInbox/LeiRemote.pm
index ed01e8def7bf2754fd5130a04d900e111079a602..77fc5b8fa278983dea3a332fc3277894ac5ff7d7 100644 (file)
@@ -240,7 +240,7 @@ our %CMD = ( # sorted in order of importance/use:
         @c_opt ],
 'import' => [ 'LOCATION...|--stdin',
        'one-time import/update from URL or filesystem',
-       qw(stdin| offset=i recursive|r exclude=s include|I=s
+       qw(stdin| offset=i recursive|r exclude=s include|I=s jobs=s
        lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync!),
        qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt(), @c_opt ],
 'forget-mail-sync' => [ 'LOCATION...',
@@ -421,7 +421,7 @@ my %CONFIG_KEYS = (
        'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m wq1 ikw); # internal workers
+my @WQ_KEYS = qw(lxs l2m ikw pmd wq1); # internal workers
 
 sub _drop_wq {
        my ($self) = @_;
@@ -566,7 +566,7 @@ sub pkt_op_pair {
 }
 
 sub workers_start {
-       my ($lei, $wq, $jobs, $ops) = @_;
+       my ($lei, $wq, $jobs, $ops, $flds) = @_;
        $ops = {
                '!' => [ \&fail_handler, $lei ],
                '|' => [ \&sigpipe_handler, $lei ],
@@ -577,7 +577,8 @@ sub workers_start {
        $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
        my $end = $lei->pkt_op_pair;
        my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
-       $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
+       $flds->{lei} = $lei;
+       $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
        delete $lei->{pkt_op_p};
        my $op_c = delete $lei->{pkt_op_c};
        # {-lei_sock} persists script/lei process until ops->{''} EOF callback
@@ -590,7 +591,7 @@ sub workers_start {
 # call this when we're ready to wait on events and yield to other clients
 sub wait_wq_events {
        my ($lei, $op_c, $ops) = @_;
-       for my $wq (grep(defined, @$lei{qw(ikw)})) { # auxiliary WQs
+       for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
                $wq->wq_close(1);
        }
        $op_c->{ops} = $ops;
index 222f75c8abe1b0e5c9ebda30e68c9a3ca0f03a5e..b0e7ba6bc490ef9d870e4c55f36a98a06e5b4597 100644 (file)
@@ -6,6 +6,7 @@ package PublicInbox::LeiImport;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
+use PublicInbox::InboxWritable qw(eml_from_path);
 
 # /^input_/ subs are used by (or override) PublicInbox::LeiInput superclass
 
@@ -28,17 +29,26 @@ sub input_mbox_cb { # MboxReader callback
        input_eml_cb($self, $eml, $vmd);
 }
 
-sub input_maildir_cb { # maildir_each_eml cb
-       my ($f, $kw, $eml, $self) = @_;
+sub pmdir_cb { # called via wq_io_do from LeiPmdir->each_mdir_fn
+       my ($self, $f, @args) = @_;
+       my ($folder, $bn) = ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) or
+               die "BUG: $f was not from a Maildir?\n";
+       my $fl = PublicInbox::MdirReader::maildir_basename_flags($bn);
+       return if index($fl, 'T') >= 0; # no Trashed messages
+       my $kw = PublicInbox::MdirReader::flags2kw($fl);
+       substr($folder, 0, 0) = 'maildir:'; # add prefix
+       my $lms = $self->{-lms_ro};
+       my $oidbin = $lms ? $lms->name_oidbin($folder, $bn) : undef;
+       my @docids = defined($oidbin) ?
+                       $self->{over}->oidbin_exists($oidbin) : ();
        my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
-       if ($self->{-mail_sync}) {
-               if ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) { # ugh...
-                       $vmd->{sync_info} = [ "maildir:$1", \(my $n = $2) ];
-               } else {
-                       warn "E: $f was not from a Maildir?\n";
-               }
+       if (scalar @docids) {
+               $self->{lse}->kw_changed(undef, $kw, \@docids) or return;
+       }
+       if (my $eml = eml_from_path($f)) {
+               $vmd->{sync_info} = [ $folder, \$bn ] if $self->{-mail_sync};
+               $self->input_eml_cb($eml, $vmd);
        }
-       $self->input_eml_cb($eml, $vmd);
 }
 
 sub input_net_cb { # imap_each / nntp_each
@@ -62,11 +72,13 @@ sub do_import_index ($$@) {
        my $vmd_mod = $self->vmd_mod_extract(\@inputs);
        return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
        $self->{all_vmd} = $vmd_mod if scalar keys %$vmd_mod;
-       $self->prepare_inputs($lei, \@inputs) or return;
+       $lei->ale; # initialize for workers to read (before LeiPmdir->new)
        $self->{-mail_sync} = $lei->{opt}->{'mail-sync'} // 1;
+       $self->prepare_inputs($lei, \@inputs) or return;
 
-       $lei->ale; # initialize for workers to read
-       my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
+       my $j = $lei->{opt}->{jobs} // 0;
+       $j =~ /\A([0-9]+),[0-9]+\z/ and $j = $1 + 0;
+       $j ||= scalar(@{$self->{inputs}}) || 1;
        my $ikw;
        if (my $net = $lei->{net}) {
                # $j = $net->net_concurrency($j); TODO
index cc3e83e7e303b8d61c468298f9f0ff1bcaf67f5d..4be0c649b5cebe8df3d1a4fc336cd4364a9ebdd6 100644 (file)
@@ -35,7 +35,7 @@ sub lei_index {
 
 no warnings 'once';
 no strict 'refs';
-for my $m (qw(input_maildir_cb input_net_cb)) {
+for my $m (qw(pmdir_cb input_net_cb)) {
        *$m = PublicInbox::LeiImport->can($m);
 }
 
index 4ff7a379b3c3800c96d4648ea629146d5b1627fc..24211bf0cb0dff8a056cff8b84bb85438521ebfc 100644 (file)
@@ -151,9 +151,16 @@ sub input_path_url {
                return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
 $input appears to be a maildir, not $ifmt
 EOM
-               PublicInbox::MdirReader->new->maildir_each_eml($input,
-                                       $self->can('input_maildir_cb'),
-                                       $self, @args);
+               my $mdr = PublicInbox::MdirReader->new;
+               if (my $pmd = $self->{pmd}) {
+                       $mdr->maildir_each_file($input,
+                                               $pmd->can('each_mdir_fn'),
+                                               $pmd, @args);
+               } else {
+                       $mdr->maildir_each_eml($input,
+                                               $self->can('input_maildir_cb'),
+                                               $self, @args);
+               }
        } else {
                $lei->fail("$input unsupported (TODO)");
        }
@@ -215,7 +222,7 @@ sub prepare_inputs { # returns undef on error
                push @{$sync->{no}}, '/dev/stdin' if $sync;
        }
        my $net = $lei->{net}; # NetWriter may be created by l2m
-       my (@f, @d);
+       my (@f, @md);
        # e.g. Maildir:/home/user/Mail/ or imaps://example.com/INBOX
        for my $input (@$inputs) {
                my $input_path = $input;
@@ -247,11 +254,11 @@ sub prepare_inputs { # returns undef on error
                                PublicInbox::MboxReader->reads($ifmt) or return
                                        $lei->fail("$ifmt not supported");
                        } elsif (-d $input_path) {
-                               require PublicInbox::MdirReader;
                                $ifmt eq 'maildir' or return
                                        $lei->fail("$ifmt not supported");
                                $sync and $input = 'maildir:'.
                                                $lei->abs_path($input_path);
+                               push @md, $input;
                        } else {
                                return $lei->fail("Unable to handle $input");
                        }
@@ -266,21 +273,18 @@ $input is `eml', not --in-format=$in_fmt
                        if ($devfd >= 0 || -f $input || -p _) {
                                push @{$sync->{no}}, $input if $sync;
                                push @f, $input;
-                       } elsif (-d $input) {
+                       } elsif (-d "$input/new" && -d "$input/cur") {
                                if ($sync) {
                                        $input = $lei->abs_path($input);
                                        push @{$sync->{ok}}, $input;
                                }
-                               push @d, $input;
+                               push @md, $input;
                        } else {
                                return $lei->fail("Unable to handle $input")
                        }
                }
        }
        if (@f) { check_input_format($lei, \@f) or return }
-       if (@d) { # TODO: check for MH vs Maildir, here
-               require PublicInbox::MdirReader;
-       }
        if ($sync && $sync->{no}) {
                return $lei->fail(<<"") if !$sync->{ok};
 --mail-sync specified but no inputs support it
@@ -299,6 +303,13 @@ $input is `eml', not --in-format=$in_fmt
                $lei->{auth} //= PublicInbox::LeiAuth->new;
                $lei->{net} //= $net;
        }
+       if (scalar(@md)) {
+               require PublicInbox::MdirReader;
+               if ($self->can('pmdir_cb')) {
+                       require PublicInbox::LeiPmdir;
+                       $self->{pmd} = PublicInbox::LeiPmdir->new($lei, $self);
+               }
+       }
        $self->{inputs} = $inputs;
 }
 
index 75603d89e1788127487e83a5b396b882d60ba8c1..ec05404ad004fc9740e7aec781fd9ddc96efd539 100644 (file)
@@ -66,6 +66,10 @@ CREATE TABLE IF NOT EXISTS blob2name (
        UNIQUE (oidbin, fid, name)
 )
 
+       # speeds up LeiImport->pmdir_cb (for "lei import") by ~6x:
+       $dbh->do(<<'');
+CREATE INDEX IF NOT EXISTS idx_fid_name ON blob2name(fid,name)
+
 }
 
 sub fid_for {
@@ -375,6 +379,16 @@ EOM
        $sth->fetchrow_array;
 }
 
+sub name_oidbin ($$$) {
+       my ($self, $mdir, $nm) = @_;
+       my $fid = $self->{fmap}->{$mdir} //= fid_for($self, $mdir) // return;
+       my $sth = $self->{dbh}->prepare_cached(<<EOM, undef, 1);
+SELECT oidbin FROM blob2name WHERE fid = ? AND name = ?
+EOM
+       $sth->execute($fid, $nm);
+       $sth->fetchrow_array;
+}
+
 sub imap_oid {
        my ($self, $lei, $uid_uri) = @_;
        my $mailbox_uri = $uid_uri->clone;
diff --git a/lib/PublicInbox/LeiPmdir.pm b/lib/PublicInbox/LeiPmdir.pm
new file mode 100644 (file)
index 0000000..5efb012
--- /dev/null
@@ -0,0 +1,67 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# WQ worker for dealing with parallel Maildir reads;
+# this does NOT use the {shard_info} field of LeiToMail
+# (and we may remove {shard_info})
+# WQ key: {pmd}
+package PublicInbox::LeiPmdir;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+
+sub new {
+       my ($cls, $lei, $ipt) = @_;
+       my $self = bless { -wq_ident => 'lei Maildir worker' }, $cls;
+       my $jobs = $lei->{opt}->{jobs};
+       $jobs =~ /\A[0-9]+,([0-9]+)\z/ and $jobs = $1;
+       my $nproc = $jobs // do {
+               # untested with >=4 CPUs, though I suspect I/O latency
+               # of SATA SSD storage will make >=4 processes unnecessary,
+               # here.  NVMe users may wish to use '-j'
+               my $n = $self->detect_nproc;
+               $n = 4 if $n > 4;
+       };
+       my ($op_c, $ops) = $lei->workers_start($self, $nproc,
+               undef, { ipt => $ipt }); # LeiInput subclass
+       $op_c->{ops} = $ops; # for PktOp->event_step
+       $lei->{pmd} = $self;
+}
+
+sub ipc_atfork_child {
+       my ($self) = @_;
+       my $lei = $self->{lei};
+       $lei->_lei_atfork_child;
+       my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}';
+       $ipt->{lei} = $lei;
+       $ipt->{sto} = $lei->{sto} // die 'BUG: no lei->{sto}';
+       $ipt->{lse} = $ipt->{sto}->search;
+       $ipt->{over} = $ipt->{lse}->over;
+       $ipt->{-lms_ro} //= $ipt->{lse}->lms; # may be undef or '0'
+       $self->SUPER::ipc_atfork_child;
+}
+
+sub each_mdir_fn { # maildir_each_file callback
+       my ($f, $self, @args) = @_;
+       $self->wq_io_do('mdir_iter', [], $f, @args);
+}
+
+sub mdir_iter { # via wq_io_do
+       my ($self, $f, @args) = @_;
+       $self->{ipt}->pmdir_cb($f, @args);
+}
+
+sub pmd_done_wait {
+       my ($arg, $pid) = @_;
+       my ($self, $lei) = @$arg;
+       my $wait = $lei->{sto}->ipc_do('done');
+       $lei->can('wq_done_wait')->($arg, $pid);
+}
+
+sub _lei_wq_eof { # EOF callback for main lei daemon
+       my ($lei) = @_;
+       my $pmd = delete $lei->{pmd} or return $lei->fail;
+       $pmd->wq_wait_old(\&pmd_done_wait, $lei);
+}
+
+1;
index 304be63d630391c9c36c09c16c19523cb920722f..484bf0a8dcee682afc3fded71cb9108cb098ee6d 100644 (file)
@@ -87,17 +87,21 @@ sub maildir_each_eml {
 sub new { bless {}, __PACKAGE__ }
 
 sub flags2kw ($) {
-       my @unknown;
-       my %kw;
-       for (split(//, $_[0])) {
-               my $k = $c2kw{$_};
-               if (defined($k)) {
-                       $kw{$k} = 1;
-               } else {
-                       push @unknown, $_;
+       if (wantarray) {
+               my @unknown;
+               my %kw;
+               for (split(//, $_[0])) {
+                       my $k = $c2kw{$_};
+                       if (defined($k)) {
+                               $kw{$k} = 1;
+                       } else {
+                               push @unknown, $_;
+                       }
                }
+               (\%kw, \@unknown);
+       } else {
+               [ sort(map { $c2kw{$_} // () } split(//, $_[0])) ];
        }
-       (\%kw, \@unknown);
 }
 
 1;
index 688b10ce78a0e866ae84b0502712e5db6dd497a7..c81e7805fe7f47101b4e34ae7c514110840e3631 100644 (file)
@@ -28,7 +28,7 @@ test_lei(sub {
        is(scalar(keys %v), 1, 'inspect handles relative and absolute paths');
        my $inspect = json_utf8->decode([ keys %v ]->[0]);
        is_deeply($inspect, {"maildir:$md" => { 'name.count' => 1 }},
-               'inspect maildir: path had expected output');
+               'inspect maildir: path had expected output') or xbail($inspect);
 
        lei_ok(qw(q s:boolean));
        my $res = json_utf8->decode($lei_out);