]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei q: move augment into lei2mail workers
authorEric Wong <e@80x24.org>
Sun, 21 Feb 2021 07:41:31 +0000 (07:41 +0000)
committerEric Wong <e@80x24.org>
Sun, 21 Feb 2021 08:59:31 +0000 (08:59 +0000)
This is a step which will allow us to parallelize augment
on Maildir and IMAP.

lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/LeiXSearch.pm
t/lei-externals.t

index 0e0b0a43bb9ddad945d66d67467c4fc6a68a5863..e539891277ca4bdfd88ffe779af407c04c628aec 100644 (file)
@@ -14,6 +14,7 @@ use PublicInbox::LeiDedupe;
 use PublicInbox::OnDestroy;
 use PublicInbox::Git;
 use PublicInbox::GitAsyncCat;
+use PublicInbox::PktOp qw(pkt_do);
 use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
@@ -499,7 +500,7 @@ sub pre_augment { # fast (1 disk seek), runs in same process as post_augment
 
 sub do_augment { # slow, runs in wq worker
        my ($self, $lei) = @_;
-       # _do_augment_maildir, _do_augment_mbox
+       # _do_augment_maildir, _do_augment_mbox, or _do_augment_imap
        my $m = "_do_augment_$self->{base_type}";
        $self->$m($lei);
 }
@@ -516,6 +517,13 @@ sub ipc_atfork_child {
        my ($self) = @_;
        my $lei = delete $self->{lei};
        $lei->lei_atfork_child;
+       if ($self->{-wq_worker_nr} == 0) {
+               local $0 = 'do_augment';
+               eval { do_augment($self, $lei) };
+               $lei->fail($@) if $@;
+               pkt_do($lei->{pkt_op_p}, '.') == 1 or
+                                       die "do_post_augment trigger: $!";
+       }
        if (my $zpipe = delete $lei->{zpipe}) {
                $lei->{1} = $zpipe->[1];
                close $zpipe->[0];
index 10485220a359976ebe6f88fad2ab3ce91c69eff0..a319b75f1e67740e610b92328ee639c3a8463547 100644 (file)
@@ -99,21 +99,21 @@ sub _mset_more ($$) {
        $size >= $mo->{limit} && (($mo->{offset} += $size) < $mo->{limit});
 }
 
-# $startq will EOF when query_prepare is done augmenting and allow
+# $startq will EOF when do_augment is done augmenting and allow
 # query_mset and query_thread_mset to proceed.
 sub wait_startq ($) {
        my ($lei) = @_;
        my $startq = delete $lei->{startq} or return;
        while (1) {
-               my $n = sysread($startq, my $query_prepare_done, 1);
+               my $n = sysread($startq, my $do_augment_done, 1);
                if (defined $n) {
                        return if $n == 0; # no MUA
-                       if ($query_prepare_done eq 'q') {
+                       if ($do_augment_done eq 'q') {
                                $lei->{opt}->{quiet} = 1;
                                delete $lei->{opt}->{verbose};
                                delete $lei->{-progress};
                        } else {
-                               $lei->fail("$$ WTF `$query_prepare_done'");
+                               $lei->fail("$$ WTF `$do_augment_done'");
                        }
                        return;
                }
@@ -386,15 +386,6 @@ sub ipc_atfork_child {
        $self->SUPER::ipc_atfork_child;
 }
 
-sub query_prepare { # called by wq_io_do
-       my ($self) = @_;
-       local $0 = "$0 query_prepare";
-       my $lei = $self->{lei};
-       eval { $lei->{l2m}->do_augment($lei) };
-       $lei->fail($@) if $@;
-       pkt_do($lei->{pkt_op_p}, '.') == 1 or die "do_post_augment trigger: $!"
-}
-
 sub do_query {
        my ($self, $lei) = @_;
        my $ops = {
@@ -433,7 +424,6 @@ sub do_query {
        delete $lei->{pkt_op_p};
        $l2m->wq_close(1) if $l2m;
        $lei->event_step_init; # wait for shutdowns
-       $self->wq_io_do('query_prepare', []) if $l2m; # for augment/dedupe
        start_query($self, $lei);
        $self->wq_close(1); # lei_xsearch workers stop when done
        if ($lei->{oneshot}) {
index edfbb2bf3eed1df1154151c28e8fc4bfd0fd488d..02b15232836cbd670012ae03b329ad8ceb673347 100644 (file)
@@ -186,7 +186,8 @@ SKIP: {
                my @s = grep(/^Subject:/, $cat->());
                is(scalar(@s), 1, "1 result in mbox$sfx");
                $lei->('q', '-a', '-o', "mboxcl2:$f", 's:see attachment');
-               is(grep(!/^#/, $lei_err), 0, 'no errors from augment');
+               is(grep(!/^#/, $lei_err), 0, 'no errors from augment') or
+                       diag $lei_err;
                @s = grep(/^Subject:/, my @wtf = $cat->());
                is(scalar(@s), 2, "2 results in mbox$sfx");