t/lei-oneshot.t
t/lei.t
t/lei_dedupe.t
+t/lei_overview.t
t/lei_store.t
t/lei_to_mail.t
t/lei_xsearch.t
package PublicInbox::LeiOverview;
use strict;
use v5.10.1;
+use parent qw(PublicInbox::Lock);
use POSIX qw(strftime);
+use Fcntl qw(F_GETFL O_APPEND);
use File::Spec;
+use File::Temp ();
use PublicInbox::MID qw($MID_EXTRACT);
use PublicInbox::Address qw(pairs);
use PublicInbox::Config;
sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
+# we open this in the parent process before ->wq_do handoff
+sub ovv_out_lk_init ($) {
+ my ($self) = @_;
+ $self->{tmp_lk_id} = "$self.$$";
+ my $tmp = File::Temp->new("lei-ovv.out.$$.lock-XXXXXX",
+ TMPDIR => 1, UNLINK => 0);
+ $self->{lock_path} = $tmp->filename;
+}
+
+sub ovv_out_lk_cancel ($) {
+ my ($self) = @_;
+ ($self->{tmp_lk_id}//'') eq "$self.$$" and
+ unlink(delete($self->{lock_path}));
+}
+
sub new {
my ($class, $lei) = @_;
my $opt = $lei->{opt};
$isatty = -t $lei->{1};
$lei->start_pager if $isatty;
$opt->{pretty} //= $isatty;
+ if (!$isatty && -f _) {
+ my $fl = fcntl($lei->{1}, F_GETFL, 0) //
+ return $lei->fail("fcntl(stdout): $!");
+ ovv_out_lk_init($self) unless ($fl & O_APPEND);
+ } else {
+ ovv_out_lk_init($self);
+ }
} elsif ($json) {
return $lei->fail('JSON formats only output to stdout');
+ } else {
+ return $lei->fail("TODO: $out -f $fmt");
}
$self;
}
sub ovv_atexit_child {
my ($self, $lei) = @_;
if (my $bref = delete $lei->{ovv_buf}) {
+ my $lk = $self->lock_for_scope;
print { $lei->{1} } $$bref;
}
}
sub ovv_each_smsg_cb {
my ($self, $lei) = @_;
$lei->{ovv_buf} = \(my $buf = '');
+ delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel};
my $json = $self->{json}->new;
+ $lei->{1}->autoflush(1);
if ($json) {
$json->utf8->canonical;
$json->ascii(1) if $lei->{opt}->{ascii};
} sort keys %$smsg);
$buf .= $EOR;
if (length($buf) > 65536) {
+ my $lk = $self->lock_for_scope;
print { $lei->{1} } $buf;
$buf = '';
}
delete @$smsg{qw(tid num)};
$buf .= $json->encode(_unbless_smsg(@_)) . $ORS;
if (length($buf) > 65536) {
+ my $lk = $self->lock_for_scope;
print { $lei->{1} } $buf;
$buf = '';
}
} # else { ...
}
+no warnings 'once';
+*DESTROY = \&ovv_out_lk_cancel;
+
1;
sub do_query {
my ($self, $lei_orig, $srcs) = @_;
my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
-
+ my $remotes = $self->{remotes} // [];
pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
$io[0] = $qry_done; # don't need stdin
- $io[1]->autoflush(1);
- $io[2]->autoflush(1);
+
if ($lei->{opt}->{thread}) {
+ $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1;
for my $ibxish (@$srcs) {
$self->wq_do('query_thread_mset', \@io, $lei, $ibxish);
}
} else {
+ $lei->{-parallel} = scalar(@$remotes);
$self->wq_do('query_mset', \@io, $lei, $srcs);
}
# TODO
- for my $rmt (@{$self->{remotes} // []}) {
+ for my $rmt (@$remotes) {
$self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
}
@io = ();
# caller must use return value
sub lock_for_scope {
my ($self, @single_pid) = @_;
- $self->lock_acquire;
+ lock_acquire($self) or return; # lock_path not set
PublicInbox::OnDestroy->new(@single_pid, \&lock_release, $self);
}
--- /dev/null
+#!perl -w
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use POSIX qw(_exit);
+require_ok 'PublicInbox::LeiOverview';
+
+my $ovv = bless {}, 'PublicInbox::LeiOverview';
+$ovv->ovv_out_lk_init;
+my $lock_path = $ovv->{lock_path};
+ok(-f $lock_path, 'lock init');
+undef $ovv;
+ok(!-f $lock_path, 'lock DESTROY');
+
+$ovv = bless {}, 'PublicInbox::LeiOverview';
+$ovv->ovv_out_lk_init;
+$lock_path = $ovv->{lock_path};
+ok(-f $lock_path, 'lock init #2');
+my $pid = fork // BAIL_OUT "fork $!";
+if ($pid == 0) {
+ undef $ovv;
+ _exit(0);
+}
+is(waitpid($pid, 0), $pid, 'child exited');
+is($?, 0, 'no error in child process');
+ok(-f $lock_path, 'lock was not destroyed by child');
+undef $ovv;
+ok(!-f $lock_path, 'lock DESTROY #2');
+
+done_testing;