]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei tag: parallelize Maildir access
authorEric Wong <e@80x24.org>
Wed, 9 Jun 2021 07:47:49 +0000 (07:47 +0000)
committerEric Wong <e@80x24.org>
Wed, 9 Jun 2021 10:03:44 +0000 (10:03 +0000)
Since Maildir isn't guaranteed to have any sort of order, we
can parallelize inputs, here.  On a 4-core system, this reduced
one of my tag invocations from 5.5 to 1.4s.

lib/PublicInbox/LeiImport.pm
lib/PublicInbox/LeiPmdir.pm
lib/PublicInbox/LeiTag.pm

index cddd56192fad72700115579d3b1cfced28a33818..e3cb69ca93d44bf878d573e19b81fc5b833c1941 100644 (file)
@@ -35,13 +35,13 @@ sub pmdir_cb { # called via wq_io_do from LeiPmdir->each_mdir_fn
                die "BUG: $f was not from a Maildir?\n";
        my $kw = PublicInbox::MdirReader::flags2kw($fl);
        substr($folder, 0, 0) = 'maildir:'; # add prefix
-       my $lms = $self->{-lms_ro};
+       my $lse = $self->{lse} //= $self->{lei}->{sto}->search;
+       my $lms = $self->{-lms_ro} //= $lse->lms; # may be 0 or undef
        my $oidbin = $lms ? $lms->name_oidbin($folder, $bn) : undef;
-       my @docids = defined($oidbin) ?
-                       $self->{over}->oidbin_exists($oidbin) : ();
+       my @docids = defined($oidbin) ? $lse->over->oidbin_exists($oidbin) : ();
        my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
        if (scalar @docids) {
-               $self->{lse}->kw_changed(undef, $kw, \@docids) or return;
+               $lse->kw_changed(undef, $kw, \@docids) or return;
        }
        if (my $eml = eml_from_path($f)) {
                $vmd->{sync_info} = [ $folder, \$bn ] if $self->{-mail_sync};
index aa9ce713507a2448a1e50ced1ccd544669f8d756..760f276cffe41bf45566de6df3f377bcfd8332d9 100644 (file)
@@ -30,15 +30,9 @@ sub new {
 
 sub ipc_atfork_child {
        my ($self) = @_;
-       my $lei = $self->{lei};
-       $lei->_lei_atfork_child;
        my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}';
-       $ipt->{lei} = $lei;
-       $ipt->{sto} = $lei->{sto} // die 'BUG: no lei->{sto}';
-       $ipt->{lse} = $ipt->{sto}->search;
-       $ipt->{over} = $ipt->{lse}->over;
-       $ipt->{-lms_ro} //= $ipt->{lse}->lms; # may be undef or '0'
-       $self->SUPER::ipc_atfork_child;
+       $ipt->{lei} = $self->{lei};
+       $ipt->ipc_atfork_child;
 }
 
 sub each_mdir_fn { # maildir_each_file callback
index 4b3ce7d87061c717c2647e2995debc55f76441ee..e0532653e69773f52f9ef3d4f5a7c18ffc22ac02 100644 (file)
@@ -6,6 +6,7 @@ package PublicInbox::LeiTag;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
+use PublicInbox::InboxWritable qw(eml_from_path);
 
 sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh
        my ($self, $eml) = @_;
@@ -24,8 +25,9 @@ sub input_mbox_cb {
        input_eml_cb($self, $eml);
 }
 
-sub input_maildir_cb { # maildir_each_eml cb
-       my ($f, $kw, $eml, $self) = @_;
+sub pmdir_cb { # called via wq_io_do from LeiPmdir->each_mdir_fn
+       my ($self, $f) = @_;
+       my $eml = eml_from_path($f) or return;
        input_eml_cb($self, $eml);
 }
 
@@ -42,12 +44,12 @@ sub lei_tag { # the "lei tag" method
        $lei->ale; # refresh and prepare
        my $vmd_mod = $self->vmd_mod_extract(\@argv);
        return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
+       $self->{vmd_mod} = $vmd_mod; # before LeiPmdir->new in prepare_inputs
        $self->prepare_inputs($lei, \@argv) or return;
        grep(defined, @$vmd_mod{qw(+kw +L -L -kw)}) or
                return $lei->fail('no keywords or labels specified');
        my $ops = {};
        $lei->{auth}->op_merge($ops, $self) if $lei->{auth};
-       $self->{vmd_mod} = $vmd_mod;
        my $j = $self->{-wq_nr_workers} = 1; # locked for now
        (my $op_c, $ops) = $lei->workers_start($self, $j, $ops);
        $lei->{wq1} = $self;