From: Eric Wong <e@80x24.org>
Date: Mon, 22 Feb 2021 11:22:56 +0000 (-0300)
Subject: lei q: reduce wasted IMAP connection for auth
X-Git-Tag: v1.7.0~1078
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=fa5650d399f51a596e5c126b3ce65347409d4fe9;p=public-inbox.git

lei q: reduce wasted IMAP connection for auth

We can rework the first lei2mail worker to authenticate, and
then share auth info with the rest of the lei2mail workers.  As
with "lei import", this uses PktOp and lei-daemon to share
updated credentials between the first an subsequent l2m workers.
---

diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm
index d329eadb..b4777114 100644
--- a/lib/PublicInbox/LeiAuth.pm
+++ b/lib/PublicInbox/LeiAuth.pm
@@ -20,13 +20,6 @@ sub net_merge {
 	}
 }
 
-sub do_auth { # called via wq_io_do
-	my ($self) = @_;
-	my ($lei, $net) = @$self{qw(lei net)};
-	$net->imap_common_init($lei);
-	net_merge($lei, $net); # tell lei-daemon updated auth info
-}
-
 sub do_auth_atfork { # used by IPC WQ workers
 	my ($self, $wq) = @_;
 	return if $wq->{-wq_worker_nr} != 0;
@@ -63,36 +56,6 @@ sub op_merge { # prepares PktOp->pair ops
 	$ops->{net_merge_done1} = [ \&net_merge_done1, $wq ];
 }
 
-sub do_finish_auth { # dwaitpid callback
-	my ($arg, $pid) = @_;
-	my ($self, $lei, $post_auth_cb, @args) = @$arg;
-	$? ? $lei->dclose : $post_auth_cb->(@args);
-}
-
-sub auth_eof {
-	my ($lei, $post_auth_cb, @args) = @_;
-	my $self = delete $lei->{auth} or return;
-	$self->wq_wait_old(\&do_finish_auth, $lei, $post_auth_cb, @args);
-}
-
-sub auth_start {
-	my ($self, $lei, $post_auth_cb, @args) = @_;
-	my $op = $lei->workers_start($self, 'auth', 1, {
-		'net_merge' => [ \&net_merge, $lei ],
-		'' => [ \&auth_eof, $lei, $post_auth_cb, @args ],
-	});
-	$self->wq_io_do('do_auth', []);
-	$self->wq_close(1);
-	while ($op && $op->{sock}) { $op->event_step }
-}
-
-sub ipc_atfork_child {
-	my ($self) = @_;
-	delete $self->{lei}->{auth}; # drop circular ref
-	$self->{lei}->lei_atfork_child;
-	$self->SUPER::ipc_atfork_child;
-}
-
 sub new {
 	my ($cls, $net) = @_; # net may be NetReader or descendant (NetWriter)
 	bless { net => $net }, $cls;
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index 3a714502..b45de4e0 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -62,7 +62,7 @@ sub do_convert { # via wq_do
 	delete $self->{wcb}; # commit
 }
 
-sub convert_start { # LeiAuth->auth_start callback
+sub convert_start {
 	my ($lei) = @_;
 	my $self = $lei->{cnv};
 	my $op = $lei->workers_start($self, 'lei_convert', 1, {
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 398f834f..64c9394c 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -13,14 +13,11 @@ sub prep_ext { # externals_each callback
 
 sub _start_query {
 	my ($self) = @_;
-	if (my $nwr = $self->{nwr}) {
+	if (my $net = $self->{net}) {
 		require PublicInbox::LeiAuth;
-		my $auth = $self->{auth} = PublicInbox::LeiAuth->new($nwr);
-		my $lxs = $self->{lxs};
-		$auth->auth_start($self, $lxs->can('do_query'), $lxs, $self);
-	} else {
-		$self->{lxs}->do_query($self);
+		$self->{auth} = PublicInbox::LeiAuth->new($net);
 	}
+	$self->{lxs}->do_query($self);
 }
 
 sub qstr_add { # PublicInbox::InputPipe::consume callback for --stdin
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 6efd398a..df813064 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -345,8 +345,8 @@ sub _imap_write_cb ($$) {
 	my ($self, $lei) = @_;
 	my $dedupe = $lei->{dedupe};
 	$dedupe->prepare_dedupe if $dedupe;
-	my $imap_append = $lei->{nwr}->can('imap_append');
-	my $mic = $lei->{nwr}->mic_get($self->{uri});
+	my $imap_append = $lei->{net}->can('imap_append');
+	my $mic = $lei->{net}->mic_get($self->{uri});
 	my $folder = $self->{uri}->mailbox;
 	sub { # for git_to_mail
 		my ($bref, $smsg, $eml) = @_;
@@ -394,15 +394,15 @@ sub new {
 		$self->{base_type} = 'mbox';
 	} elsif ($fmt =~ /\Aimaps?\z/) { # TODO .onion support
 		require PublicInbox::NetWriter;
-		my $nwr = PublicInbox::NetWriter->new;
-		$nwr->add_url($dst);
-		$nwr->{quiet} = $lei->{opt}->{quiet};
-		my $err = $nwr->errors($dst);
+		my $net = PublicInbox::NetWriter->new;
+		$net->add_url($dst);
+		$net->{quiet} = $lei->{opt}->{quiet};
+		my $err = $net->errors($dst);
 		return $lei->fail($err) if $err;
 		require PublicInbox::URIimap; # TODO: URI cast early
 		$self->{uri} = PublicInbox::URIimap->new($dst);
 		$self->{uri}->mailbox or die "No mailbox: $dst";
-		$lei->{nwr} = $nwr;
+		$lei->{net} = $net;
 		$self->{base_type} = 'imap';
 	} else {
 		die "bad mail --format=$fmt\n";
@@ -447,15 +447,16 @@ sub _augment_imap { # PublicInbox::NetReader::imap_each cb
 
 sub _do_augment_imap {
 	my ($self, $lei) = @_;
-	my $nwr = $lei->{nwr};
+	my $net = $lei->{net};
 	if ($lei->{opt}->{augment}) {
 		my $dedupe = $lei->{dedupe};
 		if ($dedupe && $dedupe->prepare_dedupe) {
-			$nwr->imap_each($self->{uri}, \&_augment_imap, $lei);
+			$net->imap_each($self->{uri}, \&_augment_imap, $lei);
 			$dedupe->pause_dedupe;
 		}
-	} else { # clobber existing IMAP folder
-		$nwr->imap_delete_all($self->{uri});
+	} elsif (!$self->{-wq_worker_nr}) { # undef or 0
+		# clobber existing IMAP folder
+		$net->imap_delete_all($self->{uri});
 	}
 }
 
@@ -523,16 +524,18 @@ sub post_augment {
 	$m->($self, $lei, @args);
 }
 
-sub ipc_atfork_child {
+sub do_post_auth {
 	my ($self) = @_;
-	my $lei = delete $self->{lei};
-	$lei->lei_atfork_child;
+	my $lei = $self->{lei};
+	# lei_xsearch can start as soon as all l2m workers get here
+	pkt_do($lei->{pkt_op_p}, 'incr_start_query') or
+		die "incr_start_query: $!";
 	my $aug;
 	if (lock_free($self)) {
 		my $mod = $self->{-wq_nr_workers};
 		my $shard = $self->{-wq_worker_nr};
-		if (my $nwr = $lei->{nwr}) {
-			$nwr->{shard_info} = [ $mod, $shard ];
+		if (my $net = $lei->{net}) {
+			$net->{shard_info} = [ $mod, $shard ];
 		} else { # Maildir (MH?)
 			$self->{shard_info} = [ $mod, $shard ];
 		}
@@ -545,13 +548,20 @@ sub ipc_atfork_child {
 		eval { do_augment($self, $lei) };
 		$lei->fail($@) if $@;
 		pkt_do($lei->{pkt_op_p}, $aug) == 1 or
-					die "do_post_augment trigger: $!";
+				die "do_post_augment trigger: $!";
 	}
 	if (my $zpipe = delete $lei->{zpipe}) {
 		$lei->{1} = $zpipe->[1];
 		close $zpipe->[0];
 	}
 	$self->{wcb} = $self->write_cb($lei);
+}
+
+sub ipc_atfork_child {
+	my ($self) = @_;
+	my $lei = $self->{lei};
+	$lei->lei_atfork_child;
+	$lei->{auth}->do_auth_atfork($self) if $lei->{auth};
 	$SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
 	$self->SUPER::ipc_atfork_child;
 }
@@ -584,4 +594,13 @@ sub wq_atexit_child {
 	$SIG{__WARN__} = 'DEFAULT';
 }
 
+# called in top-level lei-daemon when LeiAuth is done
+sub net_merge_complete {
+	my ($self) = @_;
+	$self->wq_broadcast('do_post_auth');
+	$self->wq_close(1);
+}
+
+no warnings 'once'; # the following works even when LeiAuth is lazy-loaded
+*net_merge_all = \&PublicInbox::LeiAuth::net_merge_all;
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index e982165f..6dcadf0a 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -348,7 +348,7 @@ sub do_post_augment {
 	close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch
 }
 
-sub incr_post_augment { # called whenever an l2m shard finishes
+sub incr_post_augment { # called whenever an l2m shard finishes augment
 	my ($lei) = @_;
 	my $l2m = $lei->{l2m} or die 'BUG: unexpected incr_post_augment';
 	return if ++$lei->{nr_post_augment} != $l2m->{-wq_nr_workers};
@@ -366,8 +366,8 @@ sub concurrency {
 }
 
 sub start_query { # always runs in main (lei-daemon) process
-	my ($self, $lei) = @_;
-	if ($lei->{opt}->{threads}) {
+	my ($self) = @_;
+	if ($self->{threads}) {
 		for my $ibxish (locals($self)) {
 			$self->wq_io_do('query_thread_mset', [], $ibxish);
 		}
@@ -382,6 +382,13 @@ sub start_query { # always runs in main (lei-daemon) process
 	for my $uris (@$q) {
 		$self->wq_io_do('query_remote_mboxrd', [], $uris);
 	}
+	$self->wq_close(1); # lei_xsearch workers stop when done
+}
+
+sub incr_start_query { # called whenever an l2m shard starts do_post_auth
+	my ($self, $l2m) = @_;
+	return if ++$self->{nr_start_query} != $l2m->{-wq_nr_workers};
+	start_query($self);
 }
 
 sub ipc_atfork_child {
@@ -393,6 +400,7 @@ sub ipc_atfork_child {
 
 sub do_query {
 	my ($self, $lei) = @_;
+	my $l2m = $lei->{l2m};
 	my $ops = {
 		'|' => [ $lei->can('sigpipe_handler'), $lei ],
 		'!' => [ $lei->can('fail_handler'), $lei ],
@@ -402,12 +410,13 @@ sub do_query {
 		'mset_progress' => [ \&mset_progress, $lei ],
 		'x_it' => [ $lei->can('x_it'), $lei ],
 		'child_error' => [ $lei->can('child_error'), $lei ],
+		'incr_start_query' => [ \&incr_start_query, $self, $l2m ],
 	};
+	$lei->{auth}->op_merge($ops, $l2m) if $l2m && $lei->{auth};
 	($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
 	$lei->{1}->autoflush(1);
 	$lei->start_pager if delete $lei->{need_pager};
 	$lei->{ovv}->ovv_begin($lei);
-	my $l2m = $lei->{l2m};
 	if ($l2m) {
 		$l2m->pre_augment($lei);
 		if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
@@ -428,10 +437,13 @@ sub do_query {
 				$lei->oldset, { lei => $lei });
 	my $op = delete $lei->{pkt_op_c};
 	delete $lei->{pkt_op_p};
-	$l2m->wq_close(1) if $l2m;
+	$self->{threads} = $lei->{opt}->{threads};
+	if ($l2m) {
+		$l2m->net_merge_complete unless $lei->{auth};
+	} else {
+		start_query($self);
+	}
 	$lei->event_step_init; # wait for shutdowns
-	start_query($self, $lei);
-	$self->wq_close(1); # lei_xsearch workers stop when done
 	if ($lei->{oneshot}) {
 		while ($op->{sock}) { $op->event_step }
 	}