.'/lei/config');
}
+sub index_opt {
+ # TODO: drop underscore variants everywhere, they're undocumented
+ qw(fsync|sync! jobs|j=i indexlevel|index-level|L=s compact+
+ max_size|max-size=s sequential_shard|sequential-shard
+ batch_size|batch-size=s skip-docdata quiet|q verbose|v+)
+}
+
# TODO: generate shell completion + help using %CMD and %OPTDESC
# command => [ positional_args, 1-line description, Getopt::Long option spec ]
our %CMD = ( # sorted in order of importance/use:
-'q' => [ 'SEARCH_TERMS...', 'search for messages matching terms', qw(
+'q' => [ '--stdin|SEARCH_TERMS...', 'search for messages matching terms', qw(
save-as=s output|mfolder|o=s format|f=s dedupe|d=s thread|t augment|a
sort|s=s reverse|r offset=i remote! local! external! pretty
- include|I=s@ exclude=s@ only=s@
- mua-cmd|mua=s no-torsocks torsocks=s verbose|v quiet|q
+ include|I=s@ exclude=s@ only=s@ jobs|j=s globoff|g stdin|
+ mua-cmd|mua=s no-torsocks torsocks=s verbose|v+ quiet|q
received-after=s received-before=s sent-after=s sent-since=s),
PublicInbox::LeiQuery::curl_opt(), opt_dash('limit|n=i', '[0-9]+') ],
'add-external' => [ 'URL_OR_PATHNAME',
'add/set priority of a publicinbox|extindex for extra matches',
- qw(boost=i quiet|q) ],
+ qw(boost=i c=s@ mirror=s no-torsocks torsocks=s inbox-version=i),
+ index_opt(), PublicInbox::LeiQuery::curl_opt() ],
'ls-external' => [ '[FILTER...]', 'list publicinbox|extindex locations',
qw(format|f=s z|0 local remote quiet|q) ],
'forget-external' => [ 'URL_OR_PATHNAME...|--prune',
'exclude mail matching From: or thread from non-Message-ID searches',
qw(stdin| thread|t from|f=s mid=s oid=s) ],
'mark' => [ 'MESSAGE_FLAGS...',
- 'set/unset flags on message(s) from stdin',
+ 'set/unset keywords on message(s) from stdin',
qw(stdin| oid=s exact by-mid|mid:s) ],
'forget' => [ '[--stdin|--oid=OID|--by-mid=MID]',
"exclude message(s) on stdin from `q' search results",
'add-watch' => [ '[URL_OR_PATHNAME]',
'watch for new messages and flag changes',
- qw(import! flags! interval=s recursive|r exclude=s include=s) ],
+ qw(import! kw|keywords|flags! interval=s recursive|r
+ exclude=s include=s) ],
'ls-watch' => [ '[FILTER...]', 'list active watches with numbers and status',
qw(format|f=s z) ],
'pause-watch' => [ '[WATCH_NUMBER_OR_FILTER]', qw(all local remote) ],
'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch',
qw(prune) ],
-'import' => [ 'URL_OR_PATHNAME|--stdin',
- 'one-shot import/update from URL or filesystem',
- qw(stdin| offset=i recursive|r exclude=s include=s !flags),
+'import' => [ 'URLS_OR_PATHNAMES...|--stdin',
+ 'one-time import/update from URL or filesystem',
+ qw(stdin| offset=i recursive|r exclude=s include|I=s
+ format|f=s kw|keywords|flags!),
],
'config' => [ '[...]', sub {
# $spec => [@ALLOWED_VALUES (default is first), $description],
# $spec => $description
# "$SUB_COMMAND TAB $spec" => as above
-my $stdin_formats = [ 'IN|auto|raw|mboxrd|mboxcl2|mboxcl|mboxo',
- 'specify message input format' ];
+my $stdin_formats = [ 'MAIL_FORMAT|eml|mboxrd|mboxcl2|mboxcl|mboxo',
+ 'specify message input format' ];
my $ls_format = [ 'OUT|plain|json|null', 'listing output format' ];
my %OPTDESC = (
'help|h' => 'show this built-in help',
'quiet|q' => 'be quiet',
-'verbose|v' => 'be more verbose',
+'globoff|g' => "do not match locations using '*?' wildcards and '[]' ranges",
+'verbose|v+' => 'be more verbose',
'solve!' => 'do not attempt to reconstruct blobs from emails',
'torsocks=s' => ['auto|no|yes',
'whether or not to wrap git and curl commands with torsocks'],
'q only=s@' => [ 'URL_OR_PATHNAME',
'only use specified external(s) for search' ],
+'q jobs=s' => [ '[SEARCH_JOBS][,WRITER_JOBS]',
+ 'control number of search and writer jobs' ],
+
+'import format|f=s' => $stdin_formats,
+
'ls-query format|f=s' => $ls_format,
'ls-external format|f=s' => $ls_format,
'by-mid|mid:s' => [ 'MID', 'match only by Message-ID, ignoring contents' ],
'jobs:i' => 'set parallelism level',
+'kw|keywords|flags!' => 'disable/enable importing flags',
+
# xargs, env, use "-0", git(1) uses "-z". We support z|0 everywhere
'z|0' => 'use NUL \\0 instead of newline (CR) to delimit lines',
'leistore.dir' => 'top-level storage location',
);
+my @WQ_KEYS = qw(lxs l2m imp mrr); # internal workers
+
# pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
sub x_it ($$) {
my ($self, $code) = @_;
# make sure client sees stdout before exit
$self->{1}->autoflush(1) if $self->{1};
dump_and_clear_log();
- if (my $sock = $self->{sock}) {
- send($sock, "x_it $code", MSG_EOR);
- } elsif (!$self->{oneshot}) {
- return; # client disconnected, noop
- } elsif (my $signum = ($code & 127)) { # usually SIGPIPE (13)
- $SIG{PIPE} = 'DEFAULT'; # $SIG{$signum} doesn't work
- kill $signum, $$;
- sleep; # wait for signal
- } else {
+ if (my $s = $self->{pkt_op_p} // $self->{sock}) {
+ send($s, "x_it $code", MSG_EOR);
+ } elsif ($self->{oneshot}) {
# don't want to end up using $? from child processes
- for my $f (qw(lxs l2m)) {
+ for my $f (@WQ_KEYS) {
my $wq = delete $self->{$f} or next;
$wq->DESTROY;
}
- $quit->($code >> 8);
- }
+ # cleanup anything that has tempfiles
+ delete @$self{qw(ovv dedupe)};
+ if (my $signum = ($code & 127)) { # usually SIGPIPE (13)
+ $SIG{PIPE} = 'DEFAULT'; # $SIG{$signum} doesn't work
+ kill $signum, $$;
+ sleep(1) while 1; # wait for signal
+ } else {
+ $quit->($code >> 8);
+ }
+ } # else ignore if client disconnected
}
sub err ($;@) {
my $self = shift;
my $err = $self->{2} // ($self->{pgr} // [])->[2] // *STDERR{GLOB};
- my $eor = (substr($_[-1], -1, 1) eq "\n" ? () : "\n");
- print $err @_, $eor and return;
+ my @eor = (substr($_[-1]//'', -1, 1) eq "\n" ? () : ("\n"));
+ print $err @_, @eor and return;
my $old_err = delete $self->{2};
- close($old_err) if $! == EPIPE && $old_err;;
+ close($old_err) if $! == EPIPE && $old_err;
$err = $self->{2} = ($self->{pgr} // [])->[2] // *STDERR{GLOB};
- print $err @_, $eor or print STDERR @_, $eor;
+ print $err @_, @eor or print STDERR @_, @eor;
}
sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) }
+sub fail_handler ($;$$) {
+ my ($lei, $code, $io) = @_;
+ for my $f (@WQ_KEYS) {
+ my $wq = delete $lei->{$f} or next;
+ $wq->wq_wait_old($lei) if $wq->wq_kill_old; # lei-daemon
+ }
+ close($io) if $io; # needed to avoid warnings on SIGPIPE
+ $lei->x_it($code // (1 >> 8));
+}
+
+sub sigpipe_handler { # handles SIGPIPE from @WQ_KEYS workers
+ fail_handler($_[0], 13, delete $_[0]->{1});
+}
+
sub fail ($$;$) {
my ($self, $buf, $exit_code) = @_;
err($self, $buf) if defined $buf;
- send($self->{pkt_op}, '!', MSG_EOR) if $self->{pkt_op}; # fail_handler
+ # calls fail_handler:
+ send($self->{pkt_op_p}, '!', MSG_EOR) if $self->{pkt_op_p};
x_it($self, ($exit_code // 1) << 8);
undef;
}
sub puts ($;@) { out(shift, map { "$_\n" } @_) }
sub child_error { # passes non-fatal curl exit codes to user
- my ($self, $child_error) = @_; # child_error is $?
- if (my $sock = $self->{sock}) { # send to lei(1) client
- send($sock, "child_error $child_error", MSG_EOR);
- } elsif ($self->{oneshot}) {
+ my ($self, $child_error, $msg) = @_; # child_error is $?
+ $self->err($msg) if $msg;
+ if (my $s = $self->{pkt_op_p} // $self->{sock}) {
+ # send to the parent lei-daemon or to lei(1) client
+ send($s, "child_error $child_error", MSG_EOR);
+ } elsif (!$PublicInbox::DS::in_loop) {
$self->{child_error} = $child_error;
} # else noop if client disconnected
}
-sub atfork_prepare_wq {
- my ($self, $wq) = @_;
- my $tcafc = $wq->{-ipc_atfork_child_close} //= [ $listener // () ];
- if (my $sock = $self->{sock}) {
- push @$tcafc, @$self{qw(0 1 2 3)}, $sock;
- }
- if (my $pgr = $self->{pgr}) {
- push @$tcafc, @$pgr[1,2];
- }
- if (my $old_1 = $self->{old_1}) {
- push @$tcafc, $old_1;
- }
- for my $f (qw(lxs l2m)) {
- my $ipc = $self->{$f} or next;
- push @$tcafc, grep { defined }
- @$ipc{qw(-wq_s1 -wq_s2 -ipc_req -ipc_res)};
- }
-}
-
-sub io_restore ($$) {
- my ($dst, $src) = @_;
- for my $i (0..2) { # standard FDs
- my $io = delete $src->{$i} or next;
- $dst->{$i} = $io;
- }
- for my $i (3..9) { # named (non-standard) FDs
- my $io = $src->{$i} or next;
- my @st = stat($io) or die "stat $src.$i ($io): $!";
- my $f = delete $dst->{"dev=$st[0],ino=$st[1]"} // next;
- $dst->{$f} = $io;
- delete $src->{$i};
- }
-}
-
sub note_sigpipe { # triggers sigpipe_handler
my ($self, $fd) = @_;
close(delete($self->{$fd})); # explicit close silences Perl warning
- send($self->{pkt_op}, '|', MSG_EOR) if $self->{pkt_op};
+ send($self->{pkt_op_p}, '|', MSG_EOR) if $self->{pkt_op_p};
x_it($self, 13);
}
-sub atfork_child_wq {
- my ($self, $wq) = @_;
- io_restore($self, $wq);
- -S $self->{pkt_op} or die 'BUG: {pkt_op} expected';
- io_restore($self->{l2m}, $wq);
- %PATH2CFG = ();
- undef $errors_log;
- $quit = \&CORE::exit;
- $current_lei = $self; # for SIG{__WARN__}
-}
-
-sub io_extract ($;@) {
- my ($obj, @fields) = @_;
- my @io;
- for my $f (@fields) {
- my $io = delete $obj->{$f} or next;
- my @st = stat($io) or die "W: stat $obj.$f ($io): $!";
- $obj->{"dev=$st[0],ino=$st[1]"} = $f;
- push @io, $io;
+sub lei_atfork_child {
+ my ($self, $persist) = @_;
+ # we need to explicitly close things which are on stack
+ if ($persist) {
+ my @io = delete @$self{0,1,2};
+ unless ($self->{oneshot}) {
+ close($_) for @io;
+ }
+ } else {
+ delete $self->{0};
}
- @io
-}
-
-# usage: ($lei, @io) = $lei->atfork_parent_wq($wq);
-sub atfork_parent_wq {
- my ($self, $wq) = @_;
- my $env = delete $self->{env}; # env is inherited at fork
- my $lei = bless { %$self }, ref($self);
- for my $f (qw(dedupe ovv)) {
- my $tmp = delete($lei->{$f}) or next;
- $lei->{$f} = $wq->deep_clone($tmp);
+ for (delete @$self{qw(3 sock old_1 au_done)}) {
+ close($_) if defined($_);
}
- $self->{env} = $env;
- delete @$lei{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m
- my @io = (delete(@$lei{qw(0 1 2)}),
- io_extract($lei, qw(sock pkt_op startq)));
- my $l2m = $lei->{l2m};
- if ($l2m && $l2m != $wq) { # $wq == lxs
- if (my $wq_s1 = $l2m->{-wq_s1}) {
- push @io, io_extract($l2m, '-wq_s1');
- $l2m->{-wq_s1} = $wq_s1;
- }
- $l2m->wq_close(1);
+ if (my $op_c = delete $self->{pkt_op_c}) {
+ close(delete $op_c->{sock});
+ }
+ if (my $pgr = delete $self->{pgr}) {
+ close($_) for (@$pgr[1,2]);
}
- ($lei, @io);
+ close $listener if $listener;
+ undef $listener;
+ %PATH2CFG = ();
+ undef $errors_log;
+ $quit = \&CORE::exit;
+ $current_lei = $persist ? undef : $self; # for SIG{__WARN__}
}
sub _help ($;$) {
my (@vals, @s, @l);
my $x = $sw;
if ($x =~ s/!\z//) { # solve! => --no-solve
- $x = "no-$x";
+ $x =~ s/(\A|\|)/$1no-/g
} elsif ($x =~ s/:.+//) { # optional args: $x = "mid:s"
@vals = (' [', undef, ']');
} elsif ($x =~ s/=.+//) { # required arg: $x = "type=s"
$msg .= $rhs;
$msg .= "\n";
}
- print { $self->{$errmsg ? 2 : 1} } $msg;
+ my $out = $self->{$errmsg ? 2 : 1};
+ start_pager($self) if -t $out;
+ print $out $msg;
x_it($self, $errmsg ? 1 << 8 : 0); # stderr => failure
undef;
}
} elsif ($var =~ /\A\[-?$POS_ARG\]\z/) { # one optional arg
$i++;
} elsif ($var =~ /\A.+?\|/) { # required FOO|--stdin
+ $inf = 1 if index($var, '...') > 0;
my @or = split(/\|/, $var);
my $ok;
for my $o (@or) {
if ($o =~ /\A--([a-z0-9\-]+)/) {
$ok = defined($OPT->{$1});
- last;
+ last if $ok;
} elsif (defined($argv->[$i])) {
$ok = 1;
$i++;
x_it($self, $?) if $?;
}
+sub lei_import {
+ require PublicInbox::LeiImport;
+ PublicInbox::LeiImport->call(@_);
+}
+
sub lei_init {
my ($self, $dir) = @_;
my $cfg = _lei_cfg($self, 1);
}
puts $self, grep(/$re/, map { # generate short/long names
if (s/[:=].+\z//) { # req/optional args, e.g output|o=i
- } else { # negation: solve! => no-solve|solve
- s/\A(.+)!\z/no-$1|$1/;
+ } elsif (s/\+\z//) { # verbose|v+
+ } elsif (s/!\z//) {
+ # negation: solve! => no-solve|solve
+ s/([\w\-]+)/$1|no-$1/g
}
map {
my $x = length > 1 ? "--$_" : "-$_";
} elsif ($self->{oneshot}) {
$self->{"mua.pid.$self.$$"} = spawn(\@cmd);
}
+ delete $self->{-progress};
}
# caller needs to "-t $self->{1}" to check if tty
sub dclose {
my ($self) = @_;
- for my $f (qw(lxs l2m)) {
+ delete $self->{-progress};
+ for my $f (@WQ_KEYS) {
my $wq = delete $self->{$f} or next;
if ($wq->wq_kill) {
$wq->wq_close