]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei q: fix augment of compressed mailboxes
authorEric Wong <e@80x24.org>
Tue, 19 Jan 2021 09:34:31 +0000 (09:34 +0000)
committerEric Wong <e@80x24.org>
Thu, 21 Jan 2021 03:29:10 +0000 (03:29 +0000)
We need to delay writing out the mailbox until the compressor
process is up and running, so have startq wait a bit.  This
means we must create the pipe early and hand it off to the
workers before augmenting, despite spawning the
gzip/pigz/xz/bzip2 process after augment is complete.

lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/LeiXSearch.pm
t/lei.t
t/lei_to_mail.t

index e4f8bedb20abd626b7a0f50cb67a8aefa69cff3a..f3edfe82fc3308b6844ed567cd253e37f6b91f68 100644 (file)
@@ -758,6 +758,7 @@ sub accept_dispatch { # Listener {post_accept} callback
 sub dclose {
        my ($self) = @_;
        delete $self->{lxs}; # stops LeiXSearch queries
+       close(delete $self->{1}) if $self->{1}; # may reap_compress
        $self->close if $self->{sock}; # PublicInbox::DS::close
 }
 
index 99388b5b2efef4b9a08c91a515562b4bea76d51b..a6e517ea50ee3da25b6aa30c7c20a15a37bf6a02 100644 (file)
@@ -200,18 +200,19 @@ sub zsfx2cmd ($$$) {
 }
 
 sub _post_augment_mbox { # open a compressor process
-       my ($self, $lei) = @_;
+       my ($self, $lei, $zpipe) = @_;
        my $zsfx = $self->{zsfx} or return;
        my $cmd = zsfx2cmd($zsfx, undef, $lei);
-       pipe(my ($r, $w)) or die "pipe: $!";
+       my ($r, $w) = splice(@$zpipe, 0, 2);
        my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} };
        my $pid = spawn($cmd, $lei->{env}, $rdr);
-       $lei->{"pid.$pid"} = $cmd;
        my $pp = gensym;
-       tie *$pp, 'PublicInbox::ProcessPipe', $pid, $w, \&reap_compress, $lei;
+       my $dup = bless { "pid.$pid" => $cmd }, ref($lei);
+       $dup->{$_} = $lei->{$_} for qw(2 sock);
+       tie *$pp, 'PublicInbox::ProcessPipe', $pid, $w, \&reap_compress, $dup;
        $lei->{1} = $pp;
        die 'BUG: unexpected {ovv}->{lock_path}' if $lei->{ovv}->{lock_path};
-       $lei->{ovv}->ovv_out_lk_init if ($lei->{opt}->{jobs} // 2) > 1;
+       $lei->{ovv}->ovv_out_lk_init;
 }
 
 sub decompress_src ($$$) {
@@ -395,7 +396,9 @@ sub _pre_augment_mbox {
                die "seek($dst): $!\n";
        }
        state $zsfx_allow = join('|', keys %zsfx2cmd);
-       ($self->{zsfx}) = ($dst =~ /\.($zsfx_allow)\z/);
+       ($self->{zsfx}) = ($dst =~ /\.($zsfx_allow)\z/) or return;
+       pipe(my ($r, $w)) or die "pipe: $!";
+       [ $r, $w ];
 }
 
 sub _do_augment_mbox {
@@ -433,10 +436,10 @@ sub do_augment { # slow, runs in wq worker
 }
 
 sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon
-       my ($self, $lei) = @_;
+       my ($self, $lei, @args) = @_;
        # _post_augment_maildir, _post_augment_mbox
        my $m = "_post_augment_$self->{base_type}";
-       $self->$m($lei);
+       $self->$m($lei, @args);
 }
 
 sub write_mail { # via ->wq_do
index 120857b80cfd240a951af01c74de189530558755..002791c21b73075448c3fed56a428d1126ebd0f4 100644 (file)
@@ -191,17 +191,22 @@ sub query_done { # EOF callback
                dwaitpid($_, $ipc_worker_reap, $l2m) for @$pids;
        }
        $lei->{ovv}->ovv_end($lei);
-       $lei->start_mua if $l2m;
+       if ($l2m) { # calls LeiToMail reap_compress
+               close(delete($lei->{1})) if $lei->{1};
+               $lei->start_mua;
+       }
        $lei->dclose;
 }
 
