$end;
}
+sub incr {
+ my ($self, $field, $nr) = @_;
+ $self->{counters}->{$field} += $nr;
+}
+
sub workers_start {
my ($lei, $wq, $jobs, $ops, $flds) = @_;
$ops = {
'|' => [ \&sigpipe_handler, $lei ],
'x_it' => [ \&x_it, $lei ],
'child_error' => [ \&child_error, $lei ],
+ 'incr' => [ \&incr, $lei ],
($ops ? %$ops : ()),
};
$ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
$wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
delete $lei->{pkt_op_p};
my $op_c = delete $lei->{pkt_op_c};
- # {-lei_sock} persists script/lei process until ops->{''} EOF callback
- $op_c->{-lei_sock} = $lei->{sock};
@$end = ();
$lei->event_step_init;
($op_c, $ops);
sub event_step_init {
my ($self) = @_;
- return if $self->{-event_init_done}++;
- if (my $sock = $self->{sock}) { # using DS->EventLoop
+ my $sock = $self->{sock} or return;
+ $self->{-event_init_done} //= do { # persist til $ops done
$self->SUPER::new($sock, EPOLLIN|EPOLLET);
- }
+ $sock;
+ };
}
sub noop {}
# can immediately reread it
sub DESTROY {
my ($self) = @_;
+ if (my $counters = delete $self->{counters}) {
+ for my $k (sort keys %$counters) {
+ my $nr = $counters->{$k};
+ $self->child_error(1 << 8, "$nr $k messages");
+ }
+ }
$self->{1}->autoflush(1) if $self->{1};
stop_pager($self);
# preserve $? for ->fail or ->x_it code