]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei_input: reduce IPC traffic with multiple inputs
authorEric Wong <e@80x24.org>
Wed, 31 Mar 2021 07:45:51 +0000 (07:45 +0000)
committerEric Wong <e@80x24.org>
Wed, 31 Mar 2021 22:27:19 +0000 (22:27 +0000)
No point in sending a command for every input when a
single one will do.  We'll also trigger LeiStore->done
sooner in the worker rather than later.

lib/PublicInbox/LeiConvert.pm
lib/PublicInbox/LeiImport.pm
lib/PublicInbox/LeiInput.pm
lib/PublicInbox/LeiTag.pm

index fb7a2f3bf99c298288fb9e148fc0c00cf8cf4415..0ce49ea9a866866ea9ea72a3db3d4b70dfdf52ce 100644 (file)
@@ -7,6 +7,7 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
 use PublicInbox::LeiOverview;
+use PublicInbox::DS;
 
 # /^input_/ subs are used by PublicInbox::LeiInput
 
@@ -32,12 +33,10 @@ sub input_maildir_cb {
        $self->{wcb}->(undef, { kw => $kw }, $eml);
 }
 
-sub do_convert { # via wq_do
+sub process_inputs { # via wq_do
        my ($self) = @_;
-       $PublicInbox::DS::in_loop = 0; # force synchronous dwaitpid
-       for my $input (@{$self->{inputs}}) {
-               $self->input_path_url($input);
-       }
+       local $PublicInbox::DS::in_loop = 0; # force synchronous dwaitpid
+       $self->SUPER::process_inputs;
        delete $self->{lei}->{1};
        delete $self->{wcb}; # commit
 }
@@ -55,7 +54,7 @@ sub lei_convert { # the main "lei convert" method
        $self->prepare_inputs($lei, \@inputs) or return;
        my ($op_c, $ops) = $lei->workers_start($self, 'lei_convert', 1);
        $lei->{cnv} = $self;
-       $self->wq_io_do('do_convert', []);
+       $self->wq_io_do('process_inputs', []);
        $self->wq_close(1);
        $op_c->op_wait_event($ops);
 }
index dbf655b6f3fc690f8e39f41de8b24740b2d6aded..d33143ef2c486f1da4050b9cb863c025d4160f60 100644 (file)
@@ -39,8 +39,6 @@ sub import_done_wait { # dwaitpid callback
        my ($arg, $pid) = @_;
        my ($imp, $lei) = @$arg;
        $lei->child_error($?, 'non-fatal errors during import') if $?;
-       my $sto = delete $lei->{sto} // return $lei->fail('BUG: {sto} gone');
-       my $wait = $sto->ipc_do('done'); # PublicInbox::LeiStore::done
        $lei->dclose;
 }
 
@@ -52,9 +50,7 @@ sub import_done { # EOF callback for main daemon
 
 sub net_merge_complete { # callback used by LeiAuth
        my ($self) = @_;
-       for my $input (@{$self->{inputs}}) {
-               $self->wq_io_do('input_path_url', [], $input);
-       }
+       $self->wq_io_do('process_inputs');
        $self->wq_close(1);
 }
 
index 505b73ff6df0a8c27ef8c12ed581252e90f93b6d..40d71f9eb6a8fe60d87c80f9a7aebda0d15e95d2 100644 (file)
@@ -5,6 +5,7 @@
 package PublicInbox::LeiInput;
 use strict;
 use v5.10.1;
+use PublicInbox::DS;
 
 sub check_input_format ($;$) {
        my ($lei, $files) = @_;
@@ -165,6 +166,14 @@ $input is `eml', not --in-format=$in_fmt
        $self->{inputs} = $inputs;
 }
 
+sub process_inputs {
+       my ($self) = @_;
+       for my $input (@{$self->{inputs}}) {
+               $self->input_path_url($input);
+       }
+       my $wait = $self->{lei}->{sto}->ipc_do('done') if $self->{lei}->{sto};
+}
+
 sub input_only_atfork_child {
        my ($self) = @_;
        my $lei = $self->{lei};
index 8b012b160eaf5778ffff42115b3e445ebeef60aa..d572a84acab43e84ece12d90ff32e9fd82e5692b 100644 (file)
@@ -73,8 +73,6 @@ sub tag_done_wait { # dwaitpid callback
        my ($arg, $pid) = @_;
        my ($tag, $lei) = @$arg;
        $lei->child_error($?, 'non-fatal errors during tag') if $?;
-       my $sto = delete $lei->{sto};
-       my $wait = $sto->ipc_do('done') if $sto; # PublicInbox::LeiStore::done
        $lei->dclose;
 }
 
@@ -86,9 +84,7 @@ sub tag_done { # EOF callback for main daemon
 
 sub net_merge_complete { # callback used by LeiAuth
        my ($self) = @_;
-       for my $input (@{$self->{inputs}}) {
-               $self->wq_io_do('input_path_url', [], $input);
-       }
+       $self->wq_io_do('process_inputs');
        $self->wq_close(1);
 }