lib/PublicInbox/LeiConvert.pm | 11 +++++------ lib/PublicInbox/LeiImport.pm | 6 +----- lib/PublicInbox/LeiInput.pm | 9 +++++++++ lib/PublicInbox/LeiTag.pm | 6 +----- diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm index fb7a2f3bf99c298288fb9e148fc0c00cf8cf4415..0ce49ea9a866866ea9ea72a3db3d4b70dfdf52ce 100644 --- a/lib/PublicInbox/LeiConvert.pm +++ b/lib/PublicInbox/LeiConvert.pm @@ -7,6 +7,7 @@ use strict; use v5.10.1; use parent qw(PublicInbox::IPC PublicInbox::LeiInput); use PublicInbox::LeiOverview; +use PublicInbox::DS; # /^input_/ subs are used by PublicInbox::LeiInput @@ -32,12 +33,10 @@ my (undef, $kw, $eml, $self) = @_; # $_[0] $filename ignored $self->{wcb}->(undef, { kw => $kw }, $eml); } -sub do_convert { # via wq_do +sub process_inputs { # via wq_do my ($self) = @_; - $PublicInbox::DS::in_loop = 0; # force synchronous dwaitpid - for my $input (@{$self->{inputs}}) { - $self->input_path_url($input); - } + local $PublicInbox::DS::in_loop = 0; # force synchronous dwaitpid + $self->SUPER::process_inputs; delete $self->{lei}->{1}; delete $self->{wcb}; # commit } @@ -55,7 +54,7 @@ $lei->{opt}->{augment} = 1 if $devfd < 0; $self->prepare_inputs($lei, \@inputs) or return; my ($op_c, $ops) = $lei->workers_start($self, 'lei_convert', 1); $lei->{cnv} = $self; - $self->wq_io_do('do_convert', []); + $self->wq_io_do('process_inputs', []); $self->wq_close(1); $op_c->op_wait_event($ops); } diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index dbf655b6f3fc690f8e39f41de8b24740b2d6aded..d33143ef2c486f1da4050b9cb863c025d4160f60 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -39,8 +39,6 @@ sub import_done_wait { # dwaitpid callback my ($arg, $pid) = @_; my ($imp, $lei) = @$arg; $lei->child_error($?, 'non-fatal errors during import') if $?; - my $sto = delete $lei->{sto} // return $lei->fail('BUG: {sto} gone'); - my $wait = $sto->ipc_do('done'); # PublicInbox::LeiStore::done $lei->dclose; } @@ -52,9 +50,7 @@ } sub net_merge_complete { # callback used by LeiAuth my ($self) = @_; - for my $input (@{$self->{inputs}}) { - $self->wq_io_do('input_path_url', [], $input); - } + $self->wq_io_do('process_inputs'); $self->wq_close(1); } diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm index 505b73ff6df0a8c27ef8c12ed581252e90f93b6d..40d71f9eb6a8fe60d87c80f9a7aebda0d15e95d2 100644 --- a/lib/PublicInbox/LeiInput.pm +++ b/lib/PublicInbox/LeiInput.pm @@ -5,6 +5,7 @@ # parent class for LeiImport, LeiConvert package PublicInbox::LeiInput; use strict; use v5.10.1; +use PublicInbox::DS; sub check_input_format ($;$) { my ($lei, $files) = @_; @@ -163,6 +164,14 @@ $lei->{auth} //= PublicInbox::LeiAuth->new; $lei->{net} //= $net; } $self->{inputs} = $inputs; +} + +sub process_inputs { + my ($self) = @_; + for my $input (@{$self->{inputs}}) { + $self->input_path_url($input); + } + my $wait = $self->{lei}->{sto}->ipc_do('done') if $self->{lei}->{sto}; } sub input_only_atfork_child { diff --git a/lib/PublicInbox/LeiTag.pm b/lib/PublicInbox/LeiTag.pm index 8b012b160eaf5778ffff42115b3e445ebeef60aa..d572a84acab43e84ece12d90ff32e9fd82e5692b 100644 --- a/lib/PublicInbox/LeiTag.pm +++ b/lib/PublicInbox/LeiTag.pm @@ -73,8 +73,6 @@ sub tag_done_wait { # dwaitpid callback my ($arg, $pid) = @_; my ($tag, $lei) = @$arg; $lei->child_error($?, 'non-fatal errors during tag') if $?; - my $sto = delete $lei->{sto}; - my $wait = $sto->ipc_do('done') if $sto; # PublicInbox::LeiStore::done $lei->dclose; } @@ -86,9 +84,7 @@ } sub net_merge_complete { # callback used by LeiAuth my ($self) = @_; - for my $input (@{$self->{inputs}}) { - $self->wq_io_do('input_path_url', [], $input); - } + $self->wq_io_do('process_inputs'); $self->wq_close(1); }