]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LEI.pm
dir_idle: do not add watches in ->new
[public-inbox.git] / lib / PublicInbox / LEI.pm
index 183cb545fe55276053c0d9110eef0823d1d08d11..a526a91f80355a6d8430494688cf61ebc5bf4749 100644 (file)
@@ -148,7 +148,7 @@ sub index_opt {
 
 my @c_opt = qw(c=s@ C=s@ quiet|q);
 my @net_opt = (qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt());
-my @lxs_opt = qw(remote! local! external! include|I=s@ exclude=s@ only=s@
+my @lxs_opt = qw(remote! local! external! include|I=s@ exclude=s@ only|O=s@
        import-remote!);
 
 # we don't support -C as an alias for --find-copies since it's already
@@ -225,7 +225,7 @@ our %CMD = ( # sorted in order of importance/use:
 
 'ls-search' => [ '[PREFIX]', 'list saved search queries',
                qw(format|f=s pretty l ascii z|0), @c_opt ],
-'forget-search' => [ 'OUTPUT', 'forget a saved search',
+'forget-search' => [ 'OUTPUT...', 'forget a saved search',
                qw(verbose|v+), @c_opt ],
 'edit-search' => [ 'OUTPUT', "edit saved search via `git config --edit'",
                        @c_opt ],
@@ -284,7 +284,7 @@ our %CMD = ( # sorted in order of importance/use:
        }, qw(config-file|system|global|file|f=s), # for conflict detection
         qw(edit|e c=s@ C=s@), pass_through('git config') ],
 'inspect' => [ 'ITEMS...|--stdin', 'inspect lei/store and/or local external',
-       qw(stdin| pretty ascii dir=s), @c_opt ],
+       qw(stdin| pretty ascii dir|d=s), @c_opt ],
 
 'init' => [ '[DIRNAME]', sub {
        "initialize storage, default: ".store_path($_[0]);
@@ -337,7 +337,8 @@ my %OPTDESC = (
 'path-a|a=s' => 'pre-image pathname associated with OID',
 'path-b|b=s' => 'post-image pathname associated with OID',
 'git-dir=s@' => 'additional git repository to scan',
-'dir=s inspect' => 'specify a inboxdir, extindex topdir or Xapian shard',
+'dir|d=s       inspect' =>
+       'specify a inboxdir, extindex topdir or Xapian shard',
 'proxy=s' => [ 'PROTO://HOST[:PORT]', # shared with curl(1)
        "proxy for (e.g. `socks5h://0:9050')" ],
 'torsocks=s' => ['VAL|auto|no|yes',
@@ -395,7 +396,7 @@ my %OPTDESC = (
                'exclude specified external(s) from search' ],
 'include|I=s@  q' => [ 'LOCATION',
                'include specified external(s) in search' ],
-'only=s@       q' => [ 'LOCATION',
+'only|O=s@     q' => [ 'LOCATION',
                'only use specified external(s) for search' ],
 'jobs=s        q' => [ '[SEARCH_JOBS][,WRITER_JOBS]',
                'control number of search and writer jobs' ],
@@ -454,16 +455,12 @@ my %CONFIG_KEYS = (
        'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne); # internal workers
+my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne v2w); # internal workers
 
 sub _drop_wq {
        my ($self) = @_;
        for my $wq (grep(defined, delete(@$self{@WQ_KEYS}))) {
-               if ($wq->wq_kill) {
-                       $wq->wq_close(0, undef, $self);
-               } elsif ($wq->wq_kill_old) {
-                       $wq->wq_wait_old(undef, $self);
-               }
+               $wq->wq_kill('-TERM');
                $wq->DESTROY;
        }
 }
@@ -471,6 +468,7 @@ sub _drop_wq {
 # pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
 sub x_it ($$) {
        my ($self, $code) = @_;
+       local $current_lei = $self;
        # make sure client sees stdout before exit
        $self->{1}->autoflush(1) if $self->{1};
        stop_pager($self);
@@ -504,6 +502,7 @@ sub qfin { # show message on finalization (LeiFinmsg)
 
 sub fail_handler ($;$$) {
        my ($lei, $code, $io) = @_;
+       local $current_lei = $lei;
        close($io) if $io; # needed to avoid warnings on SIGPIPE
        _drop_wq($lei);
        x_it($lei, $code // (1 << 8));
@@ -513,14 +512,9 @@ sub sigpipe_handler { # handles SIGPIPE from @WQ_KEYS workers
        fail_handler($_[0], 13, delete $_[0]->{1});
 }
 
-# PublicInbox::OnDestroy callback for SIGINT to take out the entire pgid
-sub sigint_reap {
-       my ($pgid) = @_;
-       dwaitpid($pgid) if kill('-INT', $pgid);
-}
-
 sub fail ($$;$) {
        my ($self, $buf, $exit_code) = @_;
+       local $current_lei = $self;
        $self->{failed}++;
        warn($buf, "\n") if defined $buf;
        $self->{pkt_op_p}->pkt_do('fail_handler') if $self->{pkt_op_p};
@@ -541,6 +535,7 @@ sub puts ($;@) { out(shift, map { "$_\n" } @_) }
 
 sub child_error { # passes non-fatal curl exit codes to user
        my ($self, $child_error, $msg) = @_; # child_error is $?
+       local $current_lei = $self;
        $child_error ||= 1 << 8;
        warn($msg, "\n") if defined $msg;
        if ($self->{pkt_op_p}) { # to top lei-daemon
@@ -571,8 +566,10 @@ sub _lei_atfork_child {
        } else { # worker, Net::NNTP (Net::Cmd) uses STDERR directly
                open STDERR, '+>&='.fileno($self->{2}) or warn "open $!";
                STDERR->autoflush(1);
+               POSIX::setpgid(0, $$) // die "setpgid(0, $$): $!";
        }
        close($_) for (grep(defined, delete @$self{qw(3 old_1 au_done)}));
+       delete $self->{-socks};
        if (my $op_c = delete $self->{pkt_op_c}) {
                close(delete $op_c->{sock});
        }
@@ -594,6 +591,7 @@ sub _lei_atfork_child {
                        $cb->(@_) unless PublicInbox::Eml::warn_ignore(@_)
                };
        }
+       $SIG{TERM} = sub { exit(128 + 15) };
        $current_lei = $persist ? undef : $self; # for SIG{__WARN__}
 }
 
@@ -642,6 +640,7 @@ sub workers_start {
        my $op_c = delete $lei->{pkt_op_c};
        @$end = ();
        $lei->event_step_init;
+       $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
        ($op_c, $ops);
 }
 
@@ -649,7 +648,7 @@ sub workers_start {
 sub wait_wq_events {
        my ($lei, $op_c, $ops) = @_;
        for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
-               $wq->wq_close(1);
+               $wq->wq_close;
        }
        $op_c->{ops} = $ops;
 }
@@ -1019,7 +1018,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
                        $cmd = [ Text::ParseWords::shellwords($cmd) ];
                        send($sock, exec_buf($cmd, {}), MSG_EOR) if $sock;
                } else {
-                       err($self, "W: unsupported --alert=$op"); # non-fatal
+                       warn("W: unsupported --alert=$op\n"); # non-fatal
                }
        }
 }
@@ -1068,7 +1067,7 @@ sub start_pager {
 # display a message for user before spawning full-screen $VISUAL
 sub pgr_err {
        my ($self, @msg) = @_;
-       return $self->err(@msg) unless $self->{sock} && -t $self->{2};
+       return warn(@msg) unless $self->{sock} && -t $self->{2};
        start_pager($self, { LESS => 'RX' }); # no 'F' so we prompt
        print { $self->{2} } @msg;
        $self->{2}->autoflush(1);
@@ -1118,6 +1117,7 @@ sub accept_dispatch { # Listener {post_accept} callback
 
 sub dclose {
        my ($self) = @_;
+       local $current_lei = $self;
        delete $self->{-progress};
        _drop_wq($self) if $self->{failed};
        $self->close if $self->{-event_init_done}; # PublicInbox::DS::close
@@ -1142,13 +1142,18 @@ sub event_step {
                if ($buf eq '') {
                        _drop_wq($self); # EOF, client disconnected
                        dclose($self);
-               } elsif ($buf =~ /\A(STOP|CONT)\z/) {
+                       $buf = 'TERM';
+               }
+               if ($buf =~ /\A(?:STOP|CONT|TERM)\z/) {
+                       my $sig = "-$buf";
                        for my $wq (grep(defined, @$self{@WQ_KEYS})) {
-                               $wq->wq_kill($buf) or $wq->wq_kill_old($buf);
+                               $wq->wq_kill($sig);
                        }
                } else {
                        die "unrecognized client signal: $buf";
                }
+               my $s = $self->{-socks} // []; # lei up --all
+               @$s = grep { send($_, $buf, MSG_EOR) } @$s;
        };
        if (my $err = $@) {
                eval { $self->fail($err) };
@@ -1300,10 +1305,11 @@ sub lazy_start {
                USR2 => \&noop,
        };
        require PublicInbox::DirIdle;
-       local $dir_idle = PublicInbox::DirIdle->new([$sock_dir], sub {
+       local $dir_idle = PublicInbox::DirIdle->new(sub {
                # just rely on wakeup to hit PostLoopCallback set below
                dir_idle_handler($_[0]) if $_[0]->fullname ne $path;
-       }, 1);
+       });
+       $dir_idle->add_watches([$sock_dir]);
        PublicInbox::DS->SetPostLoopCallback(sub {
                my ($dmap, undef) = @_;
                if (@st = defined($path) ? stat($path) : ()) {
@@ -1369,6 +1375,7 @@ sub DESTROY {
 sub wq_done_wait { # dwaitpid callback
        my ($arg, $pid) = @_;
        my ($wq, $lei) = @$arg;
+       local $current_lei = $lei;
        my $err_type = $lei->{-err_type};
        $? and $lei->child_error($?,
                        $err_type ? "$err_type errors during $lei->{cmd}" : ());
@@ -1383,8 +1390,8 @@ sub fchdir {
 
 sub wq_eof { # EOF callback for main daemon
        my ($lei) = @_;
-       my $wq1 = delete $lei->{wq1} // return $lei->fail; # already failed
-       $wq1->wq_wait_old($wq1->can('_wq_done_wait') // \&wq_done_wait, $lei);
+       local $current_lei = $lei;
+       delete $lei->{wq1} // return $lei->fail; # already failed
 }
 
 sub watch_state_ok ($) {
@@ -1423,7 +1430,7 @@ sub refresh_watches {
                $seen{$url} = undef;
                my $state = $cfg->get_1("watch.$url.state");
                if (!watch_state_ok($state)) {
-                       $lei->err("watch.$url.state=$state not supported");
+                       warn("watch.$url.state=$state not supported\n");
                        next;
                }
                if ($url =~ /\Amaildir:(.+)/i) {
@@ -1492,6 +1499,7 @@ sub lms {
 
 sub sto_done_request {
        my ($lei, $sock) = @_;
+       local $current_lei = $lei;
        eval {
                if ($sock //= $lei->{sock}) { # issue, async wait
                        $lei->{sto}->wq_io_do('done', [ $sock ]);
@@ -1499,15 +1507,26 @@ sub sto_done_request {
                        my $wait = $lei->{sto}->wq_do('done');
                }
        };
-       $lei->err($@) if $@;
+       warn($@) if $@;
 }
 
 sub cfg_dump ($$) {
        my ($lei, $f) = @_;
        my $ret = eval { PublicInbox::Config->git_config_dump($f, $lei->{2}) };
        return $ret if !$@;
-       $lei->err($@);
+       warn($@);
        undef;
 }
 
+sub request_umask {
+       my ($lei) = @_;
+       my $s = $lei->{sock} // return;
+       send($s, 'umask', MSG_EOR) // die "send: $!";
+       vec(my $rvec = '', fileno($s), 1) = 1;
+       select($rvec, undef, undef, 2) or die 'timeout waiting for umask';
+       recv($s, my $v, 5, 0) // die "recv: $!";
+       (my $u, $lei->{client_umask}) = unpack('AV', $v);
+       $u eq 'u' or warn "E: recv $v has no umask";
+}
+
 1;