]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/ExtSearchIdx.pm
extsearchidx: symlink .rev and .bitmap files into ALL.git
[public-inbox.git] / lib / PublicInbox / ExtSearchIdx.pm
1 # Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # Detached/external index cross inbox search indexing support
5 # read-write counterpart to PublicInbox::ExtSearch
6 #
7 # It's based on the same ideas as public-inbox-v2-format(5) using
8 # over.sqlite3 for dedupe and sharded Xapian.  msgmap.sqlite3 is
9 # missing, so there is no Message-ID conflict resolution, meaning
10 # no NNTP support for now.
11 #
12 # v2 has a 1:1 mapping of index:inbox or msgmap for NNTP support.
13 # This is intended to be an M:N index:inbox mapping, but it'll likely
14 # be 1:N in common practice (M==1)
15
16 package PublicInbox::ExtSearchIdx;
17 use strict;
18 use v5.10.1;
19 use parent qw(PublicInbox::ExtSearch PublicInbox::Lock);
20 use Carp qw(croak carp);
21 use Sys::Hostname qw(hostname);
22 use POSIX qw(strftime);
23 use File::Glob qw(bsd_glob GLOB_NOSORT);
24 use PublicInbox::Search;
25 use PublicInbox::SearchIdx qw(prepare_stack is_ancestor is_bad_blob);
26 use PublicInbox::OverIdx;
27 use PublicInbox::MiscIdx;
28 use PublicInbox::MID qw(mids);
29 use PublicInbox::V2Writable;
30 use PublicInbox::InboxWritable;
31 use PublicInbox::ContentHash qw(content_hash);
32 use PublicInbox::Eml;
33 use PublicInbox::DS qw(now add_timer);
34 use DBI qw(:sql_types); # SQL_BLOB
35
36 sub new {
37         my (undef, $dir, $opt) = @_;
38         my $l = $opt->{indexlevel} // 'full';
39         $l !~ $PublicInbox::SearchIdx::INDEXLEVELS and
40                 die "invalid indexlevel=$l\n";
41         $l eq 'basic' and die "E: indexlevel=basic not yet supported\n";
42         my $self = bless {
43                 xpfx => "$dir/ei".PublicInbox::Search::SCHEMA_VERSION,
44                 topdir => $dir,
45                 creat => $opt->{creat},
46                 ibx_map => {}, # (newsgroup//inboxdir) => $ibx
47                 ibx_list => [],
48                 indexlevel => $l,
49                 transact_bytes => 0,
50                 total_bytes => 0,
51                 current_info => '',
52                 parallel => 1,
53                 lock_path => "$dir/ei.lock",
54         }, __PACKAGE__;
55         $self->{shards} = $self->count_shards || nproc_shards($opt->{creat});
56         my $oidx = PublicInbox::OverIdx->new("$self->{xpfx}/over.sqlite3");
57         $self->{-no_fsync} = $oidx->{-no_fsync} = 1 if !$opt->{fsync};
58         $self->{oidx} = $oidx;
59         $self
60 }
61
62 sub attach_inbox {
63         my ($self, $ibx) = @_;
64         $self->{ibx_map}->{$ibx->eidx_key} //= do {
65                 push @{$self->{ibx_list}}, $ibx;
66                 $ibx;
67         }
68 }
69
70 sub _ibx_attach { # each_inbox callback
71         my ($ibx, $self) = @_;
72         attach_inbox($self, $ibx);
73 }
74
75 sub attach_config {
76         my ($self, $cfg) = @_;
77         $self->{cfg} = $cfg;
78         $cfg->each_inbox(\&_ibx_attach, $self);
79 }
80
81 sub check_batch_limit ($) {
82         my ($req) = @_;
83         my $self = $req->{self};
84         my $new_smsg = $req->{new_smsg};
85         my $n = $self->{transact_bytes} += $new_smsg->{bytes};
86
87         # set flag for PublicInbox::V2Writable::index_todo:
88         ${$req->{need_checkpoint}} = 1 if $n >= $self->{batch_bytes};
89 }
90
91 sub do_xpost ($$) {
92         my ($req, $smsg) = @_;
93         my $self = $req->{self};
94         my $docid = $smsg->{num};
95         my $idx = $self->idx_shard($docid);
96         my $oid = $req->{oid};
97         my $xibx = $req->{ibx};
98         my $eml = $req->{eml};
99         my $eidx_key = $xibx->eidx_key;
100         if (my $new_smsg = $req->{new_smsg}) { # 'm' on cross-posted message
101                 my $xnum = $req->{xnum};
102                 $self->{oidx}->add_xref3($docid, $xnum, $oid, $eidx_key);
103                 $idx->ipc_do('add_eidx_info', $docid, $eidx_key, $eml);
104                 check_batch_limit($req);
105         } else { # 'd'
106                 my $rm_eidx_info;
107                 my $nr = $self->{oidx}->remove_xref3($docid, $oid, $eidx_key,
108                                                         \$rm_eidx_info);
109                 if ($nr == 0) {
110                         $self->{oidx}->eidxq_del($docid);
111                         $idx->ipc_do('xdb_remove', $docid);
112                 } elsif ($rm_eidx_info) {
113                         $idx->ipc_do('remove_eidx_info',
114                                         $docid, $eidx_key, $eml);
115                         $self->{oidx}->eidxq_add($docid); # yes, add
116                 }
117         }
118 }
119
120 # called by V2Writable::sync_prepare
121 sub artnum_max { $_[0]->{oidx}->eidx_max }
122
123 sub index_unseen ($) {
124         my ($req) = @_;
125         my $new_smsg = $req->{new_smsg} or die 'BUG: {new_smsg} unset';
126         my $eml = delete $req->{eml};
127         $new_smsg->populate($eml, $req);
128         my $self = $req->{self};
129         my $docid = $self->{oidx}->adj_counter('eidx_docid', '+');
130         $new_smsg->{num} = $docid;
131         my $idx = $self->idx_shard($docid);
132         $self->{oidx}->add_overview($eml, $new_smsg);
133         my $oid = $new_smsg->{blob};
134         my $ibx = delete $req->{ibx} or die 'BUG: {ibx} unset';
135         $self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key);
136         $idx->index_eml($eml, $new_smsg, $ibx->eidx_key);
137         check_batch_limit($req);
138 }
139
140 sub do_finalize ($) {
141         my ($req) = @_;
142         if (my $indexed = $req->{indexed}) {
143                 do_xpost($req, $_) for @$indexed;
144         } elsif (exists $req->{new_smsg}) { # totally unseen messsage
145                 index_unseen($req);
146         } else {
147                 # `d' message was already unindexed in the v1/v2 inboxes,
148                 # so it's too noisy to warn, here.
149         }
150         # cur_cmt may be undef for unindex_oid, set by V2Writable::index_todo
151         if (defined(my $cur_cmt = $req->{cur_cmt})) {
152                 ${$req->{latest_cmt}} = $cur_cmt;
153         }
154 }
155
156 sub do_step ($) { # main iterator for adding messages to the index
157         my ($req) = @_;
158         my $self = $req->{self} // die 'BUG: {self} missing';
159         while (1) {
160                 if (my $next_arg = $req->{next_arg}) {
161                         if (my $smsg = $self->{oidx}->next_by_mid(@$next_arg)) {
162                                 $req->{cur_smsg} = $smsg;
163                                 $self->git->cat_async($smsg->{blob},
164                                                         \&ck_existing, $req);
165                                 return; # ck_existing calls do_step
166                         }
167                         delete $req->{cur_smsg};
168                         delete $req->{next_arg};
169                 }
170                 my $mid = shift(@{$req->{mids}});
171                 last unless defined $mid;
172                 my ($id, $prev);
173                 $req->{next_arg} = [ $mid, \$id, \$prev ];
174                 # loop again
175         }
176         do_finalize($req);
177 }
178
179 sub _blob_missing ($) { # called when req->{cur_smsg}->{blob} is bad
180         my ($req) = @_;
181         my $smsg = $req->{cur_smsg} or die 'BUG: {cur_smsg} missing';
182         my $self = $req->{self};
183         my $xref3 = $self->{oidx}->get_xref3($smsg->{num});
184         my @keep = grep(!/:$smsg->{blob}\z/, @$xref3);
185         if (@keep) {
186                 $keep[0] =~ /:([a-f0-9]{40,}+)\z/ or
187                         die "BUG: xref $keep[0] has no OID";
188                 my $oidhex = $1;
189                 $self->{oidx}->remove_xref3($smsg->{num}, $smsg->{blob});
190                 my $upd = $self->{oidx}->update_blob($smsg, $oidhex);
191                 my $saved = $self->{oidx}->get_art($smsg->{num});
192         } else {
193                 $self->{oidx}->delete_by_num($smsg->{num});
194         }
195 }
196
197 sub ck_existing { # git->cat_async callback
198         my ($bref, $oid, $type, $size, $req) = @_;
199         my $smsg = $req->{cur_smsg} or die 'BUG: {cur_smsg} missing';
200         if ($type eq 'missing') {
201                 _blob_missing($req);
202         } elsif (!is_bad_blob($oid, $type, $size, $smsg->{blob})) {
203                 my $self = $req->{self} // die 'BUG: {self} missing';
204                 local $self->{current_info} = "$self->{current_info} $oid";
205                 my $cur = PublicInbox::Eml->new($bref);
206                 if (content_hash($cur) eq $req->{chash}) {
207                         push @{$req->{indexed}}, $smsg; # for do_xpost
208                 } # else { index_unseen later }
209         }
210         do_step($req);
211 }
212
213 # is the messages visible in the inbox currently being indexed?
214 # return the number if so
215 sub cur_ibx_xnum ($$) {
216         my ($req, $bref) = @_;
217         my $ibx = $req->{ibx} or die 'BUG: current {ibx} missing';
218
219         $req->{eml} = PublicInbox::Eml->new($bref);
220         $req->{chash} = content_hash($req->{eml});
221         $req->{mids} = mids($req->{eml});
222         my @q = @{$req->{mids}}; # copy
223         while (defined(my $mid = shift @q)) {
224                 my ($id, $prev);
225                 while (my $x = $ibx->over->next_by_mid($mid, \$id, \$prev)) {
226                         return $x->{num} if $x->{blob} eq $req->{oid};
227                 }
228         }
229         undef;
230 }
231
232 sub index_oid { # git->cat_async callback for 'm'
233         my ($bref, $oid, $type, $size, $req) = @_;
234         my $self = $req->{self};
235         local $self->{current_info} = "$self->{current_info} $oid";
236         return if is_bad_blob($oid, $type, $size, $req->{oid});
237         my $new_smsg = $req->{new_smsg} = bless {
238                 blob => $oid,
239         }, 'PublicInbox::Smsg';
240         $new_smsg->set_bytes($$bref, $size);
241         defined($req->{xnum} = cur_ibx_xnum($req, $bref)) or return;
242         ++${$req->{nr}};
243         do_step($req);
244 }
245
246 sub unindex_oid { # git->cat_async callback for 'd'
247         my ($bref, $oid, $type, $size, $req) = @_;
248         my $self = $req->{self};
249         local $self->{current_info} = "$self->{current_info} $oid";
250         return if is_bad_blob($oid, $type, $size, $req->{oid});
251         return if defined(cur_ibx_xnum($req, $bref)); # was re-added
252         do_step($req);
253 }
254
255 # overrides V2Writable::last_commits, called by sync_ranges via sync_prepare
256 sub last_commits {
257         my ($self, $sync) = @_;
258         my $heads = [];
259         my $ekey = $sync->{ibx}->eidx_key;
260         my $uv = $sync->{ibx}->uidvalidity;
261         for my $i (0..$sync->{epoch_max}) {
262                 $heads->[$i] = $self->{oidx}->eidx_meta("lc-v2:$ekey//$uv;$i");
263         }
264         $heads;
265 }
266
267 sub _ibx_index_reject ($) {
268         my ($ibx) = @_;
269         $ibx->mm // return 'unindexed, no msgmap.sqlite3';
270         $ibx->uidvalidity // return 'no UIDVALIDITY';
271         $ibx->over // return 'unindexed, no over.sqlite3';
272         undef;
273 }
274
275 sub _sync_inbox ($$$) {
276         my ($self, $sync, $ibx) = @_;
277         my $ekey = $ibx->eidx_key;
278         if (defined(my $err = _ibx_index_reject($ibx))) {
279                 return "W: skipping $ekey ($err)";
280         }
281         $sync->{ibx} = $ibx;
282         $sync->{nr} = \(my $nr = 0);
283         my $v = $ibx->version;
284         if ($v == 2) {
285                 $sync->{epoch_max} = $ibx->max_git_epoch // return;
286                 sync_prepare($self, $sync); # or return # TODO: once MiscIdx is stable
287         } elsif ($v == 1) {
288                 my $uv = $ibx->uidvalidity;
289                 my $lc = $self->{oidx}->eidx_meta("lc-v1:$ekey//$uv");
290                 my $head = $ibx->mm->last_commit //
291                         return "E: $ibx->{inboxdir} is not indexed";
292                 my $stk = prepare_stack($sync, $lc ? "$lc..$head" : $head);
293                 my $unit = { stack => $stk, git => $ibx->git };
294                 push @{$sync->{todo}}, $unit;
295         } else {
296                 return "E: $ekey unsupported inbox version (v$v)";
297         }
298         for my $unit (@{delete($sync->{todo}) // []}) {
299                 last if $sync->{quit};
300                 index_todo($self, $sync, $unit);
301         }
302         $self->{midx}->index_ibx($ibx) unless $sync->{quit};
303         $ibx->git->cleanup; # done with this inbox, now
304         undef;
305 }
306
307 sub gc_unref_doc ($$$$) {
308         my ($self, $ibx_id, $eidx_key, $docid) = @_;
309         my $dbh = $self->{oidx}->dbh;
310
311         # for debug/info purposes, oids may no longer be accessible
312         my $sth = $dbh->prepare_cached(<<'', undef, 1);
313 SELECT oidbin FROM xref3 WHERE docid = ? AND ibx_id = ?
314
315         $sth->execute($docid, $ibx_id);
316         my @oid = map { unpack('H*', $_->[0]) } @{$sth->fetchall_arrayref};
317
318         $dbh->prepare_cached(<<'')->execute($docid, $ibx_id);
319 DELETE FROM xref3 WHERE docid = ? AND ibx_id = ?
320
321         my $remain = $self->{oidx}->get_xref3($docid);
322         if (scalar(@$remain)) {
323                 $self->{oidx}->eidxq_add($docid); # enqueue for reindex
324                 for my $oid (@oid) {
325                         warn "I: unref #$docid $eidx_key $oid\n";
326                 }
327         } else {
328                 warn "I: remove #$docid $eidx_key @oid\n";
329                 $self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
330         }
331 }
332
333 sub eidx_gc {
334         my ($self, $opt) = @_;
335         $self->{cfg} or die "E: GC requires ->attach_config\n";
336         $opt->{-idx_gc} = 1;
337         $self->idx_init($opt); # acquire lock via V2Writable::_idx_init
338
339         my $dbh = $self->{oidx}->dbh;
340         $dbh->do('PRAGMA case_sensitive_like = ON'); # only place we use LIKE
341         my $x3_doc = $dbh->prepare('SELECT docid FROM xref3 WHERE ibx_id = ?');
342         my $ibx_ck = $dbh->prepare('SELECT ibx_id,eidx_key FROM inboxes');
343         my $lc_i = $dbh->prepare(<<'');
344 SELECT key FROM eidx_meta WHERE key LIKE ? ESCAPE ?
345
346         $ibx_ck->execute;
347         while (my ($ibx_id, $eidx_key) = $ibx_ck->fetchrow_array) {
348                 next if $self->{ibx_map}->{$eidx_key};
349                 $self->{midx}->remove_eidx_key($eidx_key);
350                 warn "I: deleting messages for $eidx_key...\n";
351                 $x3_doc->execute($ibx_id);
352                 while (defined(my $docid = $x3_doc->fetchrow_array)) {
353                         gc_unref_doc($self, $ibx_id, $eidx_key, $docid);
354                 }
355                 $dbh->prepare_cached(<<'')->execute($ibx_id);
356 DELETE FROM inboxes WHERE ibx_id = ?
357
358                 # drop last_commit info
359                 my $pat = $eidx_key;
360                 $pat =~ s/([_%\\])/\\$1/g;
361                 $lc_i->execute("lc-%:$pat//%", '\\');
362                 while (my ($key) = $lc_i->fetchrow_array) {
363                         next if $key !~ m!\Alc-v[1-9]+:\Q$eidx_key\E//!;
364                         warn "I: removing $key\n";
365                         $dbh->prepare_cached(<<'')->execute($key);
366 DELETE FROM eidx_meta WHERE key = ?
367
368                 }
369
370                 warn "I: $eidx_key removed\n";
371         }
372
373         # it's not real unless it's in `over', we use parallelism here,
374         # shards will be reading directly from over, so commit
375         $self->{oidx}->commit_lazy;
376         $self->{oidx}->begin_lazy;
377
378         for my $idx (@{$self->{idx_shards}}) {
379                 warn "I: cleaning up shard #$idx->{shard}\n";
380                 $idx->shard_over_check($self->{oidx});
381         }
382         my $nr = $dbh->do(<<'');
383 DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
384
385         warn "I: eliminated $nr stale xref3 entries\n" if $nr != 0;
386
387         done($self);
388 }
389
390 sub _ibx_for ($$$) {
391         my ($self, $sync, $smsg) = @_;
392         my $ibx_id = delete($smsg->{ibx_id}) // die '{ibx_id} unset';
393         my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
394         $self->{ibx_list}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped"
395 }
396
397 sub _fd_constrained ($) {
398         my ($self) = @_;
399         $self->{-fd_constrained} //= do {
400                 my $soft;
401                 if (eval { require BSD::Resource; 1 }) {
402                         my $NOFILE = BSD::Resource::RLIMIT_NOFILE();
403                         ($soft, undef) = BSD::Resource::getrlimit($NOFILE);
404                 } else {
405                         chomp($soft = `sh -c 'ulimit -n'`);
406                 }
407                 if (defined($soft)) {
408                         my $want = scalar(@{$self->{ibx_list}}) + 64; # estimate
409                         my $ret = $want > $soft;
410                         if ($ret) {
411                                 warn <<EOF;
412 RLIMIT_NOFILE=$soft insufficient (want: $want), will close DB handles early
413 EOF
414                         }
415                         $ret;
416                 } else {
417                         warn "Unable to determine RLIMIT_NOFILE: $@\n";
418                         1;
419                 }
420         };
421 }
422
423 sub _reindex_finalize ($$$) {
424         my ($req, $smsg, $eml) = @_;
425         my $sync = $req->{sync};
426         my $self = $sync->{self};
427         my $by_chash = delete $req->{by_chash} or die 'BUG: no {by_chash}';
428         my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes';
429         my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
430         my $docid = $smsg->{num} = $orig_smsg->{num};
431         $self->{oidx}->add_overview($eml, $smsg); # may rethread
432         check_batch_limit({ %$sync, new_smsg => $smsg });
433         my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
434         my $stable = delete($by_chash->{$chash0}) //
435                                 die "BUG: $smsg->{blob} chash missing";
436         my $idx = $self->idx_shard($docid);
437         my $top_smsg = pop @$stable;
438         $top_smsg == $smsg or die 'BUG: top_smsg != smsg';
439         my $ibx = _ibx_for($self, $sync, $smsg);
440         $idx->index_eml($eml, $smsg, $ibx->eidx_key);
441         for my $x (reverse @$stable) {
442                 $ibx = _ibx_for($self, $sync, $x);
443                 my $hdr = delete $x->{hdr} // die 'BUG: no {hdr}';
444                 $idx->ipc_do('add_eidx_info', $docid, $ibx->eidx_key, $hdr);
445         }
446         return if $nr == 1; # likely, all good
447
448         warn "W: #$docid split into $nr due to deduplication change\n";
449         my @todo;
450         for my $ary (values %$by_chash) {
451                 for my $x (reverse @$ary) {
452                         warn "removing #$docid xref3 $x->{blob}\n";
453                         my $n = $self->{oidx}->remove_xref3($docid, $x->{blob});
454                         die "BUG: $x->{blob} invalidated #$docid" if $n == 0;
455                 }
456                 my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty";
457                 $x->{num} = delete($x->{xnum}) // die '{xnum} unset';
458                 $ibx = _ibx_for($self, $sync, $x);
459                 if (my $over = $ibx->over) {
460                         my $e = $over->get_art($x->{num});
461                         $e->{blob} eq $x->{blob} or die <<EOF;
462 $x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
463 EOF
464                         push @todo, $ibx, $e;
465                         $over->dbh_close if _fd_constrained($self);
466                 } else {
467                         die "$ibx->{inboxdir}: over.sqlite3 unusable: $!\n";
468                 }
469         }
470         undef $by_chash;
471         while (my ($ibx, $e) = splice(@todo, 0, 2)) {
472                 reindex_unseen($self, $sync, $ibx, $e);
473         }
474 }
475
476 sub _reindex_oid { # git->cat_async callback
477         my ($bref, $oid, $type, $size, $req) = @_;
478         my $sync = $req->{sync};
479         my $self = $sync->{self};
480         my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
481         my $expect_oid = $req->{xr3r}->[$req->{ix}]->[2];
482         my $docid = $orig_smsg->{num};
483         if (is_bad_blob($oid, $type, $size, $expect_oid)) {
484                 my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid);
485                 if ($remain == 0) {
486                         warn "W: #$docid gone or corrupted\n";
487                         $self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
488                 } elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
489                         $self->git->cat_async($next_oid, \&_reindex_oid, $req);
490                 } else {
491                         warn "BUG: #$docid gone (UNEXPECTED)\n";
492                         $self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
493                 }
494                 return;
495         }
496         my $ci = $self->{current_info};
497         local $self->{current_info} = "$ci #$docid $oid";
498         my $re_smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
499         $re_smsg->set_bytes($$bref, $size);
500         my $eml = PublicInbox::Eml->new($bref);
501         $re_smsg->populate($eml, { autime => $orig_smsg->{ds},
502                                 cotime => $orig_smsg->{ts} });
503         my $chash = content_hash($eml);
504         $re_smsg->{chash} = $chash;
505         $re_smsg->{xnum} = $req->{xr3r}->[$req->{ix}]->[1];
506         $re_smsg->{ibx_id} = $req->{xr3r}->[$req->{ix}]->[0];
507         $re_smsg->{hdr} = $eml->header_obj;
508         push @{$req->{by_chash}->{$chash}}, $re_smsg;
509         if (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
510                 $self->git->cat_async($next_oid, \&_reindex_oid, $req);
511         } else { # last $re_smsg is the highest priority xref3
512                 local $self->{current_info} = "$ci #$docid";
513                 _reindex_finalize($req, $re_smsg, $eml);
514         }
515 }
516
517 sub _reindex_smsg ($$$) {
518         my ($self, $sync, $smsg) = @_;
519         my $docid = $smsg->{num};
520         my $xr3 = $self->{oidx}->get_xref3($docid, 1);
521         if (scalar(@$xr3) == 0) { # _reindex_check_stale should've covered this
522                 warn <<"";
523 BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
524
525                 $self->{oidx}->delete_by_num($docid);
526                 $self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
527                 return;
528         }
529
530         # we sort {xr3r} in the reverse order of {ibx_list} so we can
531         # hit the common case in _reindex_finalize without rereading
532         # from git (or holding multiple messages in memory).
533         my $id2pos = $sync->{id2pos}; # index in {ibx_list}
534         @$xr3 = sort {
535                 $id2pos->{$b->[0]} <=> $id2pos->{$a->[0]}
536                                 ||
537                 $b->[1] <=> $a->[1] # break ties with {xnum}
538         } @$xr3;
539         @$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3;
540         my $req = { orig_smsg => $smsg, sync => $sync, xr3r => $xr3, ix => 0 };
541         $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
542 }
543
544 sub checkpoint_due ($) {
545         my ($sync) = @_;
546         ${$sync->{need_checkpoint}} || (now() > $sync->{next_check});
547 }
548
549 sub host_ident () {
550         # I've copied FS images and only changed the hostname before,
551         # so prepend hostname.  Use `state' since these a BOFH can change
552         # these while this process is running and we always want to be
553         # able to release locks taken by this process.
554         state $retval = hostname . '-' . do {
555                 my $m; # machine-id(5) is systemd
556                 if (open(my $fh, '<', '/etc/machine-id')) { $m = <$fh> }
557                 # (g)hostid(1) is in GNU coreutils, kern.hostid is most BSDs
558                 chomp($m ||= `{ sysctl -n kern.hostid ||
559                                 hostid || ghostid; } 2>/dev/null`
560                         || "no-machine-id-or-hostid-on-$^O");
561                 $m;
562         };
563 }
564
565 sub eidxq_release {
566         my ($self) = @_;
567         my $expect = delete($self->{-eidxq_locked}) or return;
568         my ($owner_pid, undef) = split(/-/, $expect);
569         return if $owner_pid != $$; # shards may fork
570         my $oidx = $self->{oidx};
571         $oidx->begin_lazy;
572         my $cur = $oidx->eidx_meta('eidxq_lock') // '';
573         if ($cur eq $expect) {
574                 $oidx->eidx_meta('eidxq_lock', '');
575                 return 1;
576         } elsif ($cur ne '') {
577                 warn "E: eidxq_lock($expect) stolen by $cur\n";
578         } else {
579                 warn "E: eidxq_lock($expect) released by another process\n";
580         }
581         undef;
582 }
583
584 sub DESTROY {
585         my ($self) = @_;
586         eidxq_release($self) and $self->{oidx}->commit_lazy;
587 }
588
589 sub _eidxq_take ($) {
590         my ($self) = @_;
591         my $val = "$$-${\time}-$>-".host_ident;
592         $self->{oidx}->eidx_meta('eidxq_lock', $val);
593         $self->{-eidxq_locked} = $val;
594 }
595
596 sub eidxq_lock_acquire ($) {
597         my ($self) = @_;
598         my $oidx = $self->{oidx};
599         $oidx->begin_lazy;
600         my $cur = $oidx->eidx_meta('eidxq_lock') || return _eidxq_take($self);
601         if (my $locked = $self->{-eidxq_locked}) { # be lazy
602                 return $locked if $locked eq $cur;
603         }
604         my ($pid, $time, $euid, $ident) = split(/-/, $cur, 4);
605         my $t = strftime('%Y-%m-%d %k:%M:%S', gmtime($time));
606         if ($euid == $> && $ident eq host_ident) {
607                 if (kill(0, $pid)) {
608                         warn <<EOM; return;
609 I: PID:$pid (re)indexing Xapian since $t, it will continue our work
610 EOM
611                 }
612                 if ($!{ESRCH}) {
613                         warn "I: eidxq_lock is stale ($cur), clobbering\n";
614                         return _eidxq_take($self);
615                 }
616                 warn "E: kill(0, $pid) failed: $!\n"; # fall-through:
617         }
618         my $fn = $oidx->dbh->sqlite_db_filename;
619         warn <<EOF;
620 W: PID:$pid, UID:$euid on $ident is indexing Xapian since $t
621 W: If this is unexpected, delete `eidxq_lock' from the `eidx_meta' table:
622 W:      sqlite3 $fn 'DELETE FROM eidx_meta WHERE key = "eidxq_lock"'
623 EOF
624         undef;
625 }
626
627 sub eidxq_process ($$) { # for reindexing
628         my ($self, $sync) = @_;
629
630         return unless eidxq_lock_acquire($self);
631         my $dbh = $self->{oidx}->dbh;
632         my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
633         ${$sync->{nr}} = 0;
634         local $sync->{-regen_fmt} = "%u/$tot\n";
635         my $pr = $sync->{-opt}->{-progress};
636         if ($pr) {
637                 my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq');
638                 my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
639                 $pr->("Xapian indexing $min..$max (total=$tot)\n");
640         }
641         $sync->{id2pos} //= do {
642                 my %id2pos;
643                 my $pos = 0;
644                 $id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
645                 \%id2pos;
646         };
647         my ($del, $iter);
648 restart:
649         $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
650         $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
651         $iter->execute;
652         while (defined(my $docid = $iter->fetchrow_array)) {
653                 last if $sync->{quit};
654                 if (my $smsg = $self->{oidx}->get_art($docid)) {
655                         _reindex_smsg($self, $sync, $smsg);
656                 } else {
657                         warn "E: #$docid does not exist in over\n";
658                 }
659                 $del->execute($docid);
660                 ++${$sync->{nr}};
661
662                 if (checkpoint_due($sync)) {
663                         $dbh = $del = $iter = undef;
664                         reindex_checkpoint($self, $sync); # release lock
665                         $dbh = $self->{oidx}->dbh;
666                         goto restart;
667                 }
668         }
669         $self->git->async_wait_all;
670         $pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr;
671 }
672
673 sub _reindex_unseen { # git->cat_async callback
674         my ($bref, $oid, $type, $size, $req) = @_;
675         return if is_bad_blob($oid, $type, $size, $req->{oid});
676         my $self = $req->{self} // die 'BUG: {self} unset';
677         local $self->{current_info} = "$self->{current_info} $oid";
678         my $new_smsg = bless { blob => $oid, }, 'PublicInbox::Smsg';
679         $new_smsg->set_bytes($$bref, $size);
680         my $eml = $req->{eml} = PublicInbox::Eml->new($bref);
681         $req->{new_smsg} = $new_smsg;
682         $req->{chash} = content_hash($eml);
683         $req->{mids} = mids($eml); # do_step iterates through this
684         do_step($req); # enter the normal indexing flow
685 }
686
687 # --reindex may catch totally unseen messages, this handles them
688 sub reindex_unseen ($$$$) {
689         my ($self, $sync, $ibx, $xsmsg) = @_;
690         my $req = {
691                 %$sync, # has {self}
692                 autime => $xsmsg->{ds},
693                 cotime => $xsmsg->{ts},
694                 oid => $xsmsg->{blob},
695                 ibx => $ibx,
696                 xnum => $xsmsg->{num},
697                 # {mids} and {chash} will be filled in at _reindex_unseen
698         };
699         warn "I: reindex_unseen ${\$ibx->eidx_key}:$req->{xnum}:$req->{oid}\n";
700         $self->git->cat_async($xsmsg->{blob}, \&_reindex_unseen, $req);
701 }
702
703 sub _reindex_check_unseen ($$$) {
704         my ($self, $sync, $ibx) = @_;
705         my $ibx_id = $ibx->{-ibx_id};
706         my $slice = 1000;
707         my ($beg, $end) = (1, $slice);
708
709         # first, check if we missed any messages in target $ibx
710         my $msgs;
711         my $pr = $sync->{-opt}->{-progress};
712         my $ekey = $ibx->eidx_key;
713         local $sync->{-regen_fmt} =
714                         "$ekey checking unseen %u/".$ibx->over->max."\n";
715         ${$sync->{nr}} = 0;
716
717         while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) {
718                 ${$sync->{nr}} = $beg;
719                 $beg = $msgs->[-1]->{num} + 1;
720                 $end = $beg + $slice;
721                 if (checkpoint_due($sync)) {
722                         reindex_checkpoint($self, $sync); # release lock
723                 }
724
725                 my $inx3 = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1);
726 SELECT DISTINCT(docid) FROM xref3 WHERE
727 ibx_id = ? AND xnum = ? AND oidbin = ?
728
729                 for my $xsmsg (@$msgs) {
730                         my $oidbin = pack('H*', $xsmsg->{blob});
731                         $inx3->bind_param(1, $ibx_id);
732                         $inx3->bind_param(2, $xsmsg->{num});
733                         $inx3->bind_param(3, $oidbin, SQL_BLOB);
734                         $inx3->execute;
735                         my $docids = $inx3->fetchall_arrayref;
736                         # index messages which were totally missed
737                         # the first time around ASAP:
738                         if (scalar(@$docids) == 0) {
739                                 reindex_unseen($self, $sync, $ibx, $xsmsg);
740                         } else { # already seen, reindex later
741                                 for my $r (@$docids) {
742                                         $self->{oidx}->eidxq_add($r->[0]);
743                                 }
744                         }
745                         last if $sync->{quit};
746                 }
747                 last if $sync->{quit};
748         }
749 }
750
751 sub _reindex_check_stale ($$$) {
752         my ($self, $sync, $ibx) = @_;
753         my $min = 0;
754         my $pr = $sync->{-opt}->{-progress};
755         my $fetching;
756         my $ekey = $ibx->eidx_key;
757         local $sync->{-regen_fmt} =
758                         "$ekey check stale/missing %u/".$ibx->over->max."\n";
759         ${$sync->{nr}} = 0;
760         do {
761                 if (checkpoint_due($sync)) {
762                         reindex_checkpoint($self, $sync); # release lock
763                 }
764                 # now, check if there's stale xrefs
765                 my $iter = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1);
766 SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? AND docid > ?
767 ORDER BY docid,xnum ASC LIMIT 10000
768
769                 $iter->execute($ibx->{-ibx_id}, $min);
770                 $fetching = undef;
771
772                 while (my ($docid, $xnum, $oidbin) = $iter->fetchrow_array) {
773                         return if $sync->{quit};
774                         ${$sync->{nr}} = $xnum;
775
776                         $fetching = $min = $docid;
777                         my $smsg = $ibx->over->get_art($xnum);
778                         my $oidhex = unpack('H*', $oidbin);
779                         my $err;
780                         if (!$smsg) {
781                                 $err = 'stale';
782                         } elsif ($smsg->{blob} ne $oidhex) {
783                                 $err = "mismatch (!= $smsg->{blob})";
784                         } else {
785                                 next; # likely, all good
786                         }
787                         # current_info already has eidx_key
788                         warn "$xnum:$oidhex (#$docid): $err\n";
789                         my $del = $self->{oidx}->dbh->prepare_cached(<<'');
790 DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
791
792                         $del->bind_param(1, $ibx->{-ibx_id});
793                         $del->bind_param(2, $xnum);
794                         $del->bind_param(3, $oidbin, SQL_BLOB);
795                         $del->execute;
796
797                         # get_xref3 over-fetches, but this is a rare path:
798                         my $xr3 = $self->{oidx}->get_xref3($docid);
799                         my $idx = $self->idx_shard($docid);
800                         if (scalar(@$xr3) == 0) { # all gone
801                                 $self->{oidx}->delete_by_num($docid);
802                                 $self->{oidx}->eidxq_del($docid);
803                                 $idx->ipc_do('xdb_remove', $docid);
804                         } else { # enqueue for reindex of remaining messages
805                                 $idx->ipc_do('remove_eidx_info',
806                                                 $docid, $ibx->eidx_key);
807                                 $self->{oidx}->eidxq_add($docid); # yes, add
808                         }
809                 }
810         } while (defined $fetching);
811 }
812
813 sub _reindex_inbox ($$$) {
814         my ($self, $sync, $ibx) = @_;
815         my $ekey = $ibx->eidx_key;
816         local $self->{current_info} = $ekey;
817         if (defined(my $err = _ibx_index_reject($ibx))) {
818                 warn "W: cannot reindex $ekey ($err)\n";
819         } else {
820                 _reindex_check_unseen($self, $sync, $ibx);
821                 _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
822         }
823         delete @$ibx{qw(over mm search git)}; # won't need these for a bit
824 }
825
826 sub eidx_reindex {
827         my ($self, $sync) = @_;
828
829         # acquire eidxq_lock early because full reindex takes forever
830         # and incremental -extindex processes can run during our checkpoints
831         if (!eidxq_lock_acquire($self)) {
832                 warn "E: aborting --reindex\n";
833                 return;
834         }
835         for my $ibx (@{$self->{ibx_list}}) {
836                 _reindex_inbox($self, $sync, $ibx);
837                 last if $sync->{quit};
838         }
839         $self->git->async_wait_all; # ensure eidxq gets filled completely
840         eidxq_process($self, $sync) unless $sync->{quit};
841 }
842
843 sub sync_inbox {
844         my ($self, $sync, $ibx) = @_;
845         my $err = _sync_inbox($self, $sync, $ibx);
846         delete @$ibx{qw(mm over)};
847         warn $err, "\n" if defined($err);
848 }
849
850 sub eidx_sync { # main entry point
851         my ($self, $opt) = @_;
852
853         my $warn_cb = $SIG{__WARN__} || \&CORE::warn;
854         local $self->{current_info} = '';
855         local $SIG{__WARN__} = sub {
856                 $warn_cb->($self->{current_info}, ': ', @_);
857         };
858         $self->idx_init($opt); # acquire lock via V2Writable::_idx_init
859         $self->{oidx}->rethread_prepare($opt);
860         my $sync = {
861                 need_checkpoint => \(my $need_checkpoint = 0),
862                 check_intvl => 10,
863                 next_check => now() + 10,
864                 -opt => $opt,
865                 # DO NOT SET {reindex} here, it's incompatible with reused
866                 # V2Writable code, reindex is totally different here
867                 # compared to v1/v2 inboxes because we have multiple histories
868                 self => $self,
869                 -regen_fmt => "%u/?\n",
870         };
871         local $SIG{USR1} = sub { $need_checkpoint = 1 };
872         my $quit = PublicInbox::SearchIdx::quit_cb($sync);
873         local $SIG{QUIT} = $quit;
874         local $SIG{INT} = $quit;
875         local $SIG{TERM} = $quit;
876         for my $ibx (@{$self->{ibx_list}}) {
877                 $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
878         }
879         if (delete($opt->{reindex})) {
880                 local $sync->{checkpoint_unlocks} = 1;
881                 eidx_reindex($self, $sync);
882         }
883
884         # don't use $_ here, it'll get clobbered by reindex_checkpoint
885         if ($opt->{scan} // 1) {
886                 for my $ibx (@{$self->{ibx_list}}) {
887                         last if $sync->{quit};
888                         sync_inbox($self, $sync, $ibx);
889                 }
890         }
891         $self->{oidx}->rethread_done($opt) unless $sync->{quit};
892         eidxq_process($self, $sync) unless $sync->{quit};
893
894         eidxq_release($self);
895         done($self);
896         $sync; # for eidx_watch
897 }
898
899 sub update_last_commit { # overrides V2Writable
900         my ($self, $sync, $stk) = @_;
901         my $unit = $sync->{unit} // return;
902         my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
903         defined($latest_cmt) or return;
904         my $ibx = $sync->{ibx} or die 'BUG: {ibx} missing';
905         my $ekey = $ibx->eidx_key;
906         my $uv = $ibx->uidvalidity;
907         my $epoch = $unit->{epoch};
908         my $meta_key;
909         my $v = $ibx->version;
910         if ($v == 2) {
911                 die 'No {epoch} for v2 unit' unless defined $epoch;
912                 $meta_key = "lc-v2:$ekey//$uv;$epoch";
913         } elsif ($v == 1) {
914                 die 'Unexpected {epoch} for v1 unit' if defined $epoch;
915                 $meta_key = "lc-v1:$ekey//$uv";
916         } else {
917                 die "Unsupported inbox version: $v";
918         }
919         my $last = $self->{oidx}->eidx_meta($meta_key);
920         if (defined $last && is_ancestor($self->git, $last, $latest_cmt)) {
921                 my @cmd = (qw(rev-list --count), "$last..$latest_cmt");
922                 chomp(my $n = $unit->{git}->qx(@cmd));
923                 return if $n ne '' && $n == 0;
924         }
925         $self->{oidx}->eidx_meta($meta_key, $latest_cmt);
926 }
927
928 sub _idx_init { # with_umask callback
929         my ($self, $opt) = @_;
930         PublicInbox::V2Writable::_idx_init($self, $opt); # acquires ei.lock
931         $self->{midx} = PublicInbox::MiscIdx->new($self);
932 }
933
934 sub symlink_packs ($$) {
935         my ($ibx, $pd) = @_;
936         my $ret = 0;
937         my $glob = "$ibx->{inboxdir}/git/*.git/objects/pack/*.idx";
938         for my $idx (bsd_glob($glob, GLOB_NOSORT)) {
939                 my $src = substr($idx, 0, -length('.idx'));
940                 my $dst = $pd . substr($src, rindex($src, '/'));
941                 if (-f "$src.pack" and
942                                 symlink("$src.pack", "$dst.pack") and
943                                 symlink($idx, "$dst.idx") and
944                                 -f $idx) {
945                         ++$ret;
946                         # .promisor, .bitmap, .rev and .keep are optional
947                         # XXX should we symlink .keep here?
948                         for my $s (qw(promisor bitmap rev)) {
949                                 symlink("$src.$s", "$dst.$s") if -f "$src.$s";
950                         }
951                 } elsif (!$!{EEXIST}) {
952                         warn "W: ln -s $src.{pack,idx} => $dst.*: $!\n";
953                         unlink "$dst.pack", "$dst.idx";
954                 }
955         }
956         $ret;
957 }
958
959 sub idx_init { # similar to V2Writable
960         my ($self, $opt) = @_;
961         return if $self->{idx_shards};
962
963         $self->git->cleanup;
964         my $mode = 0644;
965         my $ALL = $self->git->{git_dir}; # ALL.git
966         my $old = -d $ALL;
967         if ($opt->{-private}) { # LeiStore
968                 $mode = 0600;
969                 if (!$old) {
970                         umask 077; # don't bother restoring
971                         PublicInbox::Import::init_bare($ALL);
972                         $self->git->qx(qw(config core.sharedRepository 0600));
973                 }
974         } else {
975                 PublicInbox::Import::init_bare($ALL) unless $old;
976         }
977         my $info_dir = "$ALL/objects/info";
978         my $alt = "$info_dir/alternates";
979         my (@old, @new, %seen); # seen: st_dev + st_ino
980         if (-e $alt) {
981                 open(my $fh, '<', $alt) or die "open $alt: $!";
982                 $mode = (stat($fh))[2] & 07777;
983                 while (my $line = <$fh>) {
984                         chomp(my $d = $line);
985
986                         # expand relative path (/local/ stuff)
987                         substr($d, 0, 3) eq '../' and
988                                 $d = "$ALL/objects/$d";
989                         if (my @st = stat($d)) {
990                                 next if $seen{"$st[0]\0$st[1]"}++;
991                         } else {
992                                 warn "W: stat($d) failed (from $alt): $!\n";
993                                 next if $opt->{-idx_gc};
994                         }
995                         push @old, $line;
996                 }
997         }
998
999         # for LeiStore, and possibly some mirror-only state
1000         if (opendir(my $dh, my $local = "$self->{topdir}/local")) {
1001                 # highest numbered epoch first
1002                 for my $n (sort { $b <=> $a } map { substr($_, 0, -4) + 0 }
1003                                 grep(/\A[0-9]+\.git\z/, readdir($dh))) {
1004                         my $d = "$local/$n.git/objects"; # absolute path
1005                         if (my @st = stat($d)) {
1006                                 next if $seen{"$st[0]\0$st[1]"}++;
1007                                 # favor relative paths for rename-friendliness
1008                                 push @new, "../../local/$n.git/objects\n";
1009                         } else {
1010                                 warn "W: stat($d) failed: $!\n";
1011                         }
1012                 }
1013         }
1014         # git-multi-pack-index(1) can speed up "git cat-file" startup slightly
1015         my $dh;
1016         my $git_midx = 0;
1017         my $pd = "$ALL/objects/pack";
1018         if (!mkdir($pd) && $!{EEXIST} && opendir($dh, $pd)) {
1019                 # drop stale symlinks
1020                 while (defined(my $dn = readdir($dh))) {
1021                         if ($dn =~ /\.(?:idx|pack|promisor|bitmap|rev)\z/) {
1022                                 my $f = "$pd/$dn";
1023                                 unlink($f) if -l $f && !-e $f;
1024                         }
1025                 }
1026                 undef $dh;
1027         }
1028         for my $ibx (@{$self->{ibx_list}}) {
1029                 # create symlinks for multi-pack-index
1030                 $git_midx += symlink_packs($ibx, $pd);
1031                 # add new lines to our alternates file
1032                 my $line = $ibx->git->{git_dir} . "/objects\n";
1033                 chomp(my $d = $line);
1034                 if (my @st = stat($d)) {
1035                         next if $seen{"$st[0]\0$st[1]"}++;
1036                 } else {
1037                         warn "W: stat($d) failed (from $ibx->{inboxdir}): $!\n";
1038                         next if $opt->{-idx_gc};
1039                 }
1040                 push @new, $line;
1041         }
1042         if (scalar @new) {
1043                 push @old, @new;
1044                 my $o = \@old;
1045                 PublicInbox::V2Writable::write_alternates($info_dir, $mode, $o);
1046         }
1047         $git_midx and $self->with_umask(sub {
1048                 my @cmd = ('multi-pack-index');
1049                 push @cmd, '--no-progress' if ($opt->{quiet}//0) > 1;
1050                 system('git', "--git-dir=$ALL", @cmd, 'write');
1051                 # ignore errors, fairly new command, may not exist
1052         });
1053         $self->parallel_init($self->{indexlevel});
1054         $self->with_umask(\&_idx_init, $self, $opt);
1055         $self->{oidx}->begin_lazy;
1056         $self->{oidx}->eidx_prep;
1057         $self->{midx}->create_xdb if @new;
1058 }
1059
1060 sub _watch_commit { # PublicInbox::DS::add_timer callback
1061         my ($self) = @_;
1062         delete $self->{-commit_timer};
1063         eidxq_process($self, $self->{-watch_sync});
1064         eidxq_release($self);
1065         my $fmt = delete $self->{-watch_sync}->{-regen_fmt};
1066         reindex_checkpoint($self, $self->{-watch_sync});
1067         $self->{-watch_sync}->{-regen_fmt} = $fmt;
1068
1069         # call event_step => done unless commit_timer is armed
1070         PublicInbox::DS::requeue($self);
1071 }
1072
1073 sub on_inbox_unlock { # called by PublicInbox::InboxIdle
1074         my ($self, $ibx) = @_;
1075         my $opt = $self->{-watch_sync}->{-opt};
1076         my $pr = $opt->{-progress};
1077         my $ekey = $ibx->eidx_key;
1078         local $0 = "sync $ekey";
1079         $pr->("indexing $ekey\n") if $pr;
1080         $self->idx_init($opt);
1081         sync_inbox($self, $self->{-watch_sync}, $ibx);
1082         $self->{-commit_timer} //= add_timer($opt->{'commit-interval'} // 10,
1083                                         \&_watch_commit, $self);
1084 }
1085
1086 sub eidx_reload { # -extindex --watch SIGHUP handler
1087         my ($self, $idler) = @_;
1088         if ($self->{cfg}) {
1089                 my $pr = $self->{-watch_sync}->{-opt}->{-progress};
1090                 $pr->('reloading ...') if $pr;
1091                 delete $self->{-resync_queue};
1092                 @{$self->{ibx_list}} = ();
1093                 %{$self->{ibx_map}} = ();
1094                 delete $self->{-watch_sync}->{id2pos};
1095                 my $cfg = PublicInbox::Config->new;
1096                 attach_config($self, $cfg);
1097                 $idler->refresh($cfg);
1098                 $pr->(" done\n") if $pr;
1099         } else {
1100                 warn "reload not supported without --all\n";
1101         }
1102 }
1103
1104 sub eidx_resync_start ($) { # -extindex --watch SIGUSR1 handler
1105         my ($self) = @_;
1106         $self->{-resync_queue} //= [ @{$self->{ibx_list}} ];
1107         PublicInbox::DS::requeue($self); # trigger our ->event_step
1108 }
1109
1110 sub event_step { # PublicInbox::DS::requeue callback
1111         my ($self) = @_;
1112         if (my $resync_queue = $self->{-resync_queue}) {
1113                 if (my $ibx = shift(@$resync_queue)) {
1114                         on_inbox_unlock($self, $ibx);
1115                         PublicInbox::DS::requeue($self);
1116                 } else {
1117                         delete $self->{-resync_queue};
1118                         _watch_commit($self);
1119                 }
1120         } else {
1121                 done($self) unless $self->{-commit_timer};
1122         }
1123 }
1124
1125 sub eidx_watch { # public-inbox-extindex --watch main loop
1126         my ($self, $opt) = @_;
1127         local %SIG = %SIG;
1128         for my $sig (qw(HUP USR1 TSTP QUIT INT TERM)) {
1129                 $SIG{$sig} = sub { warn "SIG$sig ignored while scanning\n" };
1130         }
1131         require PublicInbox::InboxIdle;
1132         require PublicInbox::DS;
1133         require PublicInbox::Syscall;
1134         require PublicInbox::Sigfd;
1135         my $idler = PublicInbox::InboxIdle->new($self->{cfg});
1136         if (!$self->{cfg}) {
1137                 $idler->watch_inbox($_) for @{$self->{ibx_list}};
1138         }
1139         $_->subscribe_unlock(__PACKAGE__, $self) for @{$self->{ibx_list}};
1140         my $pr = $opt->{-progress};
1141         $pr->("performing initial scan ...\n") if $pr;
1142         my $sync = eidx_sync($self, $opt); # initial sync
1143         return if $sync->{quit};
1144         my $oldset = PublicInbox::DS::block_signals();
1145         local $self->{current_info} = '';
1146         my $cb = $SIG{__WARN__} || \&CORE::warn;
1147         local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) };
1148         my $sig = {
1149                 HUP => sub { eidx_reload($self, $idler) },
1150                 USR1 => sub { eidx_resync_start($self) },
1151                 TSTP => sub { kill('STOP', $$) },
1152         };
1153         my $quit = PublicInbox::SearchIdx::quit_cb($sync);
1154         $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit;
1155         my $sigfd = PublicInbox::Sigfd->new($sig,
1156                                         $PublicInbox::Syscall::SFD_NONBLOCK);
1157         %SIG = (%SIG, %$sig) if !$sigfd;
1158         local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock
1159         if (!$sigfd) {
1160                 # wake up every second to accept signals if we don't
1161                 # have signalfd or IO::KQueue:
1162                 PublicInbox::DS::sig_setmask($oldset);
1163                 PublicInbox::DS->SetLoopTimeout(1000);
1164         }
1165         PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} });
1166         $pr->("initial scan complete, entering event loop\n") if $pr;
1167         PublicInbox::DS->EventLoop; # calls InboxIdle->event_step
1168         done($self);
1169 }
1170
1171 no warnings 'once';
1172 *done = \&PublicInbox::V2Writable::done;
1173 *with_umask = \&PublicInbox::InboxWritable::with_umask;
1174 *parallel_init = \&PublicInbox::V2Writable::parallel_init;
1175 *nproc_shards = \&PublicInbox::V2Writable::nproc_shards;
1176 *sync_prepare = \&PublicInbox::V2Writable::sync_prepare;
1177 *index_todo = \&PublicInbox::V2Writable::index_todo;
1178 *count_shards = \&PublicInbox::V2Writable::count_shards;
1179 *atfork_child = \&PublicInbox::V2Writable::atfork_child;
1180 *idx_shard = \&PublicInbox::V2Writable::idx_shard;
1181 *reindex_checkpoint = \&PublicInbox::V2Writable::reindex_checkpoint;
1182 *checkpoint = \&PublicInbox::V2Writable::checkpoint;
1183
1184 1;