]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiTag.pm
lei tag: parallelize Maildir access
[public-inbox.git] / lib / PublicInbox / LeiTag.pm
index 2170e3f2e7de196723715f7f322fdab75aaa4d13..e0532653e69773f52f9ef3d4f5a7c18ffc22ac02 100644 (file)
@@ -6,10 +6,12 @@ package PublicInbox::LeiTag;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
+use PublicInbox::InboxWritable qw(eml_from_path);
 
 sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh
        my ($self, $eml) = @_;
-       if (my $xoids = $self->{lei}->{ale}->xoids_for($eml)) {
+       if (my $xoids = $self->{lse}->xoids_for($eml) // # tries LeiMailSync
+                       $self->{lei}->{ale}->xoids_for($eml)) {
                $self->{lei}->{sto}->ipc_do('update_xvmd', $xoids, $eml,
                                                $self->{vmd_mod});
        } else {
@@ -17,16 +19,15 @@ sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh
        }
 }
 
-sub input_mbox_cb { input_eml_cb($_[1], $_[0]) }
-
-sub net_merge_all_done { # callback used by LeiAuth
-       my ($self) = @_;
-       $self->wq_io_do('process_inputs');
-       $self->wq_close(1);
+sub input_mbox_cb {
+       my ($eml, $self) = @_;
+       $eml->header_set($_) for (qw(X-Status Status));
+       input_eml_cb($self, $eml);
 }
 
-sub input_maildir_cb { # maildir_each_eml cb
-       my ($f, $kw, $eml, $self) = @_;
+sub pmdir_cb { # called via wq_io_do from LeiPmdir->each_mdir_fn
+       my ($self, $f) = @_;
+       my $eml = eml_from_path($f) or return;
        input_eml_cb($self, $eml);
 }
 
@@ -43,18 +44,18 @@ sub lei_tag { # the "lei tag" method
        $lei->ale; # refresh and prepare
        my $vmd_mod = $self->vmd_mod_extract(\@argv);
        return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
+       $self->{vmd_mod} = $vmd_mod; # before LeiPmdir->new in prepare_inputs
        $self->prepare_inputs($lei, \@argv) or return;
        grep(defined, @$vmd_mod{qw(+kw +L -L -kw)}) or
                return $lei->fail('no keywords or labels specified');
        my $ops = {};
        $lei->{auth}->op_merge($ops, $self) if $lei->{auth};
-       $self->{vmd_mod} = $vmd_mod;
        my $j = $self->{-wq_nr_workers} = 1; # locked for now
-       (my $op_c, $ops) = $lei->workers_start($self, 'lei-tag', $j, $ops);
+       (my $op_c, $ops) = $lei->workers_start($self, $j, $ops);
        $lei->{wq1} = $self;
        $lei->{-err_type} = 'non-fatal';
        net_merge_all_done($self) unless $lei->{auth};
-       $op_c->op_wait_event($ops);
+       $lei->wait_wq_events($op_c, $ops);
 }
 
 sub note_missing {
@@ -66,6 +67,7 @@ sub note_missing {
 sub ipc_atfork_child {
        my ($self) = @_;
        PublicInbox::LeiInput::input_only_atfork_child($self);
+       $self->{lse} = $self->{lei}->{sto}->search;
        # this goes out-of-scope at worker process exit:
        PublicInbox::OnDestroy->new($$, \&note_missing, $self);
 }
@@ -117,5 +119,6 @@ sub _complete_tag {
 
 no warnings 'once'; # the following works even when LeiAuth is lazy-loaded
 *net_merge_all = \&PublicInbox::LeiAuth::net_merge_all;
+*net_merge_all_done = \&PublicInbox::LeiInput::input_only_net_merge_all_done;
 
 1;