X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLEI.pm;h=5d701d5e14e817ac0b5120eb5ed10bab28608c05;hb=cb0e9d42b799c7489c3b8328cfcae1e1500bc7a0;hp=effc905ac4d323bdf926d57ac538fd49b3f56bee;hpb=787cbc523c0beac69f6df8b8a689684864a6594e;p=public-inbox.git diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index effc905a..5d701d5e 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -23,6 +23,7 @@ use PublicInbox::Sigfd; use PublicInbox::DS qw(now dwaitpid); use PublicInbox::Spawn qw(spawn popen_rd); use PublicInbox::Lock; +use PublicInbox::Eml; use Time::HiRes qw(stat); # ctime comparisons for config cache use File::Path qw(mkpath); use File::Spec; @@ -143,12 +144,20 @@ our %CMD = ( # sorted in order of importance/use: @lxs_opt, qw(save output|mfolder|o=s format|f=s dedupe|d=s threads|t+ sort|s=s reverse|r offset=i pretty jobs|j=s globoff|g augment|a - import-before! lock=s@ rsyncable alert=s@ mua=s verbose|v+), @c_opt, - opt_dash('limit|n=i', '[0-9]+') ], + import-before! lock=s@ rsyncable alert=s@ mua=s verbose|v+ + color!), @c_opt, opt_dash('limit|n=i', '[0-9]+') ], 'up' => [ 'OUTPUT|--all', 'update saved search', qw(jobs|j=s lock=s@ alert=s@ mua=s verbose|v+ all:s), @c_opt ], +'lcat' => [ '--stdin|MSGID_OR_URL..', 'display local copy of message(s)', + 'stdin|', # /|\z/ must be first for lone dash + # some of these options are ridiculous for lcat + @lxs_opt, qw(output|mfolder|o=s format|f=s dedupe|d=s threads|t+ + sort|s=s reverse|r offset=i jobs|j=s globoff|g augment|a + import-before! lock=s@ rsyncable alert=s@ mua=s verbose|v+ + color!), @c_opt, opt_dash('limit|n=i', '[0-9]+') ], + 'blob' => [ 'OID', 'show a git blob, reconstructing from mail if necessary', qw(git-dir=s@ cwd! verbose|v+ mail! oid-a|A=s path-a|a=s path-b|b=s), @lxs_opt, @c_opt ], @@ -161,7 +170,7 @@ our %CMD = ( # sorted in order of importance/use: 'ls-external' => [ '[FILTER]', 'list publicinbox|extindex locations', qw(format|f=s z|0 globoff|g invert-match|v local remote), @c_opt ], 'ls-label' => [ '', 'list labels', qw(z|0 stats:s), @c_opt ], -'ls-sync' => [ '', 'list sync folders', +'ls-mail-sync' => [ '', 'list mail sync folders', qw(z|0 z|0 globoff|g invert-match|v local remote), @c_opt ], 'forget-external' => [ 'LOCATION...|--prune', 'exclude further results from a publicinbox|extindex', @@ -179,7 +188,8 @@ our %CMD = ( # sorted in order of importance/use: qw(stdin| threads|t from|f=s mid=s oid=s), @c_opt ], 'tag' => [ 'KEYWORDS...', 'set/unset keywords and/or labels on message(s)', - qw(stdin| in-format|F=s input|i=s@ oid=s@ mid=s@), @c_opt, + qw(stdin| in-format|F=s input|i=s@ oid=s@ mid=s@), + qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt(), @c_opt, pass_through('-kw:foo for delete') ], 'forget' => [ '[--stdin|--oid=OID|--by-mid=MID]', "exclude message(s) on stdin from `q' search results", @@ -202,11 +212,12 @@ our %CMD = ( # sorted in order of importance/use: 'import' => [ 'LOCATION...|--stdin', 'one-time import/update from URL or filesystem', qw(stdin| offset=i recursive|r exclude=s include|I=s - lock=s@ in-format|F=s kw! verbose|v+ incremental! sync!), @c_opt ], + lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync!), + qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt(), @c_opt ], 'convert' => [ 'LOCATION...|--stdin', 'one-time conversion from URL or filesystem to another format', - qw(stdin| in-format|F=s out-format|f=s output|mfolder|o=s - lock=s@ kw!), @c_opt ], + qw(stdin| in-format|F=s out-format|f=s output|mfolder|o=s lock=s@ kw!), + qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt(), @c_opt ], 'p2q' => [ 'FILE|COMMIT_OID|--stdin', "use a patch to generate a query for `lei q --stdin'", qw(stdin| want|w=s@ uri debug), @c_opt ], @@ -259,6 +270,7 @@ my %OPTDESC = ( 'incremental! import' => 'import already seen IMAP and NNTP articles', 'globoff|g' => "do not match locations using '*?' wildcards ". "and\xa0'[]'\x{a0}ranges", +'color!' => 'disable color (for --format=text)', 'verbose|v+' => 'be more verbose', 'external!' => 'do not use externals', 'mail!' => 'do not look in mail storage for OID', @@ -267,6 +279,8 @@ my %OPTDESC = ( 'path-a|a=s' => 'pre-image pathname associated with OID', 'path-b|b=s' => 'post-image pathname associated with OID', 'git-dir=s@' => 'additional git repository to scan', +'proxy=s' => [ 'PROTO://HOST[:PORT]', # shared with curl(1) + "proxy for (e.g. `socks5h://0:9050')" ], 'torsocks=s' => ['VAL|auto|no|yes', 'whether or not to wrap git and curl commands with torsocks'], 'no-torsocks' => 'alias for --torsocks=no', @@ -373,11 +387,18 @@ my %CONFIG_KEYS = ( 'leistore.dir' => 'top-level storage location', ); -my @WQ_KEYS = qw(lxs l2m imp mrr cnv p2q tag sol lsss); # internal workers +my @WQ_KEYS = qw(lxs l2m wq1); # internal workers sub _drop_wq { my ($self) = @_; - for my $wq (grep(defined, delete(@$self{@WQ_KEYS}))) { $wq->DESTROY } + for my $wq (grep(defined, delete(@$self{@WQ_KEYS}))) { + if ($wq->wq_kill) { + $wq->wq_close(0, undef, $self); + } elsif ($wq->wq_kill_old) { + $wq->wq_wait_old(undef, $self); + } + $wq->DESTROY; + } } # pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE @@ -477,19 +498,15 @@ sub _lei_atfork_child { # we need to explicitly close things which are on stack if ($persist) { chdir '/' or die "chdir(/): $!"; - my @io = delete @$self{qw(0 1 2 sock)}; - unless ($self->{oneshot}) { - close($_) for @io; - } + close($_) for (grep(defined, delete @$self{qw(0 1 2 sock)})); if (my $cfg = $self->{cfg}) { delete $cfg->{-lei_store}; } } else { # worker, Net::NNTP (Net::Cmd) uses STDERR directly open STDERR, '+>&='.fileno($self->{2}) or warn "open $!"; + STDERR->autoflush(1); } - for (delete @$self{qw(3 old_1 au_done)}) { - close($_) if defined($_); - } + close($_) for (grep(defined, delete @$self{qw(3 old_1 au_done)})); if (my $op_c = delete $self->{pkt_op_c}) { close(delete $op_c->{sock}); } @@ -501,6 +518,8 @@ sub _lei_atfork_child { %PATH2CFG = (); undef $errors_log; $quit = \&CORE::exit; + $self->{-eml_noisy} or # only "lei import" sets this atm + $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb(); $current_lei = $persist ? undef : $self; # for SIG{__WARN__} } @@ -531,7 +550,7 @@ sub workers_start { 'child_error' => [ \&child_error, $lei ], ($ops ? %$ops : ()), }; - $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&dclose, $lei ]; + $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ]; my $end = $lei->pkt_op_pair; $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei }); delete $lei->{pkt_op_p}; @@ -662,6 +681,7 @@ sub lazy_cb ($$$) { sub dispatch { my ($self, $cmd, @argv) = @_; local $current_lei = $self; # for __WARN__ + $self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY dump_and_clear_log("from previous run\n"); return _help($self, 'no command given') unless defined($cmd); # do not support Getopt bundling for this @@ -990,9 +1010,8 @@ sub accept_dispatch { # Listener {post_accept} callback open($self->{$i++}, '+<&=', $fd) and next; send($sock, "open(+<&=$fd) (FD=$i): $!", MSG_EOR); } - return if scalar(@fds) != 4; + $i == 4 or return send($sock, 'not enough FDs='.($i-1), MSG_EOR) } - $self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY # $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV); # $buf = "$argc\0".join("\0", @ARGV).$ENV_STR."\0\0"; substr($buf, -2, 2, '') eq "\0\0" or # s/\0\0\z// @@ -1201,13 +1220,8 @@ sub oneshot { local $quit = $exit if $exit; local %PATH2CFG; umask(077) // die("umask(077): $!"); - my $self = bless { - oneshot => 1, - 0 => *STDIN{GLOB}, - 1 => *STDOUT{GLOB}, - 2 => *STDERR{GLOB}, - env => \%ENV - }, __PACKAGE__; + my $self = bless { oneshot => 1, env => \%ENV }, __PACKAGE__; + for (0..2) { open($self->{$_}, '+<&=', $_) or die "open fd=$_: $!" } dispatch($self, @ARGV); x_it($self, $self->{child_error}) if $self->{child_error}; } @@ -1226,9 +1240,17 @@ sub DESTROY { sub wq_done_wait { # dwaitpid callback my ($arg, $pid) = @_; - my ($wq, $lei, $e) = @$arg; - $? and $lei->child_error($?, $e ? "$e errors during $lei->{cmd}" : ()); + my ($wq, $lei) = @$arg; + my $err_type = $lei->{-err_type}; + $? and $lei->child_error($?, + $err_type ? "$err_type errors during $lei->{cmd}" : ()); $lei->dclose; } +sub wq_eof { # EOF callback for main daemon + my ($lei) = @_; + my $wq1 = delete $lei->{wq1} // return $lei->fail; # already failed + $wq1->wq_wait_old(\&wq_done_wait, $lei); +} + 1;