X-Git-Url: http://www.git.stargrave.org/?p=public-inbox.git;a=blobdiff_plain;f=lib%2FPublicInbox%2FLEI.pm;h=9ab91714d0b8b725fdda3b92889b70574dce6b83;hp=148a5b1e530d00e0ab73d02aa0f214532d28ac0b;hb=6b52fe53a1f9e69a6ecfce16216a1fa180e4645f;hpb=9d0f17840479508de4aaf76fe6c150e94a9f79c3 diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 148a5b1e..9ab91714 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2020-2021 all contributors +# Copyright (C) all contributors # License: AGPL-3.0+ # Backend for `lei' (local email interface). Unlike the C10K-oriented @@ -12,15 +12,14 @@ use parent qw(PublicInbox::DS PublicInbox::LeiExternal PublicInbox::LeiQuery); use Getopt::Long (); use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un); -use Errno qw(EPIPE EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET); +use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET); use Cwd qw(getcwd); use POSIX qw(strftime); use IO::Handle (); use Fcntl qw(SEEK_SET); use PublicInbox::Config; -use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLET); -use PublicInbox::Sigfd; -use PublicInbox::DS qw(now dwaitpid); +use PublicInbox::Syscall qw(EPOLLIN); +use PublicInbox::DS qw(dwaitpid); use PublicInbox::Spawn qw(spawn popen_rd); use PublicInbox::Lock; use PublicInbox::Eml; @@ -78,19 +77,16 @@ sub rel2abs { return $p; } my $pwd = $self->{env}->{PWD}; - my $cwd; if (defined $pwd) { - my $xcwd = $self->{3} // - ($cwd = getcwd() // die "getcwd(PWD=$pwd): $!"); if (my @st_pwd = stat($pwd)) { - my @st_cwd = stat($xcwd) or die "stat($xcwd): $!"; + my @st_cwd = stat($self->{3}) or die "stat({3}): $!"; "@st_pwd[1,0]" eq "@st_cwd[1,0]" or $self->{env}->{PWD} = $pwd = undef; } else { # PWD was invalid $self->{env}->{PWD} = $pwd = undef; } } - $pwd //= $self->{env}->{PWD} = $cwd // getcwd() // die "getcwd: $!"; + $pwd //= $self->{env}->{PWD} = getcwd() // die "getcwd: $!"; File::Spec->rel2abs($p, $pwd); } @@ -134,9 +130,10 @@ sub url_folder_cache { sub ale { my ($self) = @_; - $self->{ale} //= do { + $self->{ale} // do { require PublicInbox::LeiALE; - $self->_lei_cfg(1)->{ale} //= PublicInbox::LeiALE->new($self); + my $cfg = $self->_lei_cfg(1); + $self->{ale} = $cfg->{ale} //= PublicInbox::LeiALE->new($self); }; } @@ -149,7 +146,7 @@ sub index_opt { my @c_opt = qw(c=s@ C=s@ quiet|q); my @net_opt = (qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt()); -my @lxs_opt = qw(remote! local! external! include|I=s@ exclude=s@ only=s@ +my @lxs_opt = qw(remote! local! external! include|I=s@ exclude=s@ only|O=s@ import-remote!); # we don't support -C as an alias for --find-copies since it's already @@ -182,8 +179,8 @@ our %CMD = ( # sorted in order of importance/use: shared color! mail-sync!), @c_opt, opt_dash('limit|n=i', '[0-9]+') ], 'up' => [ 'OUTPUT...|--all', 'update saved search', - qw(jobs|j=s lock=s@ alert=s@ mua=s verbose|v+ - remote-fudge-time=s all:s), @c_opt ], + qw(jobs|j=s lock=s@ alert=s@ mua=s verbose|v+ exclude=s@ + remote-fudge-time=s all:s remote! local! external!), @net_opt, @c_opt ], 'lcat' => [ '--stdin|MSGID_OR_URL...', 'display local copy of message(s)', 'stdin|', # /|\z/ must be first for lone dash @@ -201,12 +198,17 @@ our %CMD = ( # sorted in order of importance/use: 'rediff' => [ '--stdin|LOCATION...', 'regenerate a diff with different options', 'stdin|', # /|\z/ must be first for lone dash - qw(git-dir=s@ cwd! verbose|v+ color:s no-color), + qw(git-dir=s@ cwd! verbose|v+ color:s no-color drq:1 dequote-only:1), @diff_opt, @lxs_opt, @net_opt, @c_opt ], +'mail-diff' => [ '--stdin|LOCATION...', 'diff the contents of emails', + 'stdin|', # /|\z/ must be first for lone dash + qw(verbose|v+ in-format|F=s color:s no-color raw-header), + @diff_opt, @net_opt, @c_opt ], + 'add-external' => [ 'LOCATION', 'add/set priority of a publicinbox|extindex for extra matches', - qw(boost=i mirror=s inbox-version=i verbose|v+), + qw(boost=i mirror=s inbox-version=i epoch=s verbose|v+), @c_opt, index_opt(), @net_opt ], 'ls-external' => [ '[FILTER]', 'list publicinbox|extindex locations', qw(format|f=s z|0 globoff|g invert-match|v local remote), @c_opt ], @@ -214,15 +216,15 @@ our %CMD = ( # sorted in order of importance/use: 'ls-mail-sync' => [ '[FILTER]', 'list mail sync folders', qw(z|0 globoff|g invert-match|v local remote), @c_opt ], 'ls-mail-source' => [ 'URL', 'list IMAP or NNTP mail source folders', - qw(z|0 ascii l pretty url), @c_opt ], + qw(z|0 ascii l pretty url), @net_opt, @c_opt ], 'forget-external' => [ 'LOCATION...|--prune', 'exclude further results from a publicinbox|extindex', qw(prune), @c_opt ], 'ls-search' => [ '[PREFIX]', 'list saved search queries', qw(format|f=s pretty l ascii z|0), @c_opt ], -'forget-search' => [ 'OUTPUT', 'forget a saved search', - qw(verbose|v+), @c_opt ], +'forget-search' => [ 'OUTPUT...|--prune', 'forget a saved search', + qw(verbose|v+ prune:s), @c_opt ], 'edit-search' => [ 'OUTPUT', "edit saved search via `git config --edit'", @c_opt ], 'rm' => [ '--stdin|LOCATION...', @@ -264,7 +266,8 @@ our %CMD = ( # sorted in order of importance/use: 'forget-mail-sync' => [ 'LOCATION...', 'forget sync information for a mail folder', @c_opt ], 'refresh-mail-sync' => [ 'LOCATION...|--all', - 'prune dangling sync data for a mail folder', 'all:s', @c_opt ], + 'prune dangling sync data for a mail folder', 'all:s', + @net_opt, @c_opt ], 'export-kw' => [ 'LOCATION...|--all', 'one-time export of keywords of sync sources', qw(all:s mode=s), @net_opt, @c_opt ], @@ -272,15 +275,15 @@ our %CMD = ( # sorted in order of importance/use: 'one-time conversion from URL or filesystem to another format', qw(stdin| in-format|F=s out-format|f=s output|mfolder|o=s lock=s@ kw!), @net_opt, @c_opt ], -'p2q' => [ 'FILE|COMMIT_OID|--stdin', +'p2q' => [ 'LOCATION_OR_COMMIT...|--stdin', "use a patch to generate a query for `lei q --stdin'", - qw(stdin| want|w=s@ uri debug), @c_opt ], + qw(stdin| in-format|F=s want|w=s@ uri debug), @net_opt, @c_opt ], 'config' => [ '[...]', sub { 'git-config(1) wrapper for '._config_path($_[0]); }, qw(config-file|system|global|file|f=s), # for conflict detection qw(edit|e c=s@ C=s@), pass_through('git config') ], 'inspect' => [ 'ITEMS...|--stdin', 'inspect lei/store and/or local external', - qw(stdin| pretty ascii dir=s), @c_opt ], + qw(stdin| pretty ascii dir|d=s), @c_opt ], 'init' => [ '[DIRNAME]', sub { "initialize storage, default: ".store_path($_[0]); @@ -333,7 +336,8 @@ my %OPTDESC = ( 'path-a|a=s' => 'pre-image pathname associated with OID', 'path-b|b=s' => 'post-image pathname associated with OID', 'git-dir=s@' => 'additional git repository to scan', -'dir=s inspect' => 'specify a inboxdir, extindex topdir or Xapian shard', +'dir|d=s inspect' => + 'specify a inboxdir, extindex topdir or Xapian shard', 'proxy=s' => [ 'PROTO://HOST[:PORT]', # shared with curl(1) "proxy for (e.g. `socks5h://0:9050')" ], 'torsocks=s' => ['VAL|auto|no|yes', @@ -351,6 +355,7 @@ my %OPTDESC = ( 'want|w=s@' => [ 'PREFIX|dfpost|dfn', # common ones in help... 'search prefixes to extract (default: dfpost7)' ], +'uri p2q' => [ 'URI escape output' ], 'alert=s@' => ['CMD,:WINCH,:bell,', 'run command(s) or perform ops when done writing to output ' . @@ -391,7 +396,7 @@ my %OPTDESC = ( 'exclude specified external(s) from search' ], 'include|I=s@ q' => [ 'LOCATION', 'include specified external(s) in search' ], -'only=s@ q' => [ 'LOCATION', +'only|O=s@ q' => [ 'LOCATION', 'only use specified external(s) for search' ], 'jobs=s q' => [ '[SEARCH_JOBS][,WRITER_JOBS]', 'control number of search and writer jobs' ], @@ -406,6 +411,9 @@ my %OPTDESC = ( 'url ls-mail-source' => 'show full URL of newsgroup or IMAP folder', 'format|f=s ls-external' => $ls_format, +'prune:s forget-search' => + ['TYPE|local|remote', 'prune all, remote or local folders' ], + 'limit|n=i@' => ['NUM', 'limit on number of matches (default: 10000)' ], 'offset=i' => ['OFF', 'search result offset (default: 0)'], @@ -420,7 +428,9 @@ my %OPTDESC = ( 'remote' => 'limit operations to those requiring network access', 'remote!' => 'prevent operations requiring network access', -'all:s up' => ['local|remote', 'update all remote or local saved searches' ], +# up, refresh-mail-sync, export-kw +'all:s' => ['TYPE|local|remote', 'all remote or local folders' ], + 'remote-fudge-time=s' => [ 'INTERVAL', 'look for mail INTERVAL older than the last successful query' ], @@ -448,16 +458,12 @@ my %CONFIG_KEYS = ( 'leistore.dir' => 'top-level storage location', ); -my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne); # internal workers +my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne v2w); # internal workers sub _drop_wq { my ($self) = @_; for my $wq (grep(defined, delete(@$self{@WQ_KEYS}))) { - if ($wq->wq_kill) { - $wq->wq_close(0, undef, $self); - } elsif ($wq->wq_kill_old) { - $wq->wq_wait_old(undef, $self); - } + $wq->wq_kill('-TERM'); $wq->DESTROY; } } @@ -465,6 +471,7 @@ sub _drop_wq { # pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE sub x_it ($$) { my ($self, $code) = @_; + local $current_lei = $self; # make sure client sees stdout before exit $self->{1}->autoflush(1) if $self->{1}; stop_pager($self); @@ -498,6 +505,7 @@ sub qfin { # show message on finalization (LeiFinmsg) sub fail_handler ($;$$) { my ($lei, $code, $io) = @_; + local $current_lei = $lei; close($io) if $io; # needed to avoid warnings on SIGPIPE _drop_wq($lei); x_it($lei, $code // (1 << 8)); @@ -507,18 +515,12 @@ sub sigpipe_handler { # handles SIGPIPE from @WQ_KEYS workers fail_handler($_[0], 13, delete $_[0]->{1}); } -# PublicInbox::OnDestroy callback for SIGINT to take out the entire pgid -sub sigint_reap { - my ($pgid) = @_; - dwaitpid($pgid) if kill('-INT', $pgid); -} - sub fail ($$;$) { - my ($self, $buf, $exit_code) = @_; + my ($self, $msg, $exit_code) = @_; + local $current_lei = $self; $self->{failed}++; - err($self, $buf) if defined $buf; - # calls fail_handler - $self->{pkt_op_p}->pkt_do('!') if $self->{pkt_op_p}; + warn(substr($msg, -1, 1) eq "\n" ? $msg : "$msg\n") if defined $msg; + $self->{pkt_op_p}->pkt_do('fail_handler') if $self->{pkt_op_p}; x_it($self, ($exit_code // 1) << 8); undef; } @@ -536,8 +538,9 @@ sub puts ($;@) { out(shift, map { "$_\n" } @_) } sub child_error { # passes non-fatal curl exit codes to user my ($self, $child_error, $msg) = @_; # child_error is $? + local $current_lei = $self; $child_error ||= 1 << 8; - $self->err($msg) if $msg; + warn(substr($msg, -1, 1) eq "\n" ? $msg : "$msg\n") if defined $msg; if ($self->{pkt_op_p}) { # to top lei-daemon $self->{pkt_op_p}->pkt_do('child_error', $child_error); } elsif ($self->{sock}) { # to lei(1) client @@ -550,7 +553,7 @@ sub child_error { # passes non-fatal curl exit codes to user sub note_sigpipe { # triggers sigpipe_handler my ($self, $fd) = @_; close(delete($self->{$fd})); # explicit close silences Perl warning - $self->{pkt_op_p}->pkt_do('|') if $self->{pkt_op_p}; + $self->{pkt_op_p}->pkt_do('sigpipe_handler') if $self->{pkt_op_p}; x_it($self, 13); } @@ -558,7 +561,8 @@ sub _lei_atfork_child { my ($self, $persist) = @_; # we need to explicitly close things which are on stack if ($persist) { - chdir '/' or die "chdir(/): $!"; + open $self->{3}, '<', '/' or die "open(/) $!"; + fchdir($self); close($_) for (grep(defined, delete @$self{qw(0 1 2 sock)})); if (my $cfg = $self->{cfg}) { delete @$cfg{qw(-lei_store -watches -lei_note_event)}; @@ -566,8 +570,10 @@ sub _lei_atfork_child { } else { # worker, Net::NNTP (Net::Cmd) uses STDERR directly open STDERR, '+>&='.fileno($self->{2}) or warn "open $!"; STDERR->autoflush(1); + POSIX::setpgid(0, $$) // die "setpgid(0, $$): $!"; } - close($_) for (grep(defined, delete @$self{qw(3 old_1 au_done)})); + close($_) for (grep(defined, delete @$self{qw(old_1 au_done)})); + delete $self->{-socks}; if (my $op_c = delete $self->{pkt_op_c}) { close(delete $op_c->{sock}); } @@ -577,13 +583,19 @@ sub _lei_atfork_child { close $listener if $listener; undef $listener; $dir_idle->force_close if $dir_idle; + undef $dir_idle; %PATH2CFG = (); $MDIR2CFGPATH = {}; eval 'no warnings; undef $PublicInbox::LeiNoteEvent::to_flush'; undef $errors_log; $quit = \&CORE::exit; - $self->{-eml_noisy} or # only "lei import" sets this atm - $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb(); + if (!$self->{-eml_noisy}) { # only "lei import" sets this atm + my $cb = $SIG{__WARN__} // \&CORE::warn; + $SIG{__WARN__} = sub { + $cb->(@_) unless PublicInbox::Eml::warn_ignore(@_) + }; + } + $SIG{TERM} = sub { exit(128 + 15) }; $current_lei = $persist ? undef : $self; # for SIG{__WARN__} } @@ -612,17 +624,20 @@ sub incr { sub pkt_ops { my ($lei, $ops) = @_; - $ops->{'!'} = [ \&fail_handler, $lei ]; - $ops->{'|'} = [ \&sigpipe_handler, $lei ]; - $ops->{x_it} = [ \&x_it, $lei ]; - $ops->{child_error} = [ \&child_error, $lei ]; - $ops->{incr} = [ \&incr, $lei ]; + $ops->{fail_handler} = [ $lei ]; + $ops->{sigpipe_handler} = [ $lei ]; + $ops->{x_it} = [ $lei ]; + $ops->{child_error} = [ $lei ]; + $ops->{incr} = [ $lei ]; $ops; } sub workers_start { my ($lei, $wq, $jobs, $ops, $flds) = @_; - $ops = pkt_ops($lei, { ($ops ? %$ops : ()) }); + $ops //= {}; + ($wq->can('net_merge_all_done') && $lei->{auth}) and + $lei->{auth}->op_merge($ops, $wq, $lei); + pkt_ops($lei, $ops); $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ]; my $end = $lei->pkt_op_pair; my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker"; @@ -632,18 +647,29 @@ sub workers_start { my $op_c = delete $lei->{pkt_op_c}; @$end = (); $lei->event_step_init; + $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei); ($op_c, $ops); } # call this when we're ready to wait on events and yield to other clients sub wait_wq_events { my ($lei, $op_c, $ops) = @_; + my $wq1 = $lei->{wq1}; + ($wq1 && $wq1->can('net_merge_all_done') && !$lei->{auth}) and + $wq1->net_merge_all_done; for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs - $wq->wq_close(1); + $wq->wq_close; } $op_c->{ops} = $ops; } +sub wq1_start { + my ($lei, $wq, $jobs) = @_; + my ($op_c, $ops) = workers_start($lei, $wq, $jobs // 1); + $lei->{wq1} = $wq; + wait_wq_events($lei, $op_c, $ops); # net_merge_all_done if !{auth} +} + sub _help { require PublicInbox::LeiHelp; PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC); @@ -772,7 +798,7 @@ sub lazy_cb ($$$) { sub dispatch { my ($self, $cmd, @argv) = @_; - fchdir($self) or return; + fchdir($self); local %ENV = %{$self->{env}}; local $current_lei = $self; # for __WARN__ $self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY @@ -792,7 +818,8 @@ sub dispatch { next if $d eq ''; # same as git(1) chdir $d or return fail($self, "cd $d: $!"); } - open $self->{3}, '.' or return fail($self, "open . $!"); + open $self->{3}, '<', '.' or + return fail($self, "open . $!"); } $cb->($self, @argv); } elsif (grep(/\A-/, $cmd, @argv)) { # --help or -h only @@ -841,9 +868,7 @@ sub _lei_cfg ($;$) { } if (scalar(keys %PATH2CFG) > 5) { # FIXME: use inotify/EVFILT_VNODE to detect unlinked configs - for my $k (keys %PATH2CFG) { - delete($PATH2CFG{$k}) unless -f $k - } + delete(@PATH2CFG{grep(!-f, keys %PATH2CFG)}); } $self->{cfg} = $PATH2CFG{$f} = $cfg; refresh_watches($self); @@ -1009,7 +1034,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail $cmd = [ Text::ParseWords::shellwords($cmd) ]; send($sock, exec_buf($cmd, {}), MSG_EOR) if $sock; } else { - err($self, "W: unsupported --alert=$op"); # non-fatal + warn("W: unsupported --alert=$op\n"); # non-fatal } } } @@ -1058,7 +1083,7 @@ sub start_pager { # display a message for user before spawning full-screen $VISUAL sub pgr_err { my ($self, @msg) = @_; - return $self->err(@msg) unless $self->{sock} && -t $self->{2}; + return warn(@msg) unless $self->{sock} && -t $self->{2}; start_pager($self, { LESS => 'RX' }); # no 'F' so we prompt print { $self->{2} } @msg; $self->{2}->autoflush(1); @@ -1103,14 +1128,14 @@ sub accept_dispatch { # Listener {post_accept} callback my %env = map { split(/=/, $_, 2) } splice(@argv, $argc); $self->{env} = \%env; eval { dispatch($self, @argv) }; - send($sock, $@, MSG_EOR) if $@; + $self->fail($@) if $@; } sub dclose { my ($self) = @_; + local $current_lei = $self; delete $self->{-progress}; _drop_wq($self) if $self->{failed}; - close(delete $self->{1}) if $self->{1}; # may reap_compress $self->close if $self->{-event_init_done}; # PublicInbox::DS::close } @@ -1118,23 +1143,31 @@ sub dclose { sub event_step { my ($self) = @_; local %ENV = %{$self->{env}}; - my $sock = $self->{sock}; local $current_lei = $self; eval { - while (my @fds = $recv_cmd->($sock, my $buf, 4096)) { - if (scalar(@fds) == 1 && !defined($fds[0])) { - return if $! == EAGAIN; - next if $! == EINTR; - last if $! == ECONNRESET; - die "recvmsg: $!"; - } - for my $fd (@fds) { - open my $rfh, '+<&=', $fd; + my @fds = $recv_cmd->($self->{sock} // return, my $buf, 4096); + if (scalar(@fds) == 1 && !defined($fds[0])) { + return if $! == EAGAIN; + die "recvmsg: $!" if $! != ECONNRESET; + $buf = ''; + @fds = (); # for open loop below: + } + for (@fds) { open my $rfh, '+<&=', $_ } + if ($buf eq '') { + _drop_wq($self); # EOF, client disconnected + dclose($self); + $buf = 'TERM'; + } + if ($buf =~ /\A(?:STOP|CONT|TERM)\z/) { + my $sig = "-$buf"; + for my $wq (grep(defined, @$self{@WQ_KEYS})) { + $wq->wq_kill($sig); } + } else { die "unrecognized client signal: $buf"; } - _drop_wq($self); # EOF, client disconnected - dclose($self); + my $s = $self->{-socks} // []; # lei up --all + @$s = grep { send($_, $buf, MSG_EOR) } @$s; }; if (my $err = $@) { eval { $self->fail($err) }; @@ -1145,9 +1178,10 @@ sub event_step { sub event_step_init { my ($self) = @_; my $sock = $self->{sock} or return; - $self->{-event_init_done} //= do { # persist til $ops done - $self->SUPER::new($sock, EPOLLIN|EPOLLET); - $sock; + $self->{-event_init_done} // do { # persist til $ops done + $sock->blocking(0); + $self->SUPER::new($sock, EPOLLIN); + $self->{-event_init_done} = $sock; }; } @@ -1172,7 +1206,7 @@ sub cfg2lei ($) { open($lei->{0}, '<&', \*STDIN) or die "dup 0: $!"; open($lei->{1}, '>>&', \*STDOUT) or die "dup 1: $!"; open($lei->{2}, '>>&', \*STDERR) or die "dup 2: $!"; - open($lei->{3}, '/') or die "open /: $!"; + open($lei->{3}, '<', '/') or die "open /: $!"; my ($x, $y); socketpair($x, $y, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!"; $lei->{sock} = $x; @@ -1186,7 +1220,7 @@ sub dir_idle_handler ($) { # PublicInbox::DirIdle callback my $fn = $ev->fullname; if ($fn =~ m!\A(.+)/(new|cur)/([^/]+)\z!) { # Maildir file my ($mdir, $nc, $bn) = ($1, $2, $3); - $nc = '' if $ev->IN_DELETE; + $nc = '' if $ev->IN_DELETE || $ev->IN_MOVED_FROM; for my $f (keys %{$MDIR2CFGPATH->{$mdir} // {}}) { my $cfg = $PATH2CFG{$f} // next; eval { @@ -1194,7 +1228,7 @@ sub dir_idle_handler ($) { # PublicInbox::DirIdle callback $lei->dispatch('note-event', "maildir:$mdir", $nc, $bn, $fn); }; - warn "E note-event $f: $@\n" if $@; + warn "E: note-event $f: $@\n" if $@; } } if ($ev->can('cancel') && ($ev->IN_IGNORE || $ev->IN_UNMOUNT)) { @@ -1219,6 +1253,7 @@ sub lazy_start { $errors_log = "$sock_dir/errors.log"; my $addr = pack_sockaddr_un($path); my $lk = bless { lock_path => $errors_log }, 'PublicInbox::Lock'; + umask(077) // die("umask(077): $!"); $lk->lock_acquire; socket($listener, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!"; if ($errno == ECONNREFUSED || $errno == ENOENT) { @@ -1230,7 +1265,6 @@ sub lazy_start { $! = $errno; # allow interpolation to stringify in die die "connect($path): $!"; } - umask(077) // die("umask(077): $!"); bind($listener, $addr) or die "bind($path): $!"; $lk->lock_release; undef $lk; @@ -1284,23 +1318,12 @@ sub lazy_start { USR1 => \&noop, USR2 => \&noop, }; - my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK); - local @SIG{keys %$sig} = values(%$sig) unless $sigfd; - undef $sig; - local $SIG{PIPE} = 'IGNORE'; require PublicInbox::DirIdle; - local $dir_idle = PublicInbox::DirIdle->new([$sock_dir], sub { + local $dir_idle = PublicInbox::DirIdle->new(sub { # just rely on wakeup to hit PostLoopCallback set below dir_idle_handler($_[0]) if $_[0]->fullname ne $path; - }, 1); - if ($sigfd) { - undef $sigfd; # unref, already in DS::DescriptorMap - } else { - # wake up every second to accept signals if we don't - # have signalfd or IO::KQueue: - PublicInbox::DS::sig_setmask($oldset); - PublicInbox::DS->SetLoopTimeout(1000); - } + }); + $dir_idle->add_watches([$sock_dir]); PublicInbox::DS->SetPostLoopCallback(sub { my ($dmap, undef) = @_; if (@st = defined($path) ? stat($path) : ()) { @@ -1314,11 +1337,10 @@ sub lazy_start { $quit->(); } return 1 if defined($path); - my $now = now(); my $n = 0; for my $s (values %$dmap) { $s->can('busy') or next; - if ($s->busy($now)) { + if ($s->busy) { ++$n; } else { $s->close; @@ -1337,8 +1359,10 @@ sub lazy_start { open STDERR, '>&STDIN' or die "redirect stderr failed: $!"; open STDOUT, '>&STDIN' or die "redirect stdout failed: $!"; # $daemon pipe to `lei' closed, main loop begins: - eval { PublicInbox::DS->EventLoop }; + eval { PublicInbox::DS::event_loop($sig, $oldset) }; warn "event loop error: $@\n" if $@; + # exit() may trigger waitpid via various DESTROY, ensure interruptible + PublicInbox::DS::sig_setmask($oldset); dump_and_clear_log(); exit($exit_code // 0); } @@ -1364,6 +1388,7 @@ sub DESTROY { sub wq_done_wait { # dwaitpid callback my ($arg, $pid) = @_; my ($wq, $lei) = @$arg; + local $current_lei = $lei; my $err_type = $lei->{-err_type}; $? and $lei->child_error($?, $err_type ? "$err_type errors during $lei->{cmd}" : ()); @@ -1373,13 +1398,13 @@ sub wq_done_wait { # dwaitpid callback sub fchdir { my ($lei) = @_; my $dh = $lei->{3} // die 'BUG: lei->{3} (CWD) gone'; - chdir($dh) || $lei->fail("fchdir: $!"); + chdir($dh) || die "fchdir: $!"; } sub wq_eof { # EOF callback for main daemon my ($lei) = @_; - my $wq1 = delete $lei->{wq1} // return $lei->fail; # already failed - $wq1->wq_wait_old(\&wq_done_wait, $lei); + local $current_lei = $lei; + delete $lei->{wq1} // return $lei->fail; # already failed } sub watch_state_ok ($) { @@ -1405,6 +1430,7 @@ sub add_maildir_watch ($$) { sub refresh_watches { my ($lei) = @_; + $dir_idle or return; my $cfg = _lei_cfg($lei) or return; my $old = $cfg->{-watches}; my $watches = $cfg->{-watches} //= {}; @@ -1415,9 +1441,9 @@ sub refresh_watches { require PublicInbox::LeiWatch; $watches->{$url} //= PublicInbox::LeiWatch->new($url); $seen{$url} = undef; - my $state = $cfg->get_1("watch.$url", 'state'); + my $state = $cfg->get_1("watch.$url.state"); if (!watch_state_ok($state)) { - $lei->err("watch.$url.state=$state not supported"); + warn("watch.$url.state=$state not supported\n"); next; } if ($url =~ /\Amaildir:(.+)/i) { @@ -1477,15 +1503,16 @@ sub git_oid { } sub lms { - my ($lei, $rw) = @_; + my ($lei, $creat) = @_; my $sto = $lei->{sto} // _lei_store($lei) // return; require PublicInbox::LeiMailSync; my $f = "$sto->{priv_eidx}->{topdir}/mail_sync.sqlite3"; - (-f $f || $rw) ? PublicInbox::LeiMailSync->new($f) : undef; + (-f $f || $creat) ? PublicInbox::LeiMailSync->new($f) : undef; } sub sto_done_request { my ($lei, $sock) = @_; + local $current_lei = $lei; eval { if ($sock //= $lei->{sock}) { # issue, async wait $lei->{sto}->wq_io_do('done', [ $sock ]); @@ -1493,15 +1520,26 @@ sub sto_done_request { my $wait = $lei->{sto}->wq_do('done'); } }; - $lei->err($@) if $@; + warn($@) if $@; } sub cfg_dump ($$) { my ($lei, $f) = @_; my $ret = eval { PublicInbox::Config->git_config_dump($f, $lei->{2}) }; return $ret if !$@; - $lei->err($@); + warn($@); undef; } +sub request_umask { + my ($lei) = @_; + my $s = $lei->{sock} // return; + send($s, 'umask', MSG_EOR) // die "send: $!"; + vec(my $rvec = '', fileno($s), 1) = 1; + select($rvec, undef, undef, 2) or die 'timeout waiting for umask'; + recv($s, my $v, 5, 0) // die "recv: $!"; + (my $u, $lei->{client_umask}) = unpack('AV', $v); + $u eq 'u' or warn "E: recv $v has no umask"; +} + 1;