]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei q: improve remote mboxrd UX + MUA
authorEric Wong <e@80x24.org>
Mon, 8 Feb 2021 09:05:09 +0000 (23:05 -1000)
committerEric Wong <e@80x24.org>
Mon, 8 Feb 2021 22:07:40 +0000 (22:07 +0000)
For early MUA spawners using lock-free outputs, we we need to
on the startq pipe to silence progress reporting.  For
--augment users, we can start the MUA even earlier by
creating Maildirs in the pre-augment phase.

To improve progress reporting for non-MUA (or late-MUA)
spawners, we'll no longer blindly append "--compressed" to the
curl(1) command when POST-ing for the gzipped mboxrd.
Furthermore, we'll overload stringify ('""') in LeiCurl to
ensure the empty -d '' string shows up properly.

v2: fix startq waiting with --threads
    mset_progress is never shown with early MUA spawning,
    The plan is to still show progress when augmenting and
    deduping.  This fixes all local search cases.
    A leftover debug bit is dropped, too

lib/PublicInbox/IPC.pm
lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiCurl.pm
lib/PublicInbox/LeiMirror.pm
lib/PublicInbox/LeiOverview.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/LeiXSearch.pm

index c8673e26832fd960601903aa532e8b5ac290cc46..9331233a87931fa6351a673b5c432f2a708bc5da 100644 (file)
@@ -109,7 +109,6 @@ sub ipc_worker_spawn {
                $w_res->autoflush(1);
                $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
                local $0 = $ident;
-               PublicInbox::DS::sig_setmask($sigset);
                # ensure we properly exit even if warn() dies:
                my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
                eval {
@@ -117,6 +116,7 @@ sub ipc_worker_spawn {
                        local @$self{keys %$fields} = values(%$fields);
                        my $on_destroy = $self->ipc_atfork_child;
                        local %SIG = %SIG;
+                       PublicInbox::DS::sig_setmask($sigset);
                        ipc_worker_loop($self, $r_req, $w_res);
                };
                warn "worker $ident PID:$$ died: $@\n" if $@;
@@ -293,7 +293,6 @@ sub _wq_worker_start ($$$) {
                $SIG{$_} = 'IGNORE' for (qw(PIPE));
                $SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD));
                local $0 = $self->{-wq_ident};
-               PublicInbox::DS::sig_setmask($oldset);
                # ensure we properly exit even if warn() dies:
                my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
                eval {
@@ -301,6 +300,7 @@ sub _wq_worker_start ($$$) {
                        local @$self{keys %$fields} = values(%$fields);
                        my $on_destroy = $self->ipc_atfork_child;
                        local %SIG = %SIG;
+                       PublicInbox::DS::sig_setmask($oldset);
                        wq_worker_loop($self);
                };
                warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
@@ -395,9 +395,9 @@ sub wq_close {
 }
 
 sub wq_kill_old {
-       my ($self) = @_;
+       my ($self, $sig) = @_;
        my $pids = $self->{"-wq_old_pids.$$"} or return;
-       kill 'TERM', @$pids;
+       kill($sig // 'TERM', @$pids);
 }
 
 sub wq_kill {
index dce8076282d1c62bcd6d182713231c0bc3852b3b..c36456989449e31739945c3ca5b8b073ca09a6f8 100644 (file)
@@ -741,7 +741,9 @@ sub start_mua {
        } elsif ($self->{oneshot}) {
                $self->{"mua.pid.$self.$$"} = spawn(\@cmd);
        }
-       delete $self->{-progress};
+       if ($self->{lxs} && $self->{au_done}) { # kick wait_startq
+               syswrite($self->{au_done}, 'q' x ($self->{lxs}->{jobs} // 0));
+       }
 }
 
 # caller needs to "-t $self->{1}" to check if tty
index 38b17c7859565e6e0e9ded1b50d835a7e0150536..f346a1b43830661c707c5c23ab1d5c9c56cefd3a 100644 (file)
@@ -8,6 +8,12 @@ use v5.10.1;
 use PublicInbox::Spawn qw(which);
 use PublicInbox::Config;
 
+# Ensures empty strings are quoted, we don't need more
+# sophisticated quoting than for empty strings: curl -d ''
+use overload '""' => sub {
+       join(' ', map { $_ eq '' ?  "''" : $_ } @{$_[0]});
+};
+
 my %lei2curl = (
        'curl-config=s@' => 'config|K=s@',
 );
@@ -63,10 +69,9 @@ EOM
 
 # completes the result of cmd() for $uri
 sub for_uri {
-       my ($self, $lei, $uri) = @_;
+       my ($self, $lei, $uri, @opt) = @_;
        my $pfx = torsocks($self, $lei, $uri) or return; # error
-       [ @$pfx, @$self, substr($uri->path, -3) eq '.gz' ? () : '--compressed',
-               $uri->as_string ]
+       bless [ @$pfx, @$self, @opt, $uri->as_string ], ref($self);
 }
 
 1;
index 5ba69287fdd50c5039a10e56b8796de1320f91c3..c5153148362578f6027e76224bd0adaed564743d 100644 (file)
@@ -31,7 +31,7 @@ sub try_scrape {
        my $uri = URI->new($self->{src});
        my $lei = $self->{lei};
        my $curl = $self->{curl} //= PublicInbox::LeiCurl->new($lei) or return;
-       my $cmd = $curl->for_uri($lei, $uri);
+       my $cmd = $curl->for_uri($lei, $uri, '--compressed');
        my $opt = { 0 => $lei->{0}, 2 => $lei->{2} };
        my $fh = popen_rd($cmd, $lei->{env}, $opt);
        my $html = do { local $/; <$fh> } // die "read(curl $uri): $!";
@@ -93,8 +93,7 @@ sub _try_config {
        my $path = $uri->path;
        chop($path) eq '/' or die "BUG: $uri not canonicalized";
        $uri->path($path . '/_/text/config/raw');
-       my $cmd = $self->{curl}->for_uri($lei, $uri);
-       push @$cmd, '--compressed'; # curl decompresses for us
+       my $cmd = $self->{curl}->for_uri($lei, $uri, '--compressed');
        my $ce = "$dst/inbox.config.example";
        my $f = "$ce-$$.tmp";
        open(my $fh, '+>', $f) or return $lei->err("open $f: $! (non-fatal)");
index dcfb9cc7d54a7a7f974a3c2c4c5680ba1218541f..f0ac468417cc5ea4666bc3cbb589a310ef83957e 100644 (file)
@@ -95,9 +95,10 @@ sub new {
                $lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei);
        } else {
                # default to the cheapest sort since MUA usually resorts
-               $lei->{opt}->{'sort'} //= 'docid' if $dst ne '/dev/stdout';
+               $opt->{'sort'} //= 'docid' if $dst ne '/dev/stdout';
                $lei->{l2m} = eval { PublicInbox::LeiToMail->new($lei) };
                return $lei->fail($@) if $@;
+               $lei->{early_mua} = 1 if $opt->{mua} && $lei->{l2m}->lock_free;
        }
        $self;
 }
index 4c5a5685a6a8b06a09a978fb015ae6c70c380a33..a5a196db63d50bd5fbb30f65348d32d5d0b6ccec 100644 (file)
@@ -371,7 +371,17 @@ sub new {
        $self;
 }
 
-sub _pre_augment_maildir {} # noop
+sub _pre_augment_maildir {
+       my ($self, $lei) = @_;
+       my $dst = $lei->{ovv}->{dst};
+       for my $x (qw(tmp new cur)) {
+               my $d = $dst.$x;
+               next if -d $d;
+               require File::Path;
+               File::Path::mkpath($d);
+               -d $d or die "$d is not a directory";
+       }
+}
 
 sub _do_augment_maildir {
        my ($self, $lei) = @_;
@@ -388,17 +398,7 @@ sub _do_augment_maildir {
        }
 }
 
-sub _post_augment_maildir {
-       my ($self, $lei) = @_;
-       my $dst = $lei->{ovv}->{dst};
-       for my $x (qw(tmp new cur)) {
-               my $d = $dst.$x;
-               next if -d $d;
-               require File::Path;
-               File::Path::mkpath($d);
-               -d $d or die "$d is not a directory";
-       }
-}
+sub _post_augment_maildir {} # noop
 
 sub _pre_augment_mbox {
        my ($self, $lei) = @_;
index 2794140a58998b6390f9f5b0a5a437716db7e282..db089a67ca274a0e53bb7ae67bbdb9b9cc2ca30d 100644 (file)
@@ -101,20 +101,34 @@ sub _mset_more ($$) {
 # $startq will EOF when query_prepare is done augmenting and allow
 # query_mset and query_thread_mset to proceed.
 sub wait_startq ($) {
-       my ($startq) = @_;
-       $_[0] = undef;
-       read($startq, my $query_prepare_done, 1);
+       my ($lei) = @_;
+       my $startq = delete $lei->{startq} or return;
+       while (1) {
+               my $n = sysread($startq, my $query_prepare_done, 1);
+               if (defined $n) {
+                       return if $n == 0; # no MUA
+                       if ($query_prepare_done eq 'q') {
+                               $lei->{opt}->{quiet} = 1;
+                               delete $lei->{opt}->{verbose};
+                               delete $lei->{-progress};
+                       } else {
+                               $lei->fail("$$ WTF `$query_prepare_done'");
+                       }
+                       return;
+               }
+               return $lei->fail("$$ wait_startq: $!") unless $!{EINTR};
+       }
 }
 
 sub mset_progress {
        my $lei = shift;
-       return unless $lei->{-progress};
+       return if $lei->{early_mua} || !$lei->{-progress};
        if ($lei->{pkt_op_p}) {
                pkt_do($lei->{pkt_op_p}, 'mset_progress', @_);
        } else { # single lei-daemon consumer
                my ($desc, $mset_size, $mset_total_est) = @_;
                $lei->{-mset_total} += $mset_size;
-               $lei->err("# $desc $mset_size/$mset_total_est");
+               $lei->qerr("# $desc $mset_size/$mset_total_est");
        }
 }
 
@@ -122,7 +136,6 @@ sub query_thread_mset { # for --threads
        my ($self, $ibxish) = @_;
        local $0 = "$0 query_thread_mset";
        my $lei = $self->{lei};
-       my $startq = delete $lei->{startq};
        my ($srch, $over) = ($ibxish->search, $ibxish->over);
        my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
        return warn("$desc not indexed by Xapian\n") unless ($srch && $over);
@@ -140,7 +153,7 @@ sub query_thread_mset { # for --threads
                while ($over->expand_thread($ctx)) {
                        for my $n (@{$ctx->{xids}}) {
                                my $smsg = $over->get_art($n) or next;
-                               wait_startq($startq) if $startq;
+                               wait_startq($lei);
                                my $mitem = delete $n2item{$smsg->{num}};
                                $each_smsg->($smsg, $mitem);
                        }
@@ -155,7 +168,6 @@ sub query_mset { # non-parallel for non-"--threads" users
        my ($self) = @_;
        local $0 = "$0 query_mset";
        my $lei = $self->{lei};
-       my $startq = delete $lei->{startq};
        my $mo = { %{$lei->{mset_opt}} };
        my $mset;
        for my $loc (locals($self)) {
@@ -168,7 +180,7 @@ sub query_mset { # non-parallel for non-"--threads" users
                                $mset->size, $mset->get_matches_estimated);
                for my $mitem ($mset->items) {
                        my $smsg = smsg_for($self, $mitem) or next;
-                       wait_startq($startq) if $startq;
+                       wait_startq($lei);
                        $each_smsg->($smsg, $mitem);
                }
        } while (_mset_more($mset, $mo));
@@ -183,7 +195,7 @@ sub each_eml { # callback for MboxReader->mboxrd
        $smsg->parse_references($eml, mids($eml));
        $smsg->{$_} //= '' for qw(from to cc ds subject references mid);
        delete @$smsg{qw(From Subject -ds -ts)};
-       if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
+       wait_startq($lei);
        if ($lei->{-progress}) {
                ++$lei->{-nr_remote_eml};
                my $now = now();
@@ -210,7 +222,6 @@ sub query_remote_mboxrd {
        my $cerr = File::Temp->new(TEMPLATE => 'curl.err-XXXX', TMPDIR => 1);
        fcntl($cerr, F_SETFL, O_APPEND|O_RDWR) or warn "set O_APPEND: $!";
        my $rdr = { 2 => $cerr, pgid => 0 };
-       my $coff = 0;
        my $sigint_reap = $lei->can('sigint_reap');
        if ($verbose) {
                # spawn a process to force line-buffering, otherwise curl
@@ -228,13 +239,14 @@ sub query_remote_mboxrd {
                $lei->{-nr_remote_eml} = 0;
                $uri->query_form(@qform);
                my $cmd = $curl->for_uri($lei, $uri);
-               $lei->err("# @$cmd") if $verbose;
+               $lei->qerr("# $cmd");
                my ($fh, $pid) = popen_rd($cmd, $env, $rdr);
                $reap_curl = PublicInbox::OnDestroy->new($sigint_reap, $pid);
                $fh = IO::Uncompress::Gunzip->new($fh);
                PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self,
                                                $lei, $each_smsg);
-               my $err = waitpid($pid, 0) == $pid ? undef : "BUG: waitpid: $!";
+               my $err = waitpid($pid, 0) == $pid ? undef
+                                               : "BUG: waitpid($cmd): $!";
                @$reap_curl = (); # cancel OnDestroy
                die $err if $err;
                if ($? == 0) {
@@ -242,16 +254,18 @@ sub query_remote_mboxrd {
                        mset_progress($lei, $lei->{-current_url}, $nr, $nr);
                        next;
                }
-               seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n";
-               my $e = do { local $/; <$cerr> } //
-                               die "read(curl stderr): $!\n";
-               $coff += length($e);
-               truncate($cerr, 0);
-               next if (($? >> 8) == 22 && $e =~ /\b404\b/);
-               $lei->child_error($?);
+               $err = '';
+               if (-s $cerr) {
+                       seek($cerr, 0, SEEK_SET) or
+                                       $lei->err("seek($cmd stderr): $!");
+                       $err = do { local $/; <$cerr> } //
+                                       "read($cmd stderr): $!";
+                       truncate($cerr, 0) or
+                                       $lei->err("truncate($cmd stderr): $!");
+               }
+               next if (($? >> 8) == 22 && $err =~ /\b404\b/);
                $uri->query_form(q => $lei->{mset_opt}->{qstr});
-               # --verbose already showed the error via tail(1)
-               $lei->err("E: $uri \$?=$?\n", $verbose ? () : $e);
+               $lei->child_error($?, "E: <$uri> $err");
        }
        undef $each_smsg;
        $lei->{ovv}->ovv_atexit_child($lei);
@@ -311,15 +325,23 @@ Error closing $lei->{ovv}->{dst}: $!
 
 sub do_post_augment {
        my ($lei) = @_;
-       eval { $lei->{l2m}->post_augment($lei) };
-       if (my $err = $@) {
-               if (my $lxs = delete $lei->{lxs}) {
-                       $lxs->wq_kill;
-                       $lxs->wq_close(0, undef, $lei);
+       my $l2m = $lei->{l2m};
+       my $err;
+       if ($l2m) {
+               eval { $l2m->post_augment($lei) };
+               $err = $@;
+               if ($err) {
+                       if (my $lxs = delete $lei->{lxs}) {
+                               $lxs->wq_kill;
+                               $lxs->wq_close(0, undef, $lei);
+                       }
+                       $lei->fail("$err");
                }
-               $lei->fail("$err");
        }
-       close(delete $lei->{au_done}); # triggers wait_startq
+       if (!$err && delete $lei->{early_mua}) { # non-augment case
+               $lei->start_mua;
+       }
+       close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch
 }
 
 my $MAX_PER_HOST = 4;
@@ -334,9 +356,6 @@ sub concurrency {
 
 sub start_query { # always runs in main (lei-daemon) process
        my ($self, $lei) = @_;
-       if (my $l2m = $lei->{l2m}) {
-               $lei->start_mua if $l2m->lock_free;
-       }
        if ($lei->{opt}->{threads}) {
                for my $ibxish (locals($self)) {
                        $self->wq_io_do('query_thread_mset', [], $ibxish);
@@ -387,6 +406,9 @@ sub do_query {
        my $l2m = $lei->{l2m};
        if ($l2m) {
                $l2m->pre_augment($lei);
+               if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
+                       $lei->start_mua;
+               }
                $l2m->wq_workers_start('lei2mail', $l2m->{jobs},
                                        $lei->oldset, { lei => $lei });
                pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
@@ -404,7 +426,7 @@ sub do_query {
        delete $lei->{pkt_op_p};
        $l2m->wq_close(1) if $l2m;
        $lei->event_step_init; # wait for shutdowns
-       $self->wq_io_do('query_prepare', []) if $l2m;
+       $self->wq_io_do('query_prepare', []) if $l2m; # for augment/dedupe
        start_query($self, $lei);
        $self->wq_close(1); # lei_xsearch workers stop when done
        if ($lei->{oneshot}) {