]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/LeiXSearch.pm
lei: fix inadvertant FD sharing
[public-inbox.git] / lib / PublicInbox / LeiXSearch.pm
1 # Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # Combine any combination of PublicInbox::Search,
5 # PublicInbox::ExtSearch, and PublicInbox::LeiSearch objects
6 # into one Xapian DB
7 package PublicInbox::LeiXSearch;
8 use strict;
9 use v5.10.1;
10 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
11 use PublicInbox::DS qw(dwaitpid);
12 use PublicInbox::OpPipe;
13 use PublicInbox::Import;
14 use File::Temp 0.19 (); # 0.19 for ->newdir
15 use File::Spec ();
16 use PublicInbox::Search qw(xap_terms);
17
18 sub new {
19         my ($class) = @_;
20         PublicInbox::Search::load_xapian();
21         bless {
22                 qp_flags => $PublicInbox::Search::QP_FLAGS |
23                                 PublicInbox::Search::FLAG_PURE_NOT(),
24         }, $class
25 }
26
27 sub attach_external {
28         my ($self, $ibxish) = @_; # ibxish = ExtSearch or Inbox
29
30         if (!$ibxish->can('over') || !$ibxish->over) {
31                 return push(@{$self->{remotes}}, $ibxish)
32         }
33         my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
34         my $srch = $ibxish->search or
35                 return warn("$desc not indexed for Xapian\n");
36         my @shards = $srch->xdb_shards_flat or
37                 return warn("$desc has no Xapian shardsXapian\n");
38
39         if (delete $self->{xdb}) { # XXX: do we need this?
40                 # clobber existing {xdb} if amending
41                 my $expect = delete $self->{nshard};
42                 my $shards = delete $self->{shards_flat};
43                 scalar(@$shards) == $expect or die
44                         "BUG: {nshard}$expect != shards=".scalar(@$shards);
45
46                 my $prev = {};
47                 for my $old_ibxish (@{$self->{shard2ibx}}) {
48                         next if $prev == $old_ibxish;
49                         $prev = $old_ibxish;
50                         my @shards = $old_ibxish->search->xdb_shards_flat;
51                         push @{$self->{shards_flat}}, @shards;
52                 }
53                 my $nr = scalar(@{$self->{shards_flat}});
54                 $nr == $expect or die
55                         "BUG: reloaded $nr shards, expected $expect"
56         }
57         push @{$self->{shards_flat}}, @shards;
58         push(@{$self->{shard2ibx}}, $ibxish) for (@shards);
59 }
60
61 # returns a list of local inboxes (or count in scalar context)
62 sub locals {
63         my %uniq = map {; "$_" => $_ } @{$_[0]->{shard2ibx} // []};
64         values %uniq;
65 }
66
67 # called by PublicInbox::Search::xdb
68 sub xdb_shards_flat { @{$_[0]->{shards_flat} // []} }
69
70 # like over->get_art
71 sub smsg_for {
72         my ($self, $mitem) = @_;
73         # cf. https://trac.xapian.org/wiki/FAQ/MultiDatabaseDocumentID
74         my $nshard = $self->{nshard};
75         my $docid = $mitem->get_docid;
76         my $shard = ($docid - 1) % $nshard;
77         my $num = int(($docid - 1) / $nshard) + 1;
78         my $ibx = $self->{shard2ibx}->[$shard];
79         my $smsg = $ibx->over->get_art($num);
80         if (ref($ibx->can('msg_keywords'))) {
81                 my $kw = xap_terms('K', $mitem->get_document);
82                 $smsg->{kw} = [ sort keys %$kw ];
83         }
84         $smsg->{docid} = $docid;
85         $smsg;
86 }
87
88 sub recent {
89         my ($self, $qstr, $opt) = @_;
90         $opt //= {};
91         $opt->{relevance} //= -2;
92         $self->mset($qstr //= 'bytes:1..', $opt);
93 }
94
95 sub over {}
96
97 sub _mset_more ($$) {
98         my ($mset, $mo) = @_;
99         my $size = $mset->size;
100         $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
101 }
102
103 # $startq will EOF when query_prepare is done augmenting and allow
104 # query_mset and query_thread_mset to proceed.
105 sub wait_startq ($) {
106         my ($startq) = @_;
107         $_[0] = undef;
108         read($startq, my $query_prepare_done, 1);
109 }
110
111 sub query_thread_mset { # for --thread
112         my ($self, $lei, $ibxish) = @_;
113         local $0 = "$0 query_thread_mset";
114         my $startq = delete $self->{5};
115         my %sig = $lei->atfork_child_wq($self);
116         local @SIG{keys %sig} = values %sig;
117
118         my ($srch, $over) = ($ibxish->search, $ibxish->over);
119         unless ($srch && $over) {
120                 my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
121                 warn "$desc not indexed by Xapian\n";
122                 return;
123         }
124         my $mo = { %{$lei->{mset_opt}} };
125         my $mset;
126         my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish);
127         my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing';
128         $dedupe->prepare_dedupe;
129         do {
130                 $mset = $srch->mset($mo->{qstr}, $mo);
131                 my $ids = $srch->mset_to_artnums($mset, $mo);
132                 my $ctx = { ids => $ids };
133                 my $i = 0;
134                 my %n2item = map { ($ids->[$i++], $_) } $mset->items;
135                 while ($over->expand_thread($ctx)) {
136                         for my $n (@{$ctx->{xids}}) {
137                                 my $smsg = $over->get_art($n) or next;
138                                 wait_startq($startq) if $startq;
139                                 next if $dedupe->is_smsg_dup($smsg);
140                                 my $mitem = delete $n2item{$smsg->{num}};
141                                 $each_smsg->($smsg, $mitem);
142                         }
143                         @{$ctx->{xids}} = ();
144                 }
145         } while (_mset_more($mset, $mo));
146         undef $each_smsg; # drops @io for l2m->{each_smsg_done}
147         $lei->{ovv}->ovv_atexit_child($lei);
148 }
149
150 sub query_mset { # non-parallel for non-"--thread" users
151         my ($self, $lei, $srcs) = @_;
152         local $0 = "$0 query_mset";
153         my $startq = delete $self->{5};
154         my %sig = $lei->atfork_child_wq($self);
155         local @SIG{keys %sig} = values %sig;
156         my $mo = { %{$lei->{mset_opt}} };
157         my $mset;
158         $self->attach_external($_) for @$srcs;
159         my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self);
160         my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing';
161         $dedupe->prepare_dedupe;
162         do {
163                 $mset = $self->mset($mo->{qstr}, $mo);
164                 for my $mitem ($mset->items) {
165                         my $smsg = smsg_for($self, $mitem) or next;
166                         wait_startq($startq) if $startq;
167                         next if $dedupe->is_smsg_dup($smsg);
168                         $each_smsg->($smsg, $mitem);
169                 }
170         } while (_mset_more($mset, $mo));
171         undef $each_smsg; # drops @io for l2m->{each_smsg_done}
172         $lei->{ovv}->ovv_atexit_child($lei);
173 }
174
175 sub git {
176         my ($self) = @_;
177         my (%seen, @dirs);
178         my $tmp = File::Temp->newdir('lei_xsrch_git-XXXXXXXX', TMPDIR => 1);
179         for my $ibx (@{$self->{shard2ibx} // []}) {
180                 my $d = File::Spec->canonpath($ibx->git->{git_dir});
181                 $seen{$d} //= push @dirs, "$d/objects\n"
182         }
183         my $git_dir = $tmp->dirname;
184         PublicInbox::Import::init_bare($git_dir);
185         my $f = "$git_dir/objects/info/alternates";
186         open my $alt, '>', $f or die "open($f): $!";
187         print $alt @dirs or die "print $f: $!";
188         close $alt or die "close $f: $!";
189         my $git = PublicInbox::Git->new($git_dir);
190         $git->{-tmp} = $tmp;
191         $git;
192 }
193
194 sub query_done { # EOF callback
195         my ($self, $lei) = @_;
196         my $l2m = delete $lei->{l2m};
197         $l2m->wq_wait_old if $l2m;
198         $self->wq_wait_old;
199         $lei->{ovv}->ovv_end($lei);
200         if ($l2m) { # close() calls LeiToMail reap_compress
201                 close(delete($lei->{1})) if $lei->{1};
202                 $lei->start_mua;
203         }
204         $lei->dclose;
205 }
206
207 sub do_post_augment {
208         my ($lei, $zpipe, $au_done) = @_;
209         my $l2m = $lei->{l2m} or die 'BUG: no {l2m}';
210         eval { $l2m->post_augment($lei, $zpipe) };
211         if (my $err = $@) {
212                 if (my $lxs = delete $lei->{lxs}) {
213                         $lxs->wq_kill;
214                         $lxs->wq_close;
215                 }
216                 $lei->fail("$err");
217         }
218         close $au_done; # triggers wait_startq
219 }
220
221 sub start_query { # always runs in main (lei-daemon) process
222         my ($self, $io, $lei, $srcs) = @_;
223         my $remotes = $self->{remotes} // [];
224         if ($lei->{opt}->{thread}) {
225                 for my $ibxish (@$srcs) {
226                         $self->wq_do('query_thread_mset', $io, $lei, $ibxish);
227                 }
228         } else {
229                 $self->wq_do('query_mset', $io, $lei, $srcs);
230         }
231         # TODO
232         for my $rmt (@$remotes) {
233                 $self->wq_do('query_thread_mbox', $io, $lei, $rmt);
234         }
235         @$io = ();
236 }
237
238 sub query_prepare { # called by wq_do
239         my ($self, $lei) = @_;
240         local $0 = "$0 query_prepare";
241         my %sig = $lei->atfork_child_wq($self);
242         -p $lei->{0} or die "BUG: \$done pipe expected";
243         local @SIG{keys %sig} = values %sig;
244         eval { $lei->{l2m}->do_augment($lei) };
245         $lei->fail($@) if $@;
246         syswrite($lei->{0}, '.') == 1 or die "do_post_augment trigger: $!";
247 }
248
249 sub sigpipe_handler { # handles SIGPIPE from wq workers
250         my ($self, $lei_orig) = @_;
251         if ($self->wq_kill_old) {
252                 kill 'PIPE', $$;
253                 $self->wq_wait_old;
254         } else {
255                 $self->wq_kill;
256                 $self->wq_close;
257         }
258         close(delete $lei_orig->{1}) if $lei_orig->{1};
259 }
260
261 sub do_query {
262         my ($self, $lei_orig, $srcs) = @_;
263         my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
264         $io[0] = undef;
265         pipe(my $done, $io[0]) or die "pipe $!";
266
267         $lei_orig->event_step_init; # wait for shutdowns
268         my $done_op = {
269                 '' => [ \&query_done, $self, $lei_orig ],
270                 '!' => [ \&sigpipe_handler, $self, $lei_orig ]
271         };
272         my $in_loop = exists $lei_orig->{sock};
273         $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
274         my $l2m = $lei->{l2m};
275         if ($l2m) {
276                 # may redirect $lei->{1} for mbox
277                 my $zpipe = $l2m->pre_augment($lei_orig);
278                 $io[1] = $lei_orig->{1};
279                 pipe(my ($startq, $au_done)) or die "pipe: $!";
280                 $done_op->{'.'} = [ \&do_post_augment, $lei_orig,
281                                         $zpipe, $au_done ];
282                 local $io[4] = *STDERR{GLOB}; # don't send l2m->{-wq_s1}
283                 die "BUG: unexpected \$io[5]: $io[5]" if $io[5];
284                 $self->wq_do('query_prepare', \@io, $lei);
285                 fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
286                 $io[5] = $startq;
287                 $io[1] = $zpipe->[1] if $zpipe;
288         }
289         start_query($self, \@io, $lei, $srcs);
290         $self->wq_close(1);
291         unless ($in_loop) {
292                 # for the $lei->atfork_child_wq PIPE handler:
293                 while ($done->{sock}) { $done->event_step }
294         }
295 }
296
297 sub ipc_atfork_prepare {
298         my ($self) = @_;
299         # (0: done_wr, 1: stdout|mbox, 2: stderr,
300         #  3: sock, 4: $l2m->{-wq_s1}, 5: $startq)
301         $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&= <&=]);
302         $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
303 }
304
305 1;