]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei: q: lock stdout on overview output
authorEric Wong <e@80x24.org>
Thu, 14 Jan 2021 07:06:24 +0000 (19:06 -1200)
committerEric Wong <e@80x24.org>
Fri, 15 Jan 2021 00:19:07 +0000 (00:19 +0000)
Most writes to stdout aren't atomic and we need locking to
prevent workers from interleaving and corrupting JSON output.
The one case stdout won't require locking is if it's pointed
to a regular file with O_APPEND; as POSIX O_APPEND semantics
guarantees atomicity.

MANIFEST
lib/PublicInbox/LeiOverview.pm
lib/PublicInbox/LeiXSearch.pm
lib/PublicInbox/Lock.pm
t/lei_overview.t [new file with mode: 0644]

index 2ca240fc90ea317f2e75935a3806dc2e4da531fb..0ebdaccc4c8d23bea760591c0e48ee5b6f209d63 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -338,6 +338,7 @@ t/kqnotify.t
 t/lei-oneshot.t
 t/lei.t
 t/lei_dedupe.t
+t/lei_overview.t
 t/lei_store.t
 t/lei_to_mail.t
 t/lei_xsearch.t
index ec0921baf75c69c63102787330b143f26af04488..ef5f27c1309bd78f697adf558e654b00cf372601 100644 (file)
@@ -6,8 +6,11 @@
 package PublicInbox::LeiOverview;
 use strict;
 use v5.10.1;
+use parent qw(PublicInbox::Lock);
 use POSIX qw(strftime);
+use Fcntl qw(F_GETFL O_APPEND);
 use File::Spec;
+use File::Temp ();
 use PublicInbox::MID qw($MID_EXTRACT);
 use PublicInbox::Address qw(pairs);
 use PublicInbox::Config;
@@ -18,6 +21,21 @@ my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing
 
 sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
 
+# we open this in the parent process before ->wq_do handoff
+sub ovv_out_lk_init ($) {
+       my ($self) = @_;
+       $self->{tmp_lk_id} = "$self.$$";
+       my $tmp = File::Temp->new("lei-ovv.out.$$.lock-XXXXXX",
+                                       TMPDIR => 1, UNLINK => 0);
+       $self->{lock_path} = $tmp->filename;
+}
+
+sub ovv_out_lk_cancel ($) {
+       my ($self) = @_;
+       ($self->{tmp_lk_id}//'') eq "$self.$$" and
+               unlink(delete($self->{lock_path}));
+}
+
 sub new {
        my ($class, $lei) = @_;
        my $opt = $lei->{opt};
@@ -50,8 +68,17 @@ sub new {
                $isatty = -t $lei->{1};
                $lei->start_pager if $isatty;
                $opt->{pretty} //= $isatty;
+               if (!$isatty && -f _) {
+                       my $fl = fcntl($lei->{1}, F_GETFL, 0) //
+                               return $lei->fail("fcntl(stdout): $!");
+                       ovv_out_lk_init($self) unless ($fl & O_APPEND);
+               } else {
+                       ovv_out_lk_init($self);
+               }
        } elsif ($json) {
                return $lei->fail('JSON formats only output to stdout');
+       } else {
+               return $lei->fail("TODO: $out -f $fmt");
        }
        $self;
 }
@@ -109,6 +136,7 @@ sub _unbless_smsg {
 sub ovv_atexit_child {
        my ($self, $lei) = @_;
        if (my $bref = delete $lei->{ovv_buf}) {
+               my $lk = $self->lock_for_scope;
                print { $lei->{1} } $$bref;
        }
 }
@@ -142,7 +170,9 @@ sub _json_pretty {
 sub ovv_each_smsg_cb {
        my ($self, $lei) = @_;
        $lei->{ovv_buf} = \(my $buf = '');
+       delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel};
        my $json = $self->{json}->new;
+       $lei->{1}->autoflush(1);
        if ($json) {
                $json->utf8->canonical;
                $json->ascii(1) if $lei->{opt}->{ascii};
@@ -164,6 +194,7 @@ sub ovv_each_smsg_cb {
                        } sort keys %$smsg);
                        $buf .= $EOR;
                        if (length($buf) > 65536) {
+                               my $lk = $self->lock_for_scope;
                                print { $lei->{1} } $buf;
                                $buf = '';
                        }
@@ -175,6 +206,7 @@ sub ovv_each_smsg_cb {
                        delete @$smsg{qw(tid num)};
                        $buf .= $json->encode(_unbless_smsg(@_)) . $ORS;
                        if (length($buf) > 65536) {
+                               my $lk = $self->lock_for_scope;
                                print { $lei->{1} } $buf;
                                $buf = '';
                        }
@@ -186,4 +218,7 @@ sub ovv_each_smsg_cb {
        } # else { ...
 }
 
+no warnings 'once';
+*DESTROY = \&ovv_out_lk_cancel;
+
 1;
index 80e7a7f76124d74ce36b868a709370459e5c4a4c..ee93e074b1cacca62055cff1d507399ea5c224c5 100644 (file)
@@ -158,20 +158,21 @@ sub query_done { # PublicInbox::EOFpipe callback
 sub do_query {
        my ($self, $lei_orig, $srcs) = @_;
        my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
-
+       my $remotes = $self->{remotes} // [];
        pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
        $io[0] = $qry_done; # don't need stdin
-       $io[1]->autoflush(1);
-       $io[2]->autoflush(1);
+
        if ($lei->{opt}->{thread}) {
+               $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1;
                for my $ibxish (@$srcs) {
                        $self->wq_do('query_thread_mset', \@io, $lei, $ibxish);
                }
        } else {
+               $lei->{-parallel} = scalar(@$remotes);
                $self->wq_do('query_mset', \@io, $lei, $srcs);
        }
        # TODO
-       for my $rmt (@{$self->{remotes} // []}) {
+       for my $rmt (@$remotes) {
                $self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
        }
        @io = ();
index 2c5ebf270a3aed694a9f3c68156f1c431bd69b30..bb213de448c3f6c4ca08a0f8dd7ec34b40420590 100644 (file)
@@ -37,7 +37,7 @@ sub lock_release {
 # caller must use return value
 sub lock_for_scope {
        my ($self, @single_pid) = @_;
-       $self->lock_acquire;
+       lock_acquire($self) or return; # lock_path not set
        PublicInbox::OnDestroy->new(@single_pid, \&lock_release, $self);
 }
 
diff --git a/t/lei_overview.t b/t/lei_overview.t
new file mode 100644 (file)
index 0000000..896cc01
--- /dev/null
@@ -0,0 +1,33 @@
+#!perl -w
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use POSIX qw(_exit);
+require_ok 'PublicInbox::LeiOverview';
+
+my $ovv = bless {}, 'PublicInbox::LeiOverview';
+$ovv->ovv_out_lk_init;
+my $lock_path = $ovv->{lock_path};
+ok(-f $lock_path, 'lock init');
+undef $ovv;
+ok(!-f $lock_path, 'lock DESTROY');
+
+$ovv = bless {}, 'PublicInbox::LeiOverview';
+$ovv->ovv_out_lk_init;
+$lock_path = $ovv->{lock_path};
+ok(-f $lock_path, 'lock init #2');
+my $pid = fork // BAIL_OUT "fork $!";
+if ($pid == 0) {
+       undef $ovv;
+       _exit(0);
+}
+is(waitpid($pid, 0), $pid, 'child exited');
+is($?, 0, 'no error in child process');
+ok(-f $lock_path, 'lock was not destroyed by child');
+undef $ovv;
+ok(!-f $lock_path, 'lock DESTROY #2');
+
+done_testing;