use PublicInbox::Sigfd;
use PublicInbox::DS qw(now dwaitpid);
use PublicInbox::Spawn qw(spawn popen_rd);
-use PublicInbox::OnDestroy;
+use PublicInbox::Lock;
use Time::HiRes qw(stat); # ctime comparisons for config cache
use File::Path qw(mkpath);
use File::Spec;
qw(boost=i c=s@ mirror=s no-torsocks torsocks=s inbox-version=i),
qw(quiet|q verbose|v+),
index_opt(), PublicInbox::LeiQuery::curl_opt() ],
-'ls-external' => [ '[FILTER...]', 'list publicinbox|extindex locations',
- qw(format|f=s z|0 local remote quiet|q) ],
+'ls-external' => [ '[FILTER]', 'list publicinbox|extindex locations',
+ qw(format|f=s z|0 globoff|g invert-match|v local remote) ],
'forget-external' => [ 'LOCATION...|--prune',
'exclude further results from a publicinbox|extindex',
qw(prune quiet|q) ],
'show threads|t' => 'display entire thread a message belongs to',
'q threads|t' =>
'return all messages in the same threads as the actual match(es)',
-'alert=s@' => ['CMD,-WINCH,-bell,<any command>',
+'alert=s@' => ['CMD,:WINCH,:bell,<any command>',
'run command(s) or perform ops when done writing to output ' .
- '(default: "-WINCH,-bell" with --mua and Maildir/IMAP output, ' .
+ '(default: ":WINCH,:bell" with --mua and Maildir/IMAP output, ' .
'nothing otherwise)' ],
'augment|a' => 'augment --output destination instead of clobbering',
my $wq = delete $self->{$f} or next;
$wq->DESTROY;
}
- # cleanup anything that has tempfiles
- delete @$self{qw(ovv dedupe)};
+ # cleanup anything that has tempfiles or open file handles
+ %PATH2CFG = ();
+ delete @$self{qw(ovv dedupe sto cfg)};
if (my $signum = ($code & 127)) { # usually SIGPIPE (13)
$SIG{PIPE} = 'DEFAULT'; # $SIG{$signum} doesn't work
kill $signum, $$;
$wq->wq_wait_old(undef, $lei) if $wq->wq_kill_old; # lei-daemon
}
close($io) if $io; # needed to avoid warnings on SIGPIPE
- $lei->x_it($code // (1 >> 8));
+ x_it($lei, $code // (1 << 8));
}
sub sigpipe_handler { # handles SIGPIPE from @WQ_KEYS workers
undef;
}
+sub check_input_format ($;$) {
+ my ($self, $files) = @_;
+ my $fmt = $self->{opt}->{'format'};
+ if (!$fmt) {
+ my $err = $files ? "regular file(s):\n@$files" : '--stdin';
+ return fail($self, "--format unset for $err");
+ }
+ return 1 if $fmt eq 'eml';
+ # XXX: should this handle {gz,bz2,xz}? that's currently in LeiToMail
+ require PublicInbox::MboxReader;
+ PublicInbox::MboxReader->can($fmt) ||
+ fail($self, "--format=$fmt unrecognized");
+}
+
sub out ($;@) {
my $self = shift;
return if print { $self->{1} // return } @_; # likely
open my $fh, '>>', $f or die "open($f): $!\n";
@st = stat($fh) or die "fstat($f): $!\n";
$cur_st = pack('dd', $st[10], $st[7]);
- qerr($self, "I: $f created") if $self->{cmd} ne 'config';
+ qerr($self, "# $f created") if $self->{cmd} ne 'config';
}
my $cfg = PublicInbox::Config::git_config_dump($f);
+ bless $cfg, 'PublicInbox::Config';
$cfg->{-st} = $cur_st;
$cfg->{'-f'} = $f;
$self->{cfg} = $PATH2CFG{$f} = $cfg;
my @cur = stat($cur) if defined($cur);
$cur = File::Spec->canonpath($cur // $dir);
my @dir = stat($dir);
- my $exists = "I: leistore.dir=$cur already initialized" if @dir;
+ my $exists = "# leistore.dir=$cur already initialized" if @dir;
if (@cur) {
if ($cur eq $dir) {
_lei_store($self, 1)->done;
}
lei_config($self, 'leistore.dir', $dir);
_lei_store($self, 1)->done;
- $exists //= "I: leistore.dir=$dir newly initialized";
+ $exists //= "# leistore.dir=$dir newly initialized";
return qerr($self, $exists);
}
my ($self) = @_;
my $alerts = $self->{opt}->{alert} // return;
while (my $op = shift(@$alerts)) {
- if ($op eq '-WINCH') {
+ if ($op eq ':WINCH') {
# hit the process group that started the MUA
if ($self->{sock}) {
send($self->{sock}, '-WINCH', MSG_EOR);
} elsif ($self->{oneshot}) {
kill('-WINCH', $$);
}
- } elsif ($op eq '-bell') {
+ } elsif ($op eq ':bell') {
out($self, "\a");
} elsif ($op =~ /(?<!\\),/) { # bare ',' (not ',,')
push @$alerts, split(/(?<!\\),/, $op);
chomp(my $pager = <$fh> // '');
close($fh) or warn "`git var PAGER' error: \$?=$?";
return if $pager eq 'cat' || $pager eq '';
- # TODO TIOCGWINSZ
- my $new_env = { LESS => 'FRX', LV => '-c', COLUMNS => 80 };
+ my $new_env = { LESS => 'FRX', LV => '-c' };
$new_env->{MORE} = 'FRX' if $^O eq 'freebsd';
pipe(my ($r, $wpager)) or return warn "pipe: $!";
my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} };
vec(my $rvec = '', fileno($sock), 1) = 1;
select($rvec, undef, undef, 60) or
return send($sock, 'timed out waiting to recv FDs', MSG_EOR);
- my @fds = $recv_cmd->($sock, my $buf, 4096 * 33); # >MAX_ARG_STRLEN
+ # (4096 * 33) >MAX_ARG_STRLEN
+ my @fds = $recv_cmd->($sock, my $buf, 4096 * 33) or return; # EOF
if (scalar(@fds) == 4) {
for my $i (0..3) {
my $fd = shift(@fds);
open($self->{$i}, '+<&=', $fd) and next;
send($sock, "open(+<&=$fd) (FD=$i): $!", MSG_EOR);
}
- } else {
- my $msg = "recv_cmd failed: $!";
- warn $msg;
+ } elsif (!defined($fds[0])) {
+ warn(my $msg = "recv_cmd failed: $!");
return send($sock, $msg, MSG_EOR);
+ } else {
+ return;
}
$self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY
# $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
# lei(1) calls this when it can't connect
sub lazy_start {
my ($path, $errno, $narg) = @_;
- if ($errno == ECONNREFUSED) {
- unlink($path) or die "unlink($path): $!";
- } elsif ($errno != ENOENT) {
+ local ($errors_log, $listener);
+ ($errors_log) = ($path =~ m!\A(.+?/)[^/]+\z!);
+ $errors_log .= 'errors.log';
+ my $addr = pack_sockaddr_un($path);
+ my $lk = bless { lock_path => $errors_log }, 'PublicInbox::Lock';
+ $lk->lock_acquire;
+ socket($listener, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";
+ if ($errno == ECONNREFUSED || $errno == ENOENT) {
+ return if connect($listener, $addr); # another process won
+ if ($errno == ECONNREFUSED && -S $path) {
+ unlink($path) or die "unlink($path): $!";
+ }
+ } else {
$! = $errno; # allow interpolation to stringify in die
die "connect($path): $!";
}
- if (eval { require BSD::Resource }) {
- my $NOFILE = BSD::Resource::RLIMIT_NOFILE();
- my ($s, $h) = BSD::Resource::getrlimit($NOFILE);
- BSD::Resource::setrlimit($NOFILE, $h, $h) if $s < $h;
- }
umask(077) // die("umask(077): $!");
- local $listener;
- socket($listener, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";
- bind($listener, pack_sockaddr_un($path)) or die "bind($path): $!";
+ bind($listener, $addr) or die "bind($path): $!";
listen($listener, 1024) or die "listen: $!";
+ $lk->lock_release;
+ undef $lk;
my @st = stat($path) or die "stat($path): $!";
my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
local $oldset = PublicInbox::DS::block_signals();
require PublicInbox::Listener;
require PublicInbox::EOFpipe;
(-p STDOUT) or die "E: stdout must be a pipe\n";
- local $errors_log;
- ($errors_log) = ($path =~ m!\A(.+?/)[^/]+\z!);
- $errors_log .= 'errors.log';
open(STDIN, '+>>', $errors_log) or die "open($errors_log): $!";
STDIN->autoflush(1);
dump_and_clear_log("from previous daemon process:\n");
my ($self) = @_;
$self->{1}->autoflush(1) if $self->{1};
stop_pager($self);
+ my $err = $?;
my $oneshot_pids = delete $self->{"pid.$self.$$"} or return;
waitpid($_, 0) for keys %$oneshot_pids;
+ $? = $err if $err; # preserve ->fail or ->x_it code
}
1;