+sub do_post_augment {
+       my ($lei, $zpipe, $au_done) = @_;
+       my $l2m = $lei->{l2m} or die 'BUG: no {l2m}';
+       $l2m->post_augment($lei, $zpipe);
+       close $au_done; # triggers wait_startq
+}
+
 sub start_query { # always runs in main (lei-daemon) process
        my ($self, $io, $lei, $srcs) = @_;
-       if (my $l2m = $lei->{l2m}) {
-               $lei->{1} = $io->[1];
-               $l2m->post_augment($lei);
-               $io->[1] = delete $lei->{1};
-       }
        my $remotes = $self->{remotes} // [];
        if ($lei->{opt}->{thread}) {
                for my $ibxish (@$srcs) {
@@ -221,9 +226,11 @@ sub start_query { # always runs in main (lei-daemon) process
 sub query_prepare { # called by wq_do
        my ($self, $lei) = @_;
        my %sig = $lei->atfork_child_wq($self);
+       -p $lei->{0} or die "BUG: \$done pipe expected";
        local @SIG{keys %sig} = values %sig;
        eval { $lei->{l2m}->do_augment($lei) };
        $lei->fail($@) if $@;
+       syswrite($lei->{0}, '.') == 1 or die "do_post_augment trigger: $!";
 }
 
 sub sigpipe_handler {
@@ -253,26 +260,31 @@ sub do_query {
        $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
        my $l2m = $lei->{l2m};
        if ($l2m) {
-               $l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox
+               # may redirect $lei->{1} for mbox
+               my $zpipe = $l2m->pre_augment($lei_orig);
                $io[1] = $lei_orig->{1};
-               my @l2m_io = (undef, @io[1..$#io]);
-               pipe(my $startq, $l2m_io[0]) or die "pipe: $!";
-               $self->wq_do('query_prepare', \@l2m_io, $lei);
+               pipe(my ($startq, $au_done)) or die "pipe: $!";
+               $done_op->{'.'} = [ \&do_post_augment, $lei_orig,
+                                       $zpipe, $au_done ];
                $io[4] = *STDERR{GLOB}; # don't send l2m->{-wq_s1}
+               $self->wq_do('query_prepare', \@io, $lei);
                die "BUG: unexpected \$io[5]: $io[5]" if $io[5];
                fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
                $io[5] = $startq;
+               $io[1] = $zpipe->[1] if $zpipe;
        }
        start_query($self, \@io, $lei, $srcs);
        unless ($in_loop) {
                my @pids = $self->wq_close;
                # for the $lei->atfork_child_wq PIPE handler:
                $done_op->{'!'}->[3] = \@pids;
-               $done->event_step;
+               # $done->event_step;
+               # my $ipc_worker_reap = $self->can('ipc_worker_reap');
+               # if (my $l2m_pids = delete $self->{l2m_pids}) {
+                       # dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids;
+               # }
+               while ($done->{sock}) { $done->event_step }
                my $ipc_worker_reap = $self->can('ipc_worker_reap');
-               if (my $l2m_pids = delete $self->{l2m_pids}) {
-                       dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids;
-               }
                dwaitpid($_, $ipc_worker_reap, $self) for @pids;
        }
 }
diff --git a/t/lei.t b/t/lei.t
index c4692217667f3ed4ea037abfe72fca6c2074e7f1..8eede13e2437fb63d99e9bbe7e78d7b18c55275e 100644 (file)
--- a/t/lei.t
+++ b/t/lei.t
@@ -189,25 +189,35 @@ my $test_external = sub {
        # No double-quoting should be imposed on users on the CLI
        $lei->('q', 's:use boolean prefix');
        like($out, qr/search: use boolean prefix/, 'phrase search got result');
+       require IO::Uncompress::Gunzip;
+       for my $sfx ('', '.gz') {
+               my $f = "$home/mbox$sfx";
+               $lei->('q', '-o', "mboxcl2:$f", 's:use boolean prefix');
+               my $cat = $sfx eq '' ? sub {
+                       open my $mb, '<', $f or fail "no mbox: $!";
+                       <$mb>
+               } : sub {
+                       my $z = IO::Uncompress::Gunzip->new($f, MultiStream=>1);
+                       <$z>;
+               };
+               my @s = grep(/^Subject:/, $cat->());
+               is(scalar(@s), 1, "1 result in mbox$sfx");
+               $lei->('q', '-a', '-o', "mboxcl2:$f", 's:see attachment');
+               is($err, '', 'no errors from augment');
+               @s = grep(/^Subject:/, my @wtf = $cat->());
+               is(scalar(@s), 2, "2 results in mbox$sfx");
 
-       $lei->('q', '-o', "mboxcl2:$home/mbox", 's:use boolean prefix');
-       open my $mb, '<', "$home/mbox" or fail "no mbox: $!";
-       my @s = grep(/^Subject:/, <$mb>);
-       is(scalar(@s), 1, '1 result in mbox');
-       $lei->('q', '-a', '-o', "mboxcl2:$home/mbox", 's:see attachment');
-       is($err, '', 'no errors from augment');
-       seek($mb, 0, SEEK_SET) or BAIL_OUT "seek: $!";
-       @s = grep(/^Subject:/, <$mb>);
-       is(scalar(@s), 2, '2 results in mbox');
+               $lei->('q', '-a', '-o', "mboxcl2:$f", 's:nonexistent');
+               is($err, '', "no errors on no results ($sfx)");
 
-       $lei->('q', '-a', '-o', "mboxcl2:$home/mbox", 's:nonexistent');
-       is($err, '', 'no errors on no results');
-       seek($mb, 0, SEEK_SET) or BAIL_OUT "seek: $!";
-       my @s2 = grep(/^Subject:/, <$mb>);
-       is_deeply(\@s2, \@s, 'same 2 old results w/ --augment and bad search');
+               my @s2 = grep(/^Subject:/, $cat->());
+               is_deeply(\@s2, \@s,
+                       "same 2 old results w/ --augment and bad search $sfx");
 
-       $lei->('q', '-o', "mboxcl2:$home/mbox", 's:nonexistent');
-       is(-s "$home/mbox", 0, 'clobber w/o --augment');
+               $lei->('q', '-o', "mboxcl2:$f", 's:nonexistent');
+               my @res = $cat->();
+               is_deeply(\@res, [], "clobber w/o --augment $sfx");
+       }
 };
 
 my $test_lei_common = sub {
index e5ac8eac3585c0881ce0b67287d218d05a93eeef..6673d9a6aefe06bfda629199faf98ade833f6adc 100644 (file)
@@ -94,9 +94,9 @@ my $wcb_get = sub {
                my $dup = Storable::thaw(Storable::freeze($l2m));
                is_deeply($dup, $l2m, "$fmt round-trips through storable");
        }
-       $l2m->pre_augment($lei);
+       my $zpipe = $l2m->pre_augment($lei);
        $l2m->do_augment($lei);
-       $l2m->post_augment($lei);
+       $l2m->post_augment($lei, $zpipe);
        my $cb = $l2m->write_cb($lei);
        delete $lei->{1};
        $cb;