'leistore.dir' => 'top-level storage location',
);
-my @WQ_KEYS = qw(lxs l2m imp mrr cnv p2q tag sol lsss); # internal workers
+my @WQ_KEYS = qw(lxs l2m wq1); # internal workers
sub _drop_wq {
my ($self) = @_;
'child_error' => [ \&child_error, $lei ],
($ops ? %$ops : ()),
};
- $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&dclose, $lei ];
+ $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
my $end = $lei->pkt_op_pair;
$wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
delete $lei->{pkt_op_p};
sub wq_done_wait { # dwaitpid callback
my ($arg, $pid) = @_;
- my ($wq, $lei, $e) = @$arg;
- $? and $lei->child_error($?, $e ? "$e errors during $lei->{cmd}" : ());
+ my ($wq, $lei) = @$arg;
+ my $err_type = $lei->{-err_type};
+ $? and $lei->child_error($?,
+ $err_type ? "$err_type errors during $lei->{cmd}" : ());
$lei->dclose;
}
+sub wq_eof { # EOF callback for main daemon
+ my ($lei) = @_;
+ my $wq1 = delete $lei->{wq1} // return $lei->fail; # already failed
+ $wq1->wq_wait_old(\&wq_done_wait, $lei);
+}
+
1;
use PublicInbox::Spawn qw(spawn popen_rd which);
use PublicInbox::DS;
-sub _lei_wq_eof { # EOF callback for main daemon
- my ($lei) = @_;
- my $sol = delete $lei->{sol} // return $lei->dclose; # already failed
- $sol->wq_wait_old($lei->can('wq_done_wait'), $lei);
-}
-
sub get_git_dir ($$) {
my ($lei, $d) = @_;
return $d if -d "$d/objects" && -d "$d/refs" && -e "$d/HEAD";
require PublicInbox::SolverGit;
my $self = bless { lxs => $lxs, oid_b => $blob }, __PACKAGE__;
my ($op_c, $ops) = $lei->workers_start($self, 'lei-blob', 1);
- $lei->{sol} = $self;
+ $lei->{wq1} = $self;
$self->wq_io_do('do_solve_blob', []);
$self->wq_close(1);
$op_c->op_wait_event($ops);
$lei->{opt}->{augment} = 1 if $devfd < 0;
$self->prepare_inputs($lei, \@inputs) or return;
my ($op_c, $ops) = $lei->workers_start($self, 'lei-convert', 1);
- $lei->{cnv} = $self;
+ $lei->{wq1} = $self;
$self->wq_io_do('process_inputs', []);
$self->wq_close(1);
$op_c->op_wait_event($ops);
input_eml_cb($self, $eml, $self->{-import_kw} ? { kw => $kw } : undef);
}
-sub _lei_wq_eof { # EOF callback for main daemon
- my ($lei) = @_;
- my $imp = delete $lei->{imp} // return $lei->fail('BUG: {imp} gone');
- $imp->wq_wait_old($lei->can('wq_done_wait'), $lei, 'non-fatal');
-}
-
sub net_merge_complete { # callback used by LeiAuth
my ($self) = @_;
$self->wq_io_do('process_inputs');
$self->{-wq_nr_workers} = $j // 1; # locked
$lei->{-eml_noisy} = 1;
(my $op_c, $ops) = $lei->workers_start($self, 'lei-import', $j, $ops);
- $lei->{imp} = $self;
+ $lei->{wq1} = $self;
+ $lei->{-err_type} = 'non-fatal';
net_merge_complete($self) unless $lei->{auth};
$op_c->op_wait_event($ops);
}
my ($lei, $pfx, $json) = @_;
my $self = bless { -wq_nr_workers => 1, json => $json }, __PACKAGE__;
my ($op_c, $ops) = $lei->workers_start($self, 'ls-search', 1);
- $lei->{lsss} = $self;
+ $lei->{wq1} = $self;
$self->wq_io_do('do_ls_search_long', [], $pfx);
$self->wq_close(1);
$op_c->op_wait_event($ops);
sub _lei_wq_eof { # EOF callback for main daemon
my ($lei) = @_;
- my $mrr = delete $lei->{mrr} or return;
+ my $mrr = delete $lei->{wq1} or return $lei->fail;
$mrr->wq_wait_old(\&do_finish_mirror, $lei);
}
require PublicInbox::Admin;
require PublicInbox::InboxWritable;
my ($op, $ops) = $lei->workers_start($self, 'lei_mirror', 1);
- $lei->{mrr} = $self;
+ $lei->{wq1} = $self;
$self->wq_io_do('do_mirror', []);
$self->wq_close(1);
$op->op_wait_event($ops);
$self->{input} = $input;
}
my ($op, $ops) = $lei->workers_start($self, 'lei-p2q', 1);
- $lei->{p2q} = $self;
+ $lei->{wq1} = $self;
$self->wq_io_do('do_p2q', []);
$self->wq_close(1);
$op->op_wait_event($ops);
$self->SUPER::ipc_atfork_child;
}
-sub _lei_wq_eof { # EOF callback for main daemon
- my ($lei) = @_;
- my $p2q = delete $lei->{p2q} // return $lei->dclose;
- $p2q->wq_wait_old($lei->can('wq_done_wait'), $lei);
-}
-
1;
sub input_mbox_cb { input_eml_cb($_[1], $_[0]) }
-sub _lei_wq_eof { # EOF callback for main daemon
- my ($lei) = @_;
- my $tag = delete $lei->{tag} // return $lei->dclose;
- $tag->wq_wait_old($lei->can('wq_done_wait'), $lei, 'non-fatal');
-}
-
sub net_merge_complete { # callback used by LeiAuth
my ($self) = @_;
$self->wq_io_do('process_inputs');
$self->{vmd_mod} = $vmd_mod;
my $j = $self->{-wq_nr_workers} = 1; # locked for now
(my $op_c, $ops) = $lei->workers_start($self, 'lei-tag', $j, $ops);
- $lei->{tag} = $self;
+ $lei->{wq1} = $self;
+ $lei->{-err_type} = 'non-fatal';
net_merge_complete($self) unless $lei->{auth};
$op_c->op_wait_event($ops);
}
} elsif ($buf eq '-WINCH') {
kill($buf, @parent); # for MUA
} elsif ($buf =~ /\Ax_it ([0-9]+)\z/) {
- $x_it_code = $1 + 0;
+ $x_it_code ||= $1 + 0;
last;
} elsif ($buf =~ /\Achild_error ([0-9]+)\z/) {
- $x_it_code = $1 + 0;
+ $x_it_code ||= $1 + 0;
} else {
$sigchld->();
die $buf;