From be365fb4bd3eda10294d4a916321c28b90dd723d Mon Sep 17 00:00:00 2001
From: Eric Wong <e@80x24.org>
Date: Wed, 9 Jun 2021 20:27:50 -0300
Subject: [PATCH] lei tag: less confusing warning about unimported messages

"unimported" is more meaningful than "missing", here.  And
instead of having every worker spew about unimported messages,
we'll accumulate and only print one warning line.  This
necessitated alterating ->DESTROY behavior and persisting
the client socket within the $lei object itself, not just
the PktOp consumer object.
---
 lib/PublicInbox/LEI.pm    | 21 ++++++++++++++++-----
 lib/PublicInbox/LeiTag.pm | 12 ++++++------
 2 files changed, 22 insertions(+), 11 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index beeb8b48..d34997fd 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -567,6 +567,11 @@ sub pkt_op_pair {
 	$end;
 }
 
+sub incr {
+	my ($self, $field, $nr) = @_;
+	$self->{counters}->{$field} += $nr;
+}
+
 sub workers_start {
 	my ($lei, $wq, $jobs, $ops, $flds) = @_;
 	$ops = {
@@ -574,6 +579,7 @@ sub workers_start {
 		'|' => [ \&sigpipe_handler, $lei ],
 		'x_it' => [ \&x_it, $lei ],
 		'child_error' => [ \&child_error, $lei ],
+		'incr' => [ \&incr, $lei ],
 		($ops ? %$ops : ()),
 	};
 	$ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
@@ -583,8 +589,6 @@ sub workers_start {
 	$wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
 	delete $lei->{pkt_op_p};
 	my $op_c = delete $lei->{pkt_op_c};
-	# {-lei_sock} persists script/lei process until ops->{''} EOF callback
-	$op_c->{-lei_sock} = $lei->{sock};
 	@$end = ();
 	$lei->event_step_init;
 	($op_c, $ops);
@@ -1092,10 +1096,11 @@ sub event_step {
 
 sub event_step_init {
 	my ($self) = @_;
-	return if $self->{-event_init_done}++;
-	if (my $sock = $self->{sock}) { # using DS->EventLoop
+	my $sock = $self->{sock} or return;
+	$self->{-event_init_done} //= do { # persist til $ops done
 		$self->SUPER::new($sock, EPOLLIN|EPOLLET);
-	}
+		$sock;
+	};
 }
 
 sub noop {}
@@ -1246,6 +1251,12 @@ sub busy { 1 } # prevent daemon-shutdown if client is connected
 # can immediately reread it
 sub DESTROY {
 	my ($self) = @_;
+	if (my $counters = delete $self->{counters}) {
+		for my $k (sort keys %$counters) {
+			my $nr = $counters->{$k};
+			$self->child_error(1 << 8, "$nr $k messages");
+		}
+	}
 	$self->{1}->autoflush(1) if $self->{1};
 	stop_pager($self);
 	# preserve $? for ->fail or ->x_it code
diff --git a/lib/PublicInbox/LeiTag.pm b/lib/PublicInbox/LeiTag.pm
index e0532653..463fb921 100644
--- a/lib/PublicInbox/LeiTag.pm
+++ b/lib/PublicInbox/LeiTag.pm
@@ -15,7 +15,7 @@ sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh
 		$self->{lei}->{sto}->ipc_do('update_xvmd', $xoids, $eml,
 						$self->{vmd_mod});
 	} else {
-		++$self->{missing};
+		++$self->{unimported};
 	}
 }
 
@@ -40,7 +40,7 @@ sub lei_tag { # the "lei tag" method
 	my ($lei, @argv) = @_;
 	my $sto = $lei->_lei_store(1);
 	$sto->write_prepare($lei);
-	my $self = bless { missing => 0 }, __PACKAGE__;
+	my $self = bless {}, __PACKAGE__;
 	$lei->ale; # refresh and prepare
 	my $vmd_mod = $self->vmd_mod_extract(\@argv);
 	return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
@@ -58,10 +58,10 @@ sub lei_tag { # the "lei tag" method
 	$lei->wait_wq_events($op_c, $ops);
 }
 
-sub note_missing {
+sub note_unimported {
 	my ($self) = @_;
-	my $n = $self->{missing} or return;
-	$self->{lei}->child_error(1 << 8, "$n missed messages");
+	my $n = $self->{unimported} or return;
+	$self->{lei}->{pkt_op_p}->pkt_do('incr', 'unimported', $n);
 }
 
 sub ipc_atfork_child {
@@ -69,7 +69,7 @@ sub ipc_atfork_child {
 	PublicInbox::LeiInput::input_only_atfork_child($self);
 	$self->{lse} = $self->{lei}->{sto}->search;
 	# this goes out-of-scope at worker process exit:
-	PublicInbox::OnDestroy->new($$, \&note_missing, $self);
+	PublicInbox::OnDestroy->new($$, \&note_unimported, $self);
 }
 
 # Workaround bash word-splitting s to ['kw', ':', 'keyword' ...]
-- 
2.50.0