]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiXSearch.pm
lei lcat: use single queue for ordering
[public-inbox.git] / lib / PublicInbox / LeiXSearch.pm
index 3414e87dd41ec917021bd017f96efeda1cb39b0a..3ce8f32d6d6acef4a01b79e26305c17f609d9210 100644 (file)
@@ -18,6 +18,7 @@ use PublicInbox::Smsg;
 use PublicInbox::Eml;
 use Fcntl qw(SEEK_SET F_SETFL O_APPEND O_RDWR);
 use PublicInbox::ContentHash qw(git_sha);
+use POSIX qw(strftime);
 
 sub new {
        my ($class) = @_;
@@ -32,7 +33,7 @@ sub attach_external {
        my ($self, $ibxish) = @_; # ibxish = ExtSearch or Inbox
        my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
        my $srch = $ibxish->search or
-               return warn("$desc not indexed for Xapian\n");
+               return warn("$desc not indexed for Xapian ($@ $!)\n");
        my @shards = $srch->xdb_shards_flat or
                return warn("$desc has no Xapian shards\n");
 
@@ -161,14 +162,14 @@ sub query_one_mset { # for --threads and l2m w/o sort
        my ($srch, $over) = ($ibxish->search, $ibxish->over);
        my $dir = $ibxish->{inboxdir} // $ibxish->{topdir};
        return warn("$dir not indexed by Xapian\n") unless ($srch && $over);
+       bless $srch, 'PublicInbox::LeiSearch'; # for ->qparse_new
        my $mo = { %{$lei->{mset_opt}} }; # copy
        my $mset;
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
        my $can_kw = !!$ibxish->can('msg_keywords');
        my $threads = $lei->{opt}->{threads} // 0;
        my $fl = $threads > 1 ? 1 : undef;
-       my $lss = $lei->{dedupe};
-       $lss = undef unless $lss && $lss->can('cfg_set'); # saved search
+       my $lss = $lei->{lss};
        my $maxk = "external.$dir.maxuid";
        my $stop_at = $lss ? $lss->{-cfg}->{$maxk} : undef;
        if (defined $stop_at) {
@@ -266,12 +267,16 @@ sub _smsg_fill ($$) {
 sub each_remote_eml { # callback for MboxReader->mboxrd
        my ($eml, $self, $lei, $each_smsg) = @_;
        my $xoids = $lei->{ale}->xoids_for($eml, 1);
+       my $smsg = bless {}, 'PublicInbox::Smsg';
        if ($self->{import_sto} && !$xoids) {
-               $self->{import_sto}->ipc_do('add_eml', $eml);
+               my $res = $self->{import_sto}->wq_do('add_eml', $eml);
+               if (ref($res) eq ref($smsg)) { # totally new message
+                       $smsg = $res;
+                       $smsg->{kw} = []; # short-circuit xsmsg_vmd
+               }
        }
-       my $smsg = bless {}, 'PublicInbox::Smsg';
-       $smsg->{blob} = $xoids ? (keys(%$xoids))[0]
-                               : git_sha(1, $eml)->hexdigest;
+       $smsg->{blob} //= $xoids ? (keys(%$xoids))[0]
+                               : $lei->git_oid($eml)->hexdigest;
        _smsg_fill($smsg, $eml);
        wait_startq($lei);
        if ($lei->{-progress}) {
@@ -287,13 +292,46 @@ sub each_remote_eml { # callback for MboxReader->mboxrd
        $each_smsg->($smsg, undef, $eml);
 }
 
+sub fudge_qstr_time ($$$) {
+       my ($lei, $uri, $qstr) = @_;
+       return ($qstr, undef) unless $lei->{lss};
+       my $cfg = $lei->{lss}->{-cfg} // die 'BUG: no lss->{-cfg}';
+       my $cfg_key = "external.$uri.lastresult";
+       my $lr = $cfg->{$cfg_key} or return ($qstr, $cfg_key);
+       if ($lr !~ /\A\-?[0-9]+\z/) {
+               $lei->child_error(0,
+                       "$cfg->{-f}: $cfg_key=$lr not an integer, ignoring");
+               return ($qstr, $cfg_key);
+       }
+       my $rft = $lei->{opt}->{'remote-fudge-time'};
+       if ($rft && $rft !~ /\A-?[0-9]+\z/) {
+               my @t = $lei->{lss}->git->date_parse($rft);
+               my $diff = time - $t[0];
+               $lei->qerr("# $rft => $diff seconds");
+               $rft = $diff;
+       }
+       $lr -= ($rft || (48 * 60 * 60));
+       $lei->qerr("# $uri limiting to ".
+               strftime('%Y-%m-%d %k:%M', gmtime($lr)). ' and newer');
+       # this should really be rt: (received-time), but no stable
+       # public-inbox releases support it, yet.
+       my $dt = 'dt:'.strftime('%Y%m%d%H%M%S', gmtime($lr)).'..';
+       if ($qstr =~ /\S/) {
+               substr($qstr, 0, 0, '(');
+               $qstr .= ') AND ';
+       }
+       ($qstr .= $dt, $cfg_key);
+}
+
 sub query_remote_mboxrd {
        my ($self, $uris) = @_;
        local $0 = "$0 query_remote_mboxrd";
        local $SIG{TERM} = sub { exit(0) }; # for DESTROY (File::Temp, $reap)
        my $lei = $self->{lei};
        my $opt = $lei->{opt};
-       my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm');
+       my $qstr = $lei->{mset_opt}->{qstr};
+       $qstr =~ s/[ \n\t]+/ /sg; # make URLs less ugly
+       my @qform = (x => 'm');
        push(@qform, t => 1) if $opt->{threads};
        my $verbose = $opt->{verbose};
        my ($reap_tail, $reap_curl);
@@ -316,7 +354,9 @@ sub query_remote_mboxrd {
        for my $uri (@$uris) {
                $lei->{-current_url} = $uri->as_string;
                $lei->{-nr_remote_eml} = 0;
-               $uri->query_form(@qform);
+               my $start = time;
+               my ($q, $key) = fudge_qstr_time($lei, $uri, $qstr);
+               $uri->query_form(@qform, q => $q);
                my $cmd = $curl->for_uri($lei, $uri);
                $lei->qerr("# $cmd");
                my ($fh, $pid) = popen_rd($cmd, undef, $rdr);
@@ -329,10 +369,11 @@ sub query_remote_mboxrd {
                @$reap_curl = (); # cancel OnDestroy
                die $err if $err;
                my $nr = $lei->{-nr_remote_eml};
-               if ($nr && $lei->{sto}) {
-                       my $wait = $lei->{sto}->ipc_do('done');
-               }
+               my $wait = $lei->{sto}->wq_do('done') if $nr && $lei->{sto};
                if ($? == 0) {
+                       # don't update if no results, maybe MTA is down
+                       $key && $nr and
+                               $lei->{lss}->cfg_set($key, $start);
                        mset_progress($lei, $lei->{-current_url}, $nr, $nr);
                        next;
                }
@@ -346,7 +387,7 @@ sub query_remote_mboxrd {
                                        $lei->err("truncate($cmd stderr): $!");
                }
                next if (($? >> 8) == 22 && $err =~ /\b404\b/);
-               $uri->query_form(q => $lei->{mset_opt}->{qstr});
+               $uri->query_form(q => $qstr);
                $lei->child_error($?, "E: <$uri> $err");
        }
        undef $each_smsg;
@@ -371,8 +412,8 @@ sub query_done { # EOF callback for main daemon
        if ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) {
                warn "BUG: {sto} missing with --mail-sync";
        }
-       my $wait = $lei->{sto} ? $lei->{sto}->ipc_do('done') : undef;
-       $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef;
+       $lei->sto_done_request if $lei->{sto};
+       my $wait = $lei->{v2w} ? $lei->{v2w}->wq_do('done') : undef;
        $lei->{ovv}->ovv_end($lei);
        my $start_mua;
        if ($l2m) { # close() calls LeiToMail reap_compress
@@ -395,9 +436,13 @@ Error closing $lei->{ovv}->{dst}: $!
        if ($lei->{-progress}) {
                my $tot = $lei->{-mset_total} // 0;
                my $nr = $lei->{-nr_write} // 0;
-               $lei->qerr($l2m ?
-                       "# $nr written to $lei->{ovv}->{dst} ($tot matches)" :
-                       "# $tot matches");
+               if ($l2m) {
+                       my $m = "# $nr written to " .
+                               "$lei->{ovv}->{dst} ($tot matches)";
+                       $nr ? $lei->qfin($m) : $lei->qerr($m);
+               } else {
+                       $lei->qerr("# $tot matches");
+               }
        }
        $lei->start_mua if $start_mua;
        $lei->dclose;
@@ -525,7 +570,7 @@ sub do_query {
        @$end = ();
        $self->{opt_threads} = $lei->{opt}->{threads};
        $self->{opt_sort} = $lei->{opt}->{'sort'};
-       $self->{-do_lcat} = $lei->{lcat_blob} // $lei->{lcat_fid};
+       $self->{-do_lcat} = !!(delete $lei->{lcat_todo});
        if ($l2m) {
                $l2m->net_merge_all_done unless $lei->{auth};
        } else {
@@ -562,8 +607,11 @@ sub prepare_external {
                die "`\\n' not allowed in `$loc'\n" if index($loc, "\n") >= 0;
                require PublicInbox::Inbox; # v2, v1
                $loc = bless { inboxdir => $loc }, 'PublicInbox::Inbox';
+       } elsif (!-e $loc) {
+               warn "W: $loc gone, perhaps run: lei forget-external $loc\n";
+               return;
        } else {
-               warn "W: ignoring $loc, unable to determine type\n";
+               warn "W: $loc ignored, unable to determine external type\n";
                return;
        }
        push @{$self->{locals}}, $loc;
@@ -585,7 +633,7 @@ sub _lcat2smsg { # git->cat_async callback
        }
 }
 
-sub lcat_dump {
+sub lcat_dump { # via wq_io_do
        my ($self) = @_;
        my $lei = $self->{lei};
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
@@ -594,18 +642,17 @@ sub lcat_dump {
                my $json_dump = $each_smsg;
                $each_smsg = sub {
                        my ($smsg) = @_;
-                       use Data::Dumper;
                        $smsg->{-json_dump} = $json_dump;
                        $git->cat_async($smsg->{blob}, \&_lcat2smsg, $smsg);
                };
        }
-       for my $oid (@{$lei->{lcat_blob} // []}) {
-               $each_smsg->({ blob => $oid, pct => 100 });
-       }
-       if (my $fids = delete $lei->{lcat_fid}) {
-               my $lms = $lei->{lse}->lms;
-               for my $fid (@$fids) {
-                       $lms->each_src({fid => $fid}, \&_lcat_i, $each_smsg);
+       my $lms;
+       for my $ent (@{$lei->{lcat_todo}}) {
+               if (ref $ent eq 'HASH') { # { fid => $fid ,.. }
+                       $lms //= $lei->{lse}->lms;
+                       $lms->each_src($ent, \&_lcat_i, $each_smsg);
+               } else { # oidhex
+                       $each_smsg->({ blob => $ent, pct => 100 });
                }
        }
        $git->async_wait_all;