]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiXSearch.pm
lei: generalize auxiliary WQ handling
[public-inbox.git] / lib / PublicInbox / LeiXSearch.pm
index 760f9718ce7bfda89bad1265873c694b3530211a..beb955bb6de599a5ce675276888f17f6f0c83495 100644 (file)
@@ -9,7 +9,6 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
 use PublicInbox::DS qw(now);
-use PublicInbox::PktOp qw(pkt_do);
 use File::Temp 0.19 (); # 0.19 for ->newdir
 use File::Spec ();
 use PublicInbox::Search qw(xap_terms);
@@ -67,17 +66,23 @@ sub remotes { @{$_[0]->{remotes} // []} }
 # called by PublicInbox::Search::xdb (usually via ->mset)
 sub xdb_shards_flat { @{$_[0]->{shards_flat} // []} }
 
-sub mitem_kw ($$;$) {
-       my ($smsg, $mitem, $flagged) = @_;
-       my $kw = xap_terms('K', my $doc = $mitem->get_document);
+sub _mitem_kw { # retry_reopen callback
+       my ($srch, $smsg, $mitem, $flagged) = @_;
+       my $doc = $mitem->get_document;
+       my $kw = xap_terms('K', $doc);
        $kw->{flagged} = 1 if $flagged;
+       my $L = xap_terms('L', $doc);
        # we keep the empty {kw} array here to prevent expensive work in
        # ->xsmsg_vmd, _unbless_smsg will clobber it iff it's empty
        $smsg->{kw} = [ sort keys %$kw ];
-       my $L = xap_terms('L', $doc);
        $smsg->{L} = [ sort keys %$L ] if scalar(keys %$L);
 }
 
+sub mitem_kw ($$$;$) {
+       my ($srch, $smsg, $mitem, $flagged) = @_;
+       $srch->retry_reopen(\&_mitem_kw, $smsg, $mitem, $flagged);
+}
+
 # like over->get_art
 sub smsg_for {
        my ($self, $mitem) = @_;
@@ -90,7 +95,7 @@ sub smsg_for {
        my $smsg = $ibx->over->get_art($num);
        return if $smsg->{bytes} == 0; # external message
        if ($ibx->can('msg_keywords')) {
-               mitem_kw($smsg, $mitem);
+               mitem_kw($self, $smsg, $mitem);
        }
        $smsg;
 }
@@ -136,7 +141,7 @@ sub mset_progress {
        my $lei = shift;
        return if $lei->{early_mua} || !$lei->{-progress};
        if ($lei->{pkt_op_p}) {
-               pkt_do($lei->{pkt_op_p}, 'mset_progress', @_);
+               $lei->{pkt_op_p}->pkt_do('mset_progress', @_);
        } else { # single lei-daemon consumer
                my ($desc, $mset_size, $mset_total_est) = @_;
                $lei->{-mset_total} += $mset_size if $mset_total_est ne '?';
@@ -194,7 +199,8 @@ sub query_one_mset { # for --threads and l2m w/o sort
                                        my $mitem = delete $n2item{$n};
                                        next if $smsg->{bytes} == 0;
                                        if ($mitem && $can_kw) {
-                                               mitem_kw($smsg, $mitem, $fl);
+                                               mitem_kw($srch, $smsg, $mitem,
+                                                       $fl);
                                        } elsif ($mitem && $fl) {
                                                # call ->xsmsg_vmd, later
                                                $smsg->{lei_q_tt_flagged} = 1;
@@ -210,7 +216,7 @@ sub query_one_mset { # for --threads and l2m w/o sort
                                my $mitem = $items[$i++];
                                my $smsg = $over->get_art($n) or next;
                                next if $smsg->{bytes} == 0;
-                               mitem_kw($smsg, $mitem, $fl) if $can_kw;
+                               mitem_kw($srch, $smsg, $mitem, $fl) if $can_kw;
                                $each_smsg->($smsg, $mitem);
                        }
                }
@@ -249,6 +255,14 @@ sub query_combined_mset { # non-parallel for non-"--threads" users
        $lei->{ovv}->ovv_atexit_child($lei);
 }
 
+sub _smsg_fill ($$) {
+       my ($smsg, $eml) = @_;
+       $smsg->populate($eml);
+       $smsg->parse_references($eml, mids($eml));
+       $smsg->{$_} //= '' for qw(from to cc ds subject references mid);
+       delete @$smsg{qw(From Subject -ds -ts)};
+}
+
 sub each_remote_eml { # callback for MboxReader->mboxrd
        my ($eml, $self, $lei, $each_smsg) = @_;
        my $xoids = $lei->{ale}->xoids_for($eml, 1);
@@ -258,10 +272,7 @@ sub each_remote_eml { # callback for MboxReader->mboxrd
        my $smsg = bless {}, 'PublicInbox::Smsg';
        $smsg->{blob} = $xoids ? (keys(%$xoids))[0]
                                : git_sha(1, $eml)->hexdigest;
-       $smsg->populate($eml);
-       $smsg->parse_references($eml, mids($eml));
-       $smsg->{$_} //= '' for qw(from to cc ds subject references mid);
-       delete @$smsg{qw(From Subject -ds -ts)};
+       _smsg_fill($smsg, $eml);
        wait_startq($lei);
        if ($lei->{-progress}) {
                ++$lei->{-nr_remote_eml};
@@ -361,6 +372,7 @@ sub query_done { # EOF callback for main daemon
                warn "BUG: {sto} missing with --mail-sync";
        }
        my $wait = $lei->{sto} ? $lei->{sto}->ipc_do('done') : undef;
+       $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef;
        $lei->{ovv}->ovv_end($lei);
        my $start_mua;
        if ($l2m) { # close() calls LeiToMail reap_compress
@@ -445,6 +457,9 @@ sub start_query ($;$) { # always runs in main (lei-daemon) process
        for my $uris (@$q) {
                $self->wq_io_do('query_remote_mboxrd', [], $uris);
        }
+       if ($self->{-do_lcat}) {
+               $self->wq_io_do('lcat_dump', []);
+       }
        $self->wq_close(1); # lei_xsearch workers stop when done
 }
 
@@ -510,13 +525,14 @@ sub do_query {
        @$end = ();
        $self->{opt_threads} = $lei->{opt}->{threads};
        $self->{opt_sort} = $lei->{opt}->{'sort'};
+       $self->{-do_lcat} = $lei->{lcat_blob} // $lei->{lcat_fid};
        if ($l2m) {
                $l2m->net_merge_all_done unless $lei->{auth};
        } else {
                start_query($self);
        }
        $lei->event_step_init; # wait for shutdowns
-       $op_c->op_wait_event($ops);
+       $lei->wait_wq_events($op_c, $ops);
 }
 
 sub add_uri {
@@ -553,5 +569,48 @@ sub prepare_external {
        push @{$self->{locals}}, $loc;
 }
 
+sub _lcat_i { # LeiMailSync->each_src iterator callback
+       my ($oidbin, $id, $each_smsg) = @_;
+       $each_smsg->({blob => unpack('H*', $oidbin), pct => 100});
+}
+
+sub _lcat2smsg { # git->cat_async callback
+       my ($bref, $oid, $type, $size, $smsg) = @_;
+       if ($bref) {
+               my $eml = PublicInbox::Eml->new($bref);
+               my $json_dump = delete $smsg->{-json_dump};
+               bless $smsg, 'PublicInbox::Smsg';
+               _smsg_fill($smsg, $eml);
+               $json_dump->($smsg, undef, $eml);
+       }
+}
+
+sub lcat_dump {
+       my ($self) = @_;
+       my $lei = $self->{lei};
+       my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+       my $git = $lei->{ale}->git;
+       if (!$lei->{l2m}) {
+               my $json_dump = $each_smsg;
+               $each_smsg = sub {
+                       my ($smsg) = @_;
+                       use Data::Dumper;
+                       $smsg->{-json_dump} = $json_dump;
+                       $git->cat_async($smsg->{blob}, \&_lcat2smsg, $smsg);
+               };
+       }
+       for my $oid (@{$lei->{lcat_blob} // []}) {
+               $each_smsg->({ blob => $oid, pct => 100 });
+       }
+       if (my $fids = delete $lei->{lcat_fid}) {
+               my $lms = $lei->{lse}->lms;
+               for my $fid (@$fids) {
+                       $lms->each_src({fid => $fid}, \&_lcat_i, $each_smsg);
+               }
+       }
+       $git->async_wait_all;
+       undef $each_smsg; # may commit
+       $lei->{ovv}->ovv_atexit_child($lei);
+}
 
 1;