]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiOverview.pm
lei_to_mail: prepare for worker offload
[public-inbox.git] / lib / PublicInbox / LeiOverview.pm
index ec0921baf75c69c63102787330b143f26af04488..9846bc8a7ad6b0ef5850c8d0eff6ecce13047c75 100644 (file)
@@ -6,8 +6,11 @@
 package PublicInbox::LeiOverview;
 use strict;
 use v5.10.1;
+use parent qw(PublicInbox::Lock);
 use POSIX qw(strftime);
+use Fcntl qw(F_GETFL O_APPEND);
 use File::Spec;
+use File::Temp ();
 use PublicInbox::MID qw($MID_EXTRACT);
 use PublicInbox::Address qw(pairs);
 use PublicInbox::Config;
@@ -18,40 +21,64 @@ my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing
 
 sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
 
+# we open this in the parent process before ->wq_do handoff
+sub ovv_out_lk_init ($) {
+       my ($self) = @_;
+       $self->{tmp_lk_id} = "$self.$$";
+       my $tmp = File::Temp->new("lei-ovv.dst.$$.lock-XXXXXX",
+                                       TMPDIR => 1, UNLINK => 0);
+       $self->{lock_path} = $tmp->filename;
+}
+
+sub ovv_out_lk_cancel ($) {
+       my ($self) = @_;
+       ($self->{tmp_lk_id}//'') eq "$self.$$" and
+               unlink(delete($self->{lock_path}));
+}
+
 sub new {
        my ($class, $lei) = @_;
        my $opt = $lei->{opt};
-       my $out = $opt->{output} // '-';
-       $out = '/dev/stdout' if $out eq '-';
+       my $dst = $opt->{output} // '-';
+       $dst = '/dev/stdout' if $dst eq '-';
 
        my $fmt = $opt->{'format'};
        $fmt = lc($fmt) if defined $fmt;
-       if ($out =~ s/\A([a-z]+)://is) { # e.g. Maildir:/home/user/Mail/
+       if ($dst =~ s/\A([a-z]+)://is) { # e.g. Maildir:/home/user/Mail/
                my $ofmt = lc $1;
                $fmt //= $ofmt;
                return $lei->fail(<<"") if $fmt ne $ofmt;
 --format=$fmt and --output=$ofmt conflict
 
        }
-       $fmt //= 'json' if $out eq '/dev/stdout';
+       $fmt //= 'json' if $dst eq '/dev/stdout';
        $fmt //= 'maildir'; # TODO
 
-       if (index($out, '://') < 0) { # not a URL, so assume path
-                $out = File::Spec->canonpath($out);
+       if (index($dst, '://') < 0) { # not a URL, so assume path
+                $dst = File::Spec->canonpath($dst);
        } # else URL
 
-       my $self = bless { fmt => $fmt, out => $out }, $class;
+       my $self = bless { fmt => $fmt, dst => $dst }, $class;
        my $json;
        if ($fmt =~ /\A($JSONL|(?:concat)?json)\z/) {
                $json = $self->{json} = ref(PublicInbox::Config->json);
        }
        my ($isatty, $seekable);
-       if ($out eq '/dev/stdout') {
+       if ($dst eq '/dev/stdout') {
                $isatty = -t $lei->{1};
                $lei->start_pager if $isatty;
                $opt->{pretty} //= $isatty;
+               if (!$isatty && -f _) {
+                       my $fl = fcntl($lei->{1}, F_GETFL, 0) //
+                               return $lei->fail("fcntl(stdout): $!");
+                       ovv_out_lk_init($self) unless ($fl & O_APPEND);
+               } else {
+                       ovv_out_lk_init($self);
+               }
        } elsif ($json) {
                return $lei->fail('JSON formats only output to stdout');
+       } else {
+               return $lei->fail("TODO: $dst -f $fmt");
        }
        $self;
 }
@@ -109,6 +136,7 @@ sub _unbless_smsg {
 sub ovv_atexit_child {
        my ($self, $lei) = @_;
        if (my $bref = delete $lei->{ovv_buf}) {
+               my $lk = $self->lock_for_scope;
                print { $lei->{1} } $$bref;
        }
 }
@@ -142,7 +170,9 @@ sub _json_pretty {
 sub ovv_each_smsg_cb {
        my ($self, $lei) = @_;
        $lei->{ovv_buf} = \(my $buf = '');
+       delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel};
        my $json = $self->{json}->new;
+       $lei->{1}->autoflush(1);
        if ($json) {
                $json->utf8->canonical;
                $json->ascii(1) if $lei->{opt}->{ascii};
@@ -164,6 +194,7 @@ sub ovv_each_smsg_cb {
                        } sort keys %$smsg);
                        $buf .= $EOR;
                        if (length($buf) > 65536) {
+                               my $lk = $self->lock_for_scope;
                                print { $lei->{1} } $buf;
                                $buf = '';
                        }
@@ -175,6 +206,7 @@ sub ovv_each_smsg_cb {
                        delete @$smsg{qw(tid num)};
                        $buf .= $json->encode(_unbless_smsg(@_)) . $ORS;
                        if (length($buf) > 65536) {
+                               my $lk = $self->lock_for_scope;
                                print { $lei->{1} } $buf;
                                $buf = '';
                        }
@@ -186,4 +218,7 @@ sub ovv_each_smsg_cb {
        } # else { ...
 }
 
+no warnings 'once';
+*DESTROY = \&ovv_out_lk_cancel;
+
 1;