X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLEI.pm;h=7dfd33989186d479c9f1b796599e29ff52d49fc1;hb=14f65bca12e367eed79e7b737d1cb19929599cf6;hp=df0bfab6dfb75064f8917fece47aed7650eb189d;hpb=996de3c67d490b4052bd16179b229b30c2f890f8;p=public-inbox.git diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index df0bfab6..7dfd3398 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -18,8 +18,7 @@ use POSIX qw(strftime); use IO::Handle (); use Fcntl qw(SEEK_SET); use PublicInbox::Config; -use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLET); -use PublicInbox::Sigfd; +use PublicInbox::Syscall qw(EPOLLIN EPOLLET); use PublicInbox::DS qw(now dwaitpid); use PublicInbox::Spawn qw(spawn popen_rd); use PublicInbox::Lock; @@ -149,7 +148,7 @@ sub index_opt { my @c_opt = qw(c=s@ C=s@ quiet|q); my @net_opt = (qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt()); -my @lxs_opt = qw(remote! local! external! include|I=s@ exclude=s@ only=s@ +my @lxs_opt = qw(remote! local! external! include|I=s@ exclude=s@ only|O=s@ import-remote!); # we don't support -C as an alias for --find-copies since it's already @@ -204,6 +203,11 @@ our %CMD = ( # sorted in order of importance/use: qw(git-dir=s@ cwd! verbose|v+ color:s no-color drq:1 dequote-only:1), @diff_opt, @lxs_opt, @net_opt, @c_opt ], +'mail-diff' => [ '--stdin|LOCATION...', 'diff the contents of emails', + 'stdin|', # /|\z/ must be first for lone dash + qw(verbose|v+ color:s no-color raw-header), + @diff_opt, @net_opt, @c_opt ], + 'add-external' => [ 'LOCATION', 'add/set priority of a publicinbox|extindex for extra matches', qw(boost=i mirror=s inbox-version=i epoch=s verbose|v+), @@ -221,7 +225,7 @@ our %CMD = ( # sorted in order of importance/use: 'ls-search' => [ '[PREFIX]', 'list saved search queries', qw(format|f=s pretty l ascii z|0), @c_opt ], -'forget-search' => [ 'OUTPUT', 'forget a saved search', +'forget-search' => [ 'OUTPUT...', 'forget a saved search', qw(verbose|v+), @c_opt ], 'edit-search' => [ 'OUTPUT', "edit saved search via `git config --edit'", @c_opt ], @@ -280,7 +284,7 @@ our %CMD = ( # sorted in order of importance/use: }, qw(config-file|system|global|file|f=s), # for conflict detection qw(edit|e c=s@ C=s@), pass_through('git config') ], 'inspect' => [ 'ITEMS...|--stdin', 'inspect lei/store and/or local external', - qw(stdin| pretty ascii dir=s), @c_opt ], + qw(stdin| pretty ascii dir|d=s), @c_opt ], 'init' => [ '[DIRNAME]', sub { "initialize storage, default: ".store_path($_[0]); @@ -333,7 +337,8 @@ my %OPTDESC = ( 'path-a|a=s' => 'pre-image pathname associated with OID', 'path-b|b=s' => 'post-image pathname associated with OID', 'git-dir=s@' => 'additional git repository to scan', -'dir=s inspect' => 'specify a inboxdir, extindex topdir or Xapian shard', +'dir|d=s inspect' => + 'specify a inboxdir, extindex topdir or Xapian shard', 'proxy=s' => [ 'PROTO://HOST[:PORT]', # shared with curl(1) "proxy for (e.g. `socks5h://0:9050')" ], 'torsocks=s' => ['VAL|auto|no|yes', @@ -391,7 +396,7 @@ my %OPTDESC = ( 'exclude specified external(s) from search' ], 'include|I=s@ q' => [ 'LOCATION', 'include specified external(s) in search' ], -'only=s@ q' => [ 'LOCATION', +'only|O=s@ q' => [ 'LOCATION', 'only use specified external(s) for search' ], 'jobs=s q' => [ '[SEARCH_JOBS][,WRITER_JOBS]', 'control number of search and writer jobs' ], @@ -450,16 +455,12 @@ my %CONFIG_KEYS = ( 'leistore.dir' => 'top-level storage location', ); -my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne); # internal workers +my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne v2w); # internal workers sub _drop_wq { my ($self) = @_; for my $wq (grep(defined, delete(@$self{@WQ_KEYS}))) { - if ($wq->wq_kill) { - $wq->wq_close(0, undef, $self); - } elsif ($wq->wq_kill_old) { - $wq->wq_wait_old(undef, $self); - } + $wq->wq_kill('-TERM'); $wq->DESTROY; } } @@ -467,6 +468,7 @@ sub _drop_wq { # pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE sub x_it ($$) { my ($self, $code) = @_; + local $current_lei = $self; # make sure client sees stdout before exit $self->{1}->autoflush(1) if $self->{1}; stop_pager($self); @@ -500,6 +502,7 @@ sub qfin { # show message on finalization (LeiFinmsg) sub fail_handler ($;$$) { my ($lei, $code, $io) = @_; + local $current_lei = $lei; close($io) if $io; # needed to avoid warnings on SIGPIPE _drop_wq($lei); x_it($lei, $code // (1 << 8)); @@ -509,16 +512,11 @@ sub sigpipe_handler { # handles SIGPIPE from @WQ_KEYS workers fail_handler($_[0], 13, delete $_[0]->{1}); } -# PublicInbox::OnDestroy callback for SIGINT to take out the entire pgid -sub sigint_reap { - my ($pgid) = @_; - dwaitpid($pgid) if kill('-INT', $pgid); -} - sub fail ($$;$) { my ($self, $buf, $exit_code) = @_; + local $current_lei = $self; $self->{failed}++; - err($self, $buf) if defined $buf; + warn($buf, "\n") if defined $buf; $self->{pkt_op_p}->pkt_do('fail_handler') if $self->{pkt_op_p}; x_it($self, ($exit_code // 1) << 8); undef; @@ -537,8 +535,9 @@ sub puts ($;@) { out(shift, map { "$_\n" } @_) } sub child_error { # passes non-fatal curl exit codes to user my ($self, $child_error, $msg) = @_; # child_error is $? + local $current_lei = $self; $child_error ||= 1 << 8; - $self->err($msg) if $msg; + warn($msg, "\n") if defined $msg; if ($self->{pkt_op_p}) { # to top lei-daemon $self->{pkt_op_p}->pkt_do('child_error', $child_error); } elsif ($self->{sock}) { # to lei(1) client @@ -567,8 +566,10 @@ sub _lei_atfork_child { } else { # worker, Net::NNTP (Net::Cmd) uses STDERR directly open STDERR, '+>&='.fileno($self->{2}) or warn "open $!"; STDERR->autoflush(1); + POSIX::setpgid(0, $$) // die "setpgid(0, $$): $!"; } close($_) for (grep(defined, delete @$self{qw(3 old_1 au_done)})); + delete $self->{-socks}; if (my $op_c = delete $self->{pkt_op_c}) { close(delete $op_c->{sock}); } @@ -584,8 +585,13 @@ sub _lei_atfork_child { eval 'no warnings; undef $PublicInbox::LeiNoteEvent::to_flush'; undef $errors_log; $quit = \&CORE::exit; - $self->{-eml_noisy} or # only "lei import" sets this atm - $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb(); + if (!$self->{-eml_noisy}) { # only "lei import" sets this atm + my $cb = $SIG{__WARN__} // \&CORE::warn; + $SIG{__WARN__} = sub { + $cb->(@_) unless PublicInbox::Eml::warn_ignore(@_) + }; + } + $SIG{TERM} = sub { exit(128 + 15) }; $current_lei = $persist ? undef : $self; # for SIG{__WARN__} } @@ -634,6 +640,7 @@ sub workers_start { my $op_c = delete $lei->{pkt_op_c}; @$end = (); $lei->event_step_init; + $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei); ($op_c, $ops); } @@ -641,7 +648,7 @@ sub workers_start { sub wait_wq_events { my ($lei, $op_c, $ops) = @_; for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs - $wq->wq_close(1); + $wq->wq_close; } $op_c->{ops} = $ops; } @@ -1011,7 +1018,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail $cmd = [ Text::ParseWords::shellwords($cmd) ]; send($sock, exec_buf($cmd, {}), MSG_EOR) if $sock; } else { - err($self, "W: unsupported --alert=$op"); # non-fatal + warn("W: unsupported --alert=$op\n"); # non-fatal } } } @@ -1060,7 +1067,7 @@ sub start_pager { # display a message for user before spawning full-screen $VISUAL sub pgr_err { my ($self, @msg) = @_; - return $self->err(@msg) unless $self->{sock} && -t $self->{2}; + return warn(@msg) unless $self->{sock} && -t $self->{2}; start_pager($self, { LESS => 'RX' }); # no 'F' so we prompt print { $self->{2} } @msg; $self->{2}->autoflush(1); @@ -1110,6 +1117,7 @@ sub accept_dispatch { # Listener {post_accept} callback sub dclose { my ($self) = @_; + local $current_lei = $self; delete $self->{-progress}; _drop_wq($self) if $self->{failed}; $self->close if $self->{-event_init_done}; # PublicInbox::DS::close @@ -1134,13 +1142,18 @@ sub event_step { if ($buf eq '') { _drop_wq($self); # EOF, client disconnected dclose($self); - } elsif ($buf =~ /\A(STOP|CONT)\z/) { + $buf = 'TERM'; + } + if ($buf =~ /\A(?:STOP|CONT|TERM)\z/) { + my $sig = "-$buf"; for my $wq (grep(defined, @$self{@WQ_KEYS})) { - $wq->wq_kill($buf) or $wq->wq_kill_old($buf); + $wq->wq_kill($sig); } } else { die "unrecognized client signal: $buf"; } + my $s = $self->{-socks} // []; # lei up --all + @$s = grep { send($_, $buf, MSG_EOR) } @$s; }; if (my $err = $@) { eval { $self->fail($err) }; @@ -1291,23 +1304,11 @@ sub lazy_start { USR1 => \&noop, USR2 => \&noop, }; - my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK); - local @SIG{keys %$sig} = values(%$sig) unless $sigfd; - undef $sig; - local $SIG{PIPE} = 'IGNORE'; require PublicInbox::DirIdle; local $dir_idle = PublicInbox::DirIdle->new([$sock_dir], sub { # just rely on wakeup to hit PostLoopCallback set below dir_idle_handler($_[0]) if $_[0]->fullname ne $path; }, 1); - if ($sigfd) { - undef $sigfd; # unref, already in DS::DescriptorMap - } else { - # wake up every second to accept signals if we don't - # have signalfd or IO::KQueue: - PublicInbox::DS::sig_setmask($oldset); - PublicInbox::DS->SetLoopTimeout(1000); - } PublicInbox::DS->SetPostLoopCallback(sub { my ($dmap, undef) = @_; if (@st = defined($path) ? stat($path) : ()) { @@ -1344,7 +1345,7 @@ sub lazy_start { open STDERR, '>&STDIN' or die "redirect stderr failed: $!"; open STDOUT, '>&STDIN' or die "redirect stdout failed: $!"; # $daemon pipe to `lei' closed, main loop begins: - eval { PublicInbox::DS->EventLoop }; + eval { PublicInbox::DS::event_loop($sig, $oldset) }; warn "event loop error: $@\n" if $@; # exit() may trigger waitpid via various DESTROY, ensure interruptible PublicInbox::DS::sig_setmask($oldset); @@ -1373,6 +1374,7 @@ sub DESTROY { sub wq_done_wait { # dwaitpid callback my ($arg, $pid) = @_; my ($wq, $lei) = @$arg; + local $current_lei = $lei; my $err_type = $lei->{-err_type}; $? and $lei->child_error($?, $err_type ? "$err_type errors during $lei->{cmd}" : ()); @@ -1387,8 +1389,8 @@ sub fchdir { sub wq_eof { # EOF callback for main daemon my ($lei) = @_; - my $wq1 = delete $lei->{wq1} // return $lei->fail; # already failed - $wq1->wq_wait_old($wq1->can('_wq_done_wait') // \&wq_done_wait, $lei); + local $current_lei = $lei; + delete $lei->{wq1} // return $lei->fail; # already failed } sub watch_state_ok ($) { @@ -1427,7 +1429,7 @@ sub refresh_watches { $seen{$url} = undef; my $state = $cfg->get_1("watch.$url.state"); if (!watch_state_ok($state)) { - $lei->err("watch.$url.state=$state not supported"); + warn("watch.$url.state=$state not supported\n"); next; } if ($url =~ /\Amaildir:(.+)/i) { @@ -1496,6 +1498,7 @@ sub lms { sub sto_done_request { my ($lei, $sock) = @_; + local $current_lei = $lei; eval { if ($sock //= $lei->{sock}) { # issue, async wait $lei->{sto}->wq_io_do('done', [ $sock ]); @@ -1503,15 +1506,26 @@ sub sto_done_request { my $wait = $lei->{sto}->wq_do('done'); } }; - $lei->err($@) if $@; + warn($@) if $@; } sub cfg_dump ($$) { my ($lei, $f) = @_; my $ret = eval { PublicInbox::Config->git_config_dump($f, $lei->{2}) }; return $ret if !$@; - $lei->err($@); + warn($@); undef; } +sub request_umask { + my ($lei) = @_; + my $s = $lei->{sock} // return; + send($s, 'umask', MSG_EOR) // die "send: $!"; + vec(my $rvec = '', fileno($s), 1) = 1; + select($rvec, undef, undef, 2) or die 'timeout waiting for umask'; + recv($s, my $v, 5, 0) // die "recv: $!"; + (my $u, $lei->{client_umask}) = unpack('AV', $v); + $u eq 'u' or warn "E: recv $v has no umask"; +} + 1;