No point in sending a command for every input when a
single one will do. We'll also trigger LeiStore->done
sooner in the worker rather than later.
use v5.10.1;
use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
use PublicInbox::LeiOverview;
use v5.10.1;
use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
use PublicInbox::LeiOverview;
# /^input_/ subs are used by PublicInbox::LeiInput
# /^input_/ subs are used by PublicInbox::LeiInput
$self->{wcb}->(undef, { kw => $kw }, $eml);
}
$self->{wcb}->(undef, { kw => $kw }, $eml);
}
-sub do_convert { # via wq_do
+sub process_inputs { # via wq_do
- $PublicInbox::DS::in_loop = 0; # force synchronous dwaitpid
- for my $input (@{$self->{inputs}}) {
- $self->input_path_url($input);
- }
+ local $PublicInbox::DS::in_loop = 0; # force synchronous dwaitpid
+ $self->SUPER::process_inputs;
delete $self->{lei}->{1};
delete $self->{wcb}; # commit
}
delete $self->{lei}->{1};
delete $self->{wcb}; # commit
}
$self->prepare_inputs($lei, \@inputs) or return;
my ($op_c, $ops) = $lei->workers_start($self, 'lei_convert', 1);
$lei->{cnv} = $self;
$self->prepare_inputs($lei, \@inputs) or return;
my ($op_c, $ops) = $lei->workers_start($self, 'lei_convert', 1);
$lei->{cnv} = $self;
- $self->wq_io_do('do_convert', []);
+ $self->wq_io_do('process_inputs', []);
$self->wq_close(1);
$op_c->op_wait_event($ops);
}
$self->wq_close(1);
$op_c->op_wait_event($ops);
}
my ($arg, $pid) = @_;
my ($imp, $lei) = @$arg;
$lei->child_error($?, 'non-fatal errors during import') if $?;
my ($arg, $pid) = @_;
my ($imp, $lei) = @$arg;
$lei->child_error($?, 'non-fatal errors during import') if $?;
- my $sto = delete $lei->{sto} // return $lei->fail('BUG: {sto} gone');
- my $wait = $sto->ipc_do('done'); # PublicInbox::LeiStore::done
sub net_merge_complete { # callback used by LeiAuth
my ($self) = @_;
sub net_merge_complete { # callback used by LeiAuth
my ($self) = @_;
- for my $input (@{$self->{inputs}}) {
- $self->wq_io_do('input_path_url', [], $input);
- }
+ $self->wq_io_do('process_inputs');
package PublicInbox::LeiInput;
use strict;
use v5.10.1;
package PublicInbox::LeiInput;
use strict;
use v5.10.1;
sub check_input_format ($;$) {
my ($lei, $files) = @_;
sub check_input_format ($;$) {
my ($lei, $files) = @_;
$self->{inputs} = $inputs;
}
$self->{inputs} = $inputs;
}
+sub process_inputs {
+ my ($self) = @_;
+ for my $input (@{$self->{inputs}}) {
+ $self->input_path_url($input);
+ }
+ my $wait = $self->{lei}->{sto}->ipc_do('done') if $self->{lei}->{sto};
+}
+
sub input_only_atfork_child {
my ($self) = @_;
my $lei = $self->{lei};
sub input_only_atfork_child {
my ($self) = @_;
my $lei = $self->{lei};
my ($arg, $pid) = @_;
my ($tag, $lei) = @$arg;
$lei->child_error($?, 'non-fatal errors during tag') if $?;
my ($arg, $pid) = @_;
my ($tag, $lei) = @$arg;
$lei->child_error($?, 'non-fatal errors during tag') if $?;
- my $sto = delete $lei->{sto};
- my $wait = $sto->ipc_do('done') if $sto; # PublicInbox::LeiStore::done
sub net_merge_complete { # callback used by LeiAuth
my ($self) = @_;
sub net_merge_complete { # callback used by LeiAuth
my ($self) = @_;
- for my $input (@{$self->{inputs}}) {
- $self->wq_io_do('input_path_url', [], $input);
- }
+ $self->wq_io_do('process_inputs');