]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/SearchIdx.pm
search: support "patchid:" prefix (git patch-id --stable)
[public-inbox.git] / lib / PublicInbox / SearchIdx.pm
1 # Copyright (C) all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3 # based on notmuch, but with no concept of folders, files
4 #
5 # Indexes mail with Xapian and our (SQLite-based) ::Msgmap for use
6 # with the web and NNTP interfaces.  This index maintains thread
7 # relationships for use by PublicInbox::SearchThread.
8 # This writes to the search index.
9 package PublicInbox::SearchIdx;
10 use strict;
11 use v5.10.1;
12 use parent qw(PublicInbox::Search PublicInbox::Lock Exporter);
13 use PublicInbox::Eml;
14 use PublicInbox::Search qw(xap_terms);
15 use PublicInbox::InboxWritable;
16 use PublicInbox::MID qw(mids_for_index mids);
17 use PublicInbox::MsgIter;
18 use PublicInbox::IdxStack;
19 use Carp qw(croak carp);
20 use POSIX qw(strftime);
21 use Fcntl qw(SEEK_SET);
22 use Time::Local qw(timegm);
23 use PublicInbox::OverIdx;
24 use PublicInbox::Spawn qw(spawn);
25 use PublicInbox::Git qw(git_unquote);
26 use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp);
27 use PublicInbox::Address;
28 use Config;
29 our @EXPORT_OK = qw(log2stack is_ancestor check_size prepare_stack
30         index_text term_generator add_val is_bad_blob);
31 my $X = \%PublicInbox::Search::X;
32 our ($DB_CREATE_OR_OPEN, $DB_OPEN);
33 our $DB_NO_SYNC = 0;
34 our $DB_DANGEROUS = 0;
35 our $BATCH_BYTES = $ENV{XAPIAN_FLUSH_THRESHOLD} ? 0x7fffffff :
36         # assume a typical 64-bit system has 8x more RAM than a
37         # typical 32-bit system:
38         (($Config{ptrsize} >= 8 ? 8192 : 1024) * 1024);
39
40 use constant DEBUG => !!$ENV{DEBUG};
41
42 my $xapianlevels = qr/\A(?:full|medium)\z/;
43 my $hex = '[a-f0-9]';
44 my $OID = $hex .'{40,}';
45 my @VMD_MAP = (kw => 'K', L => 'L');
46 our $INDEXLEVELS = qr/\A(?:full|medium|basic)\z/;
47
48 sub new {
49         my ($class, $ibx, $creat, $shard) = @_;
50         ref $ibx or die "BUG: expected PublicInbox::Inbox object: $ibx";
51         my $inboxdir = $ibx->{inboxdir};
52         my $version = $ibx->version;
53         my $indexlevel = 'full';
54         my $altid = $ibx->{altid};
55         if ($altid) {
56                 require PublicInbox::AltId;
57                 $altid = [ map { PublicInbox::AltId->new($ibx, $_); } @$altid ];
58         }
59         if ($ibx->{indexlevel}) {
60                 if ($ibx->{indexlevel} =~ $INDEXLEVELS) {
61                         $indexlevel = $ibx->{indexlevel};
62                 } else {
63                         die("Invalid indexlevel $ibx->{indexlevel}\n");
64                 }
65         }
66         $ibx = PublicInbox::InboxWritable->new($ibx);
67         my $self = PublicInbox::Search->new($ibx);
68         bless $self, $class;
69         $self->{ibx} = $ibx;
70         $self->{-altid} = $altid;
71         $self->{indexlevel} = $indexlevel;
72         $self->{-set_indexlevel_once} = 1 if $indexlevel eq 'medium';
73         if ($ibx->{-skip_docdata}) {
74                 $self->{-set_skip_docdata_once} = 1;
75                 $self->{-skip_docdata} = 1;
76         }
77         if ($version == 1) {
78                 $self->{lock_path} = "$inboxdir/ssoma.lock";
79                 my $dir = $self->xdir;
80                 $self->{oidx} = PublicInbox::OverIdx->new("$dir/over.sqlite3");
81                 $self->{oidx}->{-no_fsync} = 1 if $ibx->{-no_fsync};
82         } elsif ($version == 2) {
83                 defined $shard or die "shard is required for v2\n";
84                 # shard is a number
85                 $self->{shard} = $shard;
86                 $self->{lock_path} = undef;
87         } else {
88                 die "unsupported inbox version=$version\n";
89         }
90         $self->{creat} = ($creat || 0) == 1;
91         $self;
92 }
93
94 sub need_xapian ($) { $_[0]->{indexlevel} =~ $xapianlevels }
95
96 sub idx_release {
97         my ($self, $wake) = @_;
98         if (need_xapian($self)) {
99                 my $xdb = delete $self->{xdb} or croak '{xdb} not acquired';
100                 $xdb->close;
101         }
102         $self->lock_release($wake) if $self->{creat};
103         undef;
104 }
105
106 sub load_xapian_writable () {
107         return 1 if $X->{WritableDatabase};
108         PublicInbox::Search::load_xapian() or die "failed to load Xapian: $@\n";
109         my $xap = $PublicInbox::Search::Xap;
110         for (qw(Document TermGenerator WritableDatabase)) {
111                 $X->{$_} = $xap.'::'.$_;
112         }
113         eval 'require '.$X->{WritableDatabase} or die;
114         *sortable_serialise = $xap.'::sortable_serialise';
115         $DB_CREATE_OR_OPEN = eval($xap.'::DB_CREATE_OR_OPEN()');
116         $DB_OPEN = eval($xap.'::DB_OPEN()');
117         my $ver = (eval($xap.'::major_version()') << 16) |
118                 (eval($xap.'::minor_version()') << 8) |
119                 eval($xap.'::revision()');
120         if ($ver >= 0x10400) {
121                 $DB_NO_SYNC = 0x4;
122                 $DB_DANGEROUS = 0x10;
123         }
124         # Xapian v1.2.21..v1.2.24 were missing close-on-exec on OFD locks
125         $X->{CLOEXEC_UNSET} = 1 if $ver >= 0x010215 && $ver <= 0x010218;
126         1;
127 }
128
129 sub idx_acquire {
130         my ($self) = @_;
131         my $flag;
132         my $dir = $self->xdir;
133         if (need_xapian($self)) {
134                 croak 'already acquired' if $self->{xdb};
135                 load_xapian_writable();
136                 $flag = $self->{creat} ? $DB_CREATE_OR_OPEN : $DB_OPEN;
137         }
138         if ($self->{creat}) {
139                 require File::Path;
140                 $self->lock_acquire;
141
142                 # don't create empty Xapian directories if we don't need Xapian
143                 my $is_shard = defined($self->{shard});
144                 if (!-d $dir && (!$is_shard ||
145                                 ($is_shard && need_xapian($self)))) {
146                         File::Path::mkpath($dir);
147                         require PublicInbox::Syscall;
148                         PublicInbox::Syscall::nodatacow_dir($dir);
149                         $self->{-set_has_threadid_once} = 1;
150                         if (($self->{ibx} // $self->{eidx})->{-dangerous}) {
151                                 $flag |= $DB_DANGEROUS;
152                         }
153                 }
154         }
155         return unless defined $flag;
156         $flag |= $DB_NO_SYNC if ($self->{ibx} // $self->{eidx})->{-no_fsync};
157         my $xdb = eval { ($X->{WritableDatabase})->new($dir, $flag) };
158         croak "Failed opening $dir: $@" if $@;
159         $self->{xdb} = $xdb;
160 }
161
162 sub add_val ($$$) {
163         my ($doc, $col, $num) = @_;
164         $num = sortable_serialise($num);
165         $doc->add_value($col, $num);
166 }
167
168 sub term_generator ($) { # write-only
169         my ($self) = @_;
170
171         $self->{term_generator} //= do {
172                 my $tg = $X->{TermGenerator}->new;
173                 $tg->set_stemmer(PublicInbox::Search::stemmer($self));
174                 $tg;
175         }
176 }
177
178 sub index_phrase ($$$$) {
179         my ($self, $text, $wdf_inc, $prefix) = @_;
180
181         my $tg = term_generator($self);
182         $tg->index_text($text, $wdf_inc, $prefix);
183         $tg->increase_termpos;
184 }
185
186 sub index_text ($$$$) {
187         my ($self, $text, $wdf_inc, $prefix) = @_;
188
189         if ($self->{indexlevel} eq 'full') {
190                 index_phrase($self, $text, $wdf_inc, $prefix);
191         } else {
192                 my $tg = term_generator($self);
193                 $tg->index_text_without_positions($text, $wdf_inc, $prefix);
194         }
195 }
196
197 sub index_headers ($$) {
198         my ($self, $smsg) = @_;
199         my @x = (from => 'A', to => 'XTO', cc => 'XCC'); # A: Author
200         while (my ($field, $pfx) = splice(@x, 0, 2)) {
201                 my $val = $smsg->{$field};
202                 next if $val eq '';
203                 # include "(comments)" after the address, too, so not using
204                 # PublicInbox::Address::names or pairs
205                 index_text($self, $val, 1, $pfx);
206
207                 # we need positional info for email addresses since they
208                 # can be considered phrases
209                 if ($self->{indexlevel} eq 'medium') {
210                         for my $addr (PublicInbox::Address::emails($val)) {
211                                 index_phrase($self, $addr, 1, $pfx);
212                         }
213                 }
214         }
215         @x = (subject => 'S');
216         while (my ($field, $pfx) = splice(@x, 0, 2)) {
217                 my $val = $smsg->{$field};
218                 index_text($self, $val, 1, $pfx) if $val ne '';
219         }
220 }
221
222 sub index_diff_inc ($$$$) {
223         my ($self, $text, $pfx, $xnq) = @_;
224         if (@$xnq) {
225                 index_text($self, join("\n", @$xnq), 1, 'XNQ');
226                 @$xnq = ();
227         }
228         if ($pfx eq 'XDFN') {
229                 index_phrase($self, $text, 1, $pfx);
230         } else {
231                 index_text($self, $text, 1, $pfx);
232         }
233 }
234
235 sub index_old_diff_fn {
236         my ($self, $seen, $fa, $fb, $xnq) = @_;
237
238         # no renames or space support for traditional diffs,
239         # find the number of leading common paths to strip:
240         my @fa = split(m'/', $fa);
241         my @fb = split(m'/', $fb);
242         while (scalar(@fa) && scalar(@fb)) {
243                 $fa = join('/', @fa);
244                 $fb = join('/', @fb);
245                 if ($fa eq $fb) {
246                         unless ($seen->{$fa}++) {
247                                 index_diff_inc($self, $fa, 'XDFN', $xnq);
248                         }
249                         return 1;
250                 }
251                 shift @fa;
252                 shift @fb;
253         }
254         0;
255 }
256
257 sub index_diff ($$$) {
258         my ($self, $txt, $doc) = @_;
259         my %seen;
260         my $in_diff;
261         my @xnq;
262         my $xnq = \@xnq;
263         foreach (split(/\n/, $txt)) {
264                 if ($in_diff && s/^ //) { # diff context
265                         index_diff_inc($self, $_, 'XDFCTX', $xnq);
266                 } elsif (/^-- $/) { # email signature begins
267                         $in_diff = undef;
268                 } elsif (m!^diff --git "?[^/]+/.+ "?[^/]+/.+\z!) {
269                         # wait until "---" and "+++" to capture filenames
270                         $in_diff = 1;
271                         push @xnq, $_;
272                 # traditional diff:
273                 } elsif (m/^diff -(.+) (\S+) (\S+)$/) {
274                         my ($opt, $fa, $fb) = ($1, $2, $3);
275                         push @xnq, $_;
276                         # only support unified:
277                         next unless $opt =~ /[uU]/;
278                         $in_diff = index_old_diff_fn($self, \%seen, $fa, $fb,
279                                                         $xnq);
280                 } elsif (m!^--- ("?[^/]+/.+)!) {
281                         my $fn = $1;
282                         $fn = (split(m'/', git_unquote($fn), 2))[1];
283                         $seen{$fn}++ or index_diff_inc($self, $fn, 'XDFN', $xnq);
284                         $in_diff = 1;
285                 } elsif (m!^\+\+\+ ("?[^/]+/.+)!)  {
286                         my $fn = $1;
287                         $fn = (split(m'/', git_unquote($fn), 2))[1];
288                         $seen{$fn}++ or index_diff_inc($self, $fn, 'XDFN', $xnq);
289                         $in_diff = 1;
290                 } elsif (/^--- (\S+)/) {
291                         $in_diff = $1;
292                         push @xnq, $_;
293                 } elsif (defined $in_diff && /^\+\+\+ (\S+)/) {
294                         $in_diff = index_old_diff_fn($self, \%seen, $in_diff,
295                                                         $1, $xnq);
296                 } elsif ($in_diff && s/^\+//) { # diff added
297                         index_diff_inc($self, $_, 'XDFB', $xnq);
298                 } elsif ($in_diff && s/^-//) { # diff removed
299                         index_diff_inc($self, $_, 'XDFA', $xnq);
300                 } elsif (m!^index ([a-f0-9]+)\.\.([a-f0-9]+)!) {
301                         my ($ba, $bb) = ($1, $2);
302                         index_git_blob_id($doc, 'XDFPRE', $ba);
303                         index_git_blob_id($doc, 'XDFPOST', $bb);
304                         $in_diff = 1;
305                 } elsif (/^@@ (?:\S+) (?:\S+) @@\s*$/) {
306                         # traditional diff w/o -p
307                 } elsif (/^@@ (?:\S+) (?:\S+) @@\s*(\S+.*)$/) {
308                         # hunk header context
309                         index_diff_inc($self, $1, 'XDFHH', $xnq);
310                 # ignore the following lines:
311                 } elsif (/^(?:dis)similarity index/ ||
312                                 /^(?:old|new) mode/ ||
313                                 /^(?:deleted|new) file mode/ ||
314                                 /^(?:copy|rename) (?:from|to) / ||
315                                 /^(?:dis)?similarity index / ||
316                                 /^\\ No newline at end of file/ ||
317                                 /^Binary files .* differ/) {
318                         push @xnq, $_;
319                 } elsif ($_ eq '') {
320                         # possible to be in diff context, some mail may be
321                         # stripped by MUA or even GNU diff(1).  "git apply"
322                         # treats a bare "\n" as diff context, too
323                 } else {
324                         push @xnq, $_;
325                         warn "non-diff line: $_\n" if DEBUG && $_ ne '';
326                         $in_diff = undef;
327                 }
328         }
329
330         index_text($self, join("\n", @xnq), 1, 'XNQ');
331 }
332
333 sub index_xapian { # msg_iter callback
334         my $part = $_[0]->[0]; # ignore $depth and $idx
335         my ($self, $doc) = @{$_[1]};
336         my $ct = $part->content_type || 'text/plain';
337         my $fn = $part->filename;
338         if (defined $fn && $fn ne '') {
339                 index_phrase($self, $fn, 1, 'XFN');
340         }
341         if ($part->{is_submsg}) {
342                 my $mids = mids_for_index($part);
343                 index_ids($self, $doc, $part, $mids);
344                 my $smsg = bless {}, 'PublicInbox::Smsg';
345                 $smsg->populate($part);
346                 index_headers($self, $smsg);
347         }
348
349         my ($s, undef) = msg_part_text($part, $ct);
350         defined $s or return;
351         $_[0]->[0] = $part = undef; # free memory
352
353         if ($s =~ /^(?:diff|---|\+\+\+) /ms) {
354                 open(my $fh, '+>:utf8', undef) or die "open: $!";
355                 open(my $eh, '+>', undef) or die "open: $!";
356                 $fh->autoflush(1);
357                 print $fh $s or die "print: $!";
358                 sysseek($fh, 0, SEEK_SET) or die "sysseek: $!";
359                 my $id = ($self->{ibx} // $self->{eidx})->git->qx(
360                                                 [qw(patch-id --stable)],
361                                                 {}, { 0 => $fh, 2 => $eh });
362                 $id =~ /\A([a-f0-9]{40,})/ and $doc->add_term('XDFID'.$1);
363                 seek($eh, 0, SEEK_SET) or die "seek: $!";
364                 while (<$eh>) { warn $_ }
365         }
366
367         # split off quoted and unquoted blocks:
368         my @sections = PublicInbox::MsgIter::split_quotes($s);
369         undef $s; # free memory
370         for my $txt (@sections) {
371                 if ($txt =~ /\A>/) {
372                         index_text($self, $txt, 0, 'XQUOT');
373                 } else {
374                         # does it look like a diff?
375                         if ($txt =~ /^(?:diff|---|\+\+\+) /ms) {
376                                 index_diff($self, $txt, $doc);
377                         } else {
378                                 index_text($self, $txt, 1, 'XNQ');
379                         }
380                 }
381                 undef $txt; # free memory
382         }
383 }
384
385 sub index_list_id ($$$) {
386         my ($self, $doc, $hdr) = @_;
387         for my $l ($hdr->header_raw('List-Id')) {
388                 $l =~ /<([^>]+)>/ or next;
389                 my $lid = lc $1;
390                 $doc->add_boolean_term('G' . $lid);
391                 index_phrase($self, $lid, 1, 'XL'); # probabilistic
392         }
393 }
394
395 sub index_ids ($$$$) {
396         my ($self, $doc, $hdr, $mids) = @_;
397         for my $mid (@$mids) {
398                 index_phrase($self, $mid, 1, 'XM');
399
400                 # because too many Message-IDs are prefixed with
401                 # "Pine.LNX."...
402                 if ($mid =~ /\w{12,}/) {
403                         my @long = ($mid =~ /(\w{3,}+)/g);
404                         index_phrase($self, join(' ', @long), 1, 'XM');
405                 }
406         }
407         $doc->add_boolean_term('Q' . $_) for @$mids;
408         index_list_id($self, $doc, $hdr);
409 }
410
411 sub eml2doc ($$$;$) {
412         my ($self, $eml, $smsg, $mids) = @_;
413         $mids //= mids_for_index($eml);
414         my $doc = $X->{Document}->new;
415         add_val($doc, PublicInbox::Search::TS(), $smsg->{ts});
416         my @ds = gmtime($smsg->{ds});
417         my $yyyymmdd = strftime('%Y%m%d', @ds);
418         add_val($doc, PublicInbox::Search::YYYYMMDD(), $yyyymmdd);
419         my $dt = strftime('%Y%m%d%H%M%S', @ds);
420         add_val($doc, PublicInbox::Search::DT(), $dt);
421         add_val($doc, PublicInbox::Search::BYTES(), $smsg->{bytes});
422         add_val($doc, PublicInbox::Search::UID(), $smsg->{num});
423         add_val($doc, PublicInbox::Search::THREADID, $smsg->{tid});
424
425         my $tg = term_generator($self);
426         $tg->set_document($doc);
427         index_headers($self, $smsg);
428
429         if (defined(my $eidx_key = $smsg->{eidx_key})) {
430                 $doc->add_boolean_term('O'.$eidx_key) if $eidx_key ne '.';
431         }
432         msg_iter($eml, \&index_xapian, [ $self, $doc ]);
433         index_ids($self, $doc, $eml, $mids);
434
435         # by default, we maintain compatibility with v1.5.0 and earlier
436         # by writing to docdata.glass, users who never exect to downgrade can
437         # use --skip-docdata
438         if (!$self->{-skip_docdata}) {
439                 # WWW doesn't need {to} or {cc}, only NNTP
440                 $smsg->{to} = $smsg->{cc} = '';
441                 $smsg->parse_references($eml, $mids);
442                 my $data = $smsg->to_doc_data;
443                 $doc->set_data($data);
444         }
445
446         if (my $altid = $self->{-altid}) {
447                 foreach my $alt (@$altid) {
448                         my $pfx = $alt->{xprefix};
449                         foreach my $mid (@$mids) {
450                                 my $id = $alt->mid2alt($mid);
451                                 next unless defined $id;
452                                 $doc->add_boolean_term($pfx . $id);
453                         }
454                 }
455         }
456         $doc;
457 }
458
459 sub add_xapian ($$$$) {
460         my ($self, $eml, $smsg, $mids) = @_;
461         begin_txn_lazy($self);
462         my $merge_vmd = delete $smsg->{-merge_vmd};
463         my $doc = eml2doc($self, $eml, $smsg, $mids);
464         if (my $old = $merge_vmd ? _get_doc($self, $smsg->{num}) : undef) {
465                 my @x = @VMD_MAP;
466                 while (my ($field, $pfx) = splice(@x, 0, 2)) {
467                         for my $term (xap_terms($pfx, $old)) {
468                                 $doc->add_boolean_term($pfx.$term);
469                         }
470                 }
471         }
472         $self->{xdb}->replace_document($smsg->{num}, $doc);
473 }
474
475 sub _msgmap_init ($) {
476         my ($self) = @_;
477         die "BUG: _msgmap_init is only for v1\n" if $self->{ibx}->version != 1;
478         $self->{mm} //= do {
479                 require PublicInbox::Msgmap;
480                 PublicInbox::Msgmap->new_file($self->{ibx}, 1);
481         };
482 }
483
484 sub add_message {
485         # mime = PublicInbox::Eml or Email::MIME object
486         my ($self, $mime, $smsg, $sync) = @_;
487         begin_txn_lazy($self);
488         my $mids = mids_for_index($mime);
489         $smsg //= bless { blob => '' }, 'PublicInbox::Smsg'; # test-only compat
490         $smsg->{mid} //= $mids->[0]; # v1 compatibility
491         $smsg->{num} //= do { # v1
492                 _msgmap_init($self);
493                 index_mm($self, $mime, $smsg->{blob}, $sync);
494         };
495
496         # v1 and tests only:
497         $smsg->populate($mime, $sync);
498         $smsg->{bytes} //= length($mime->as_string);
499
500         eval {
501                 # order matters, overview stores every possible piece of
502                 # data in doc_data (deflated).  Xapian only stores a subset
503                 # of the fields which exist in over.sqlite3.  We may stop
504                 # storing doc_data in Xapian sometime after we get multi-inbox
505                 # search working.
506                 if (my $oidx = $self->{oidx}) { # v1 only
507                         $oidx->add_overview($mime, $smsg);
508                 }
509                 if (need_xapian($self)) {
510                         add_xapian($self, $mime, $smsg, $mids);
511                 }
512         };
513
514         if ($@) {
515                 warn "failed to index message <".join('> <',@$mids).">: $@\n";
516                 return undef;
517         }
518         $smsg->{num};
519 }
520
521 sub _get_doc ($$) {
522         my ($self, $docid) = @_;
523         my $doc = eval { $self->{xdb}->get_document($docid) };
524         $doc // do {
525                 warn "E: $@\n" if $@;
526                 warn "E: #$docid missing in Xapian\n";
527                 undef;
528         }
529 }
530
531 sub add_eidx_info {
532         my ($self, $docid, $eidx_key, $eml) = @_;
533         begin_txn_lazy($self);
534         my $doc = _get_doc($self, $docid) or return;
535         term_generator($self)->set_document($doc);
536
537         # '.' is special for lei_store
538         $doc->add_boolean_term('O'.$eidx_key) if $eidx_key ne '.';
539
540         index_list_id($self, $doc, $eml);
541         $self->{xdb}->replace_document($docid, $doc);
542 }
543
544 sub get_terms {
545         my ($self, $pfx, $docid) = @_;
546         begin_txn_lazy($self);
547         xap_terms($pfx, $self->{xdb}, $docid);
548 }
549
550 sub remove_eidx_info {
551         my ($self, $docid, $eidx_key, $eml) = @_;
552         begin_txn_lazy($self);
553         my $doc = _get_doc($self, $docid) or return;
554         eval { $doc->remove_term('O'.$eidx_key) };
555         warn "W: ->remove_term O$eidx_key: $@\n" if $@;
556         for my $l ($eml ? $eml->header_raw('List-Id') : ()) {
557                 $l =~ /<([^>]+)>/ or next;
558                 my $lid = lc $1;
559                 eval { $doc->remove_term('G' . $lid) };
560                 warn "W: ->remove_term G$lid: $@\n" if $@;
561
562                 # nb: we don't remove the XL probabilistic terms
563                 # since terms may overlap if cross-posted.
564                 #
565                 # IOW, a message which has both <foo.example.com>
566                 # and <bar.example.com> would have overlapping
567                 # "XLexample" and "XLcom" as terms and which we
568                 # wouldn't know if they're safe to remove if we just
569                 # unindex <foo.example.com> while preserving
570                 # <bar.example.com>.
571                 #
572                 # In any case, this entire sub is will likely never
573                 # be needed and users using the "l:" prefix are probably
574                 # rarer.
575         }
576         $self->{xdb}->replace_document($docid, $doc);
577 }
578
579 sub set_vmd {
580         my ($self, $docid, $vmd) = @_;
581         begin_txn_lazy($self);
582         my $doc = _get_doc($self, $docid) or return;
583         my ($end, @rm, @add);
584         my @x = @VMD_MAP;
585         while (my ($field, $pfx) = splice(@x, 0, 2)) {
586                 my $set = $vmd->{$field} // next;
587                 my %keep = map { $_ => 1 } @$set;
588                 my %add = %keep;
589                 $end //= $doc->termlist_end;
590                 for (my $cur = $doc->termlist_begin; $cur != $end; $cur++) {
591                         $cur->skip_to($pfx);
592                         last if $cur == $end;
593                         my $v = $cur->get_termname;
594                         $v =~ s/\A$pfx//s or next;
595                         $keep{$v} ? delete($add{$v}) : push(@rm, $pfx.$v);
596                 }
597                 push(@add, map { $pfx.$_ } keys %add);
598         }
599         return unless scalar(@rm) || scalar(@add);
600         $doc->remove_term($_) for @rm;
601         $doc->add_boolean_term($_) for @add;
602         $self->{xdb}->replace_document($docid, $doc);
603 }
604
605 sub apply_vmd_mod ($$) {
606         my ($doc, $vmd_mod) = @_;
607         my $updated = 0;
608         my @x = @VMD_MAP;
609         while (my ($field, $pfx) = splice(@x, 0, 2)) {
610                 # field: "L" or "kw"
611                 for my $val (@{$vmd_mod->{"-$field"} // []}) {
612                         eval {
613                                 $doc->remove_term($pfx . $val);
614                                 ++$updated;
615                         };
616                 }
617                 for my $val (@{$vmd_mod->{"+$field"} // []}) {
618                         $doc->add_boolean_term($pfx . $val);
619                         ++$updated;
620                 }
621         }
622         $updated;
623 }
624
625 sub add_vmd {
626         my ($self, $docid, $vmd) = @_;
627         begin_txn_lazy($self);
628         my $doc = _get_doc($self, $docid) or return;
629         my @x = @VMD_MAP;
630         my $updated = 0;
631         while (my ($field, $pfx) = splice(@x, 0, 2)) {
632                 my $add = $vmd->{$field} // next;
633                 $doc->add_boolean_term($pfx . $_) for @$add;
634                 $updated += scalar(@$add);
635         }
636         $updated += apply_vmd_mod($doc, $vmd);
637         $self->{xdb}->replace_document($docid, $doc) if $updated;
638 }
639
640 sub remove_vmd {
641         my ($self, $docid, $vmd) = @_;
642         begin_txn_lazy($self);
643         my $doc = _get_doc($self, $docid) or return;
644         my $replace;
645         my @x = @VMD_MAP;
646         while (my ($field, $pfx) = splice(@x, 0, 2)) {
647                 my $rm = $vmd->{$field} // next;
648                 for (@$rm) {
649                         eval {
650                                 $doc->remove_term($pfx . $_);
651                                 $replace = 1;
652                         };
653                 }
654         }
655         $self->{xdb}->replace_document($docid, $doc) if $replace;
656 }
657
658 sub update_vmd {
659         my ($self, $docid, $vmd_mod) = @_;
660         begin_txn_lazy($self);
661         my $doc = _get_doc($self, $docid) or return;
662         my $updated = apply_vmd_mod($doc, $vmd_mod);
663         $self->{xdb}->replace_document($docid, $doc) if $updated;
664         $updated;
665 }
666
667 sub xdb_remove {
668         my ($self, @docids) = @_;
669         begin_txn_lazy($self);
670         my $xdb = $self->{xdb} // die 'BUG: missing {xdb}';
671         for my $docid (@docids) {
672                 eval { $xdb->delete_document($docid) };
673                 warn "E: #$docid not in in Xapian? $@\n" if $@;
674         }
675 }
676
677 sub xdb_remove_quiet {
678         my ($self, $docid) = @_;
679         begin_txn_lazy($self);
680         my $xdb = $self->{xdb} // die 'BUG: missing {xdb}';
681         eval { $xdb->delete_document($docid) };
682         ++$self->{-quiet_rm} unless $@;
683 }
684
685 sub nr_quiet_rm { delete($_[0]->{-quiet_rm}) // 0 }
686
687 sub index_git_blob_id {
688         my ($doc, $pfx, $objid) = @_;
689
690         my $len = length($objid);
691         for (my $len = length($objid); $len >= 7; ) {
692                 $doc->add_term($pfx.$objid);
693                 $objid = substr($objid, 0, --$len);
694         }
695 }
696
697 # v1 only
698 sub unindex_eml {
699         my ($self, $oid, $eml) = @_;
700         my $mids = mids($eml);
701         my $nr = 0;
702         my %tmp;
703         for my $mid (@$mids) {
704                 my @removed = $self->{oidx}->remove_oid($oid, $mid);
705                 $nr += scalar @removed;
706                 $tmp{$_}++ for @removed;
707         }
708         if (!$nr) {
709                 my $m = join('> <', @$mids);
710                 warn "W: <$m> missing for removal from overview\n";
711         }
712         while (my ($num, $nr) = each %tmp) {
713                 warn "BUG: $num appears >1 times ($nr) for $oid\n" if $nr != 1;
714         }
715         if ($nr) {
716                 $self->{mm}->num_delete($_) for (keys %tmp);
717         } else { # just in case msgmap and over.sqlite3 become desynched:
718                 $self->{mm}->mid_delete($mids->[0]);
719         }
720         xdb_remove($self, keys %tmp) if need_xapian($self);
721 }
722
723 sub index_mm {
724         my ($self, $mime, $oid, $sync) = @_;
725         my $mids = mids($mime);
726         my $mm = $self->{mm};
727         if ($sync->{reindex}) {
728                 my $oidx = $self->{oidx};
729                 for my $mid (@$mids) {
730                         my ($num, undef) = $oidx->num_mid0_for_oid($oid, $mid);
731                         return $num if defined $num;
732                 }
733                 $mm->num_for($mids->[0]) // $mm->mid_insert($mids->[0]);
734         } else {
735                 # fallback to num_for since filters like RubyLang set the number
736                 $mm->mid_insert($mids->[0]) // $mm->num_for($mids->[0]);
737         }
738 }
739
740 sub is_bad_blob ($$$$) {
741         my ($oid, $type, $size, $expect_oid) = @_;
742         if ($type ne 'blob') {
743                 carp "W: $expect_oid is not a blob (type=$type)";
744                 return 1;
745         }
746         croak "BUG: $oid != $expect_oid" if $oid ne $expect_oid;
747         $size == 0 ? 1 : 0; # size == 0 means purged
748 }
749
750 sub index_both { # git->cat_async callback
751         my ($bref, $oid, $type, $size, $sync) = @_;
752         return if is_bad_blob($oid, $type, $size, $sync->{oid});
753         my ($nr, $max) = @$sync{qw(nr max)};
754         ++$$nr;
755         $$max -= $size;
756         my $smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
757         $smsg->set_bytes($$bref, $size);
758         my $self = $sync->{sidx};
759         local $self->{current_info} = "$self->{current_info}: $oid";
760         my $eml = PublicInbox::Eml->new($bref);
761         $smsg->{num} = index_mm($self, $eml, $oid, $sync) or
762                 die "E: could not generate NNTP article number for $oid";
763         add_message($self, $eml, $smsg, $sync);
764         ++$self->{nidx};
765         my $cur_cmt = $sync->{cur_cmt} // die 'BUG: {cur_cmt} missing';
766         ${$sync->{latest_cmt}} = $cur_cmt;
767 }
768
769 sub unindex_both { # git->cat_async callback
770         my ($bref, $oid, $type, $size, $sync) = @_;
771         return if is_bad_blob($oid, $type, $size, $sync->{oid});
772         my $self = $sync->{sidx};
773         local $self->{current_info} = "$self->{current_info}: $oid";
774         unindex_eml($self, $oid, PublicInbox::Eml->new($bref));
775         # may be undef if leftover
776         if (defined(my $cur_cmt = $sync->{cur_cmt})) {
777                 ${$sync->{latest_cmt}} = $cur_cmt;
778         }
779         ++$self->{nidx};
780 }
781
782 sub with_umask {
783         my $self = shift;
784         ($self->{ibx} // $self->{eidx})->with_umask(@_);
785 }
786
787 # called by public-inbox-index
788 sub index_sync {
789         my ($self, $opt) = @_;
790         delete $self->{lock_path} if $opt->{-skip_lock};
791         $self->with_umask(\&_index_sync, $self, $opt);
792         if ($opt->{reindex} && !$opt->{quit} &&
793                         !grep(defined, @$opt{qw(since until)})) {
794                 my %again = %$opt;
795                 delete @again{qw(rethread reindex)};
796                 index_sync($self, \%again);
797                 $opt->{quit} = $again{quit}; # propagate to caller
798         }
799 }
800
801 sub check_size { # check_async cb for -index --max-size=...
802         my ($oid, $type, $size, $arg, $git) = @_;
803         (($type // '') eq 'blob') or die "E: bad $oid in $git->{git_dir}";
804         if ($size <= $arg->{max_size}) {
805                 $git->cat_async($oid, $arg->{index_oid}, $arg);
806         } else {
807                 warn "W: skipping $oid ($size > $arg->{max_size})\n";
808         }
809 }
810
811 sub v1_checkpoint ($$;$) {
812         my ($self, $sync, $stk) = @_;
813         $self->{ibx}->git->async_wait_all;
814
815         # $newest may be undef
816         my $newest = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
817         if (defined($newest)) {
818                 my $cur = $self->{mm}->last_commit;
819                 if (need_update($self, $sync, $cur, $newest)) {
820                         $self->{mm}->last_commit($newest);
821                 }
822         }
823         ${$sync->{max}} = $self->{batch_bytes};
824
825         $self->{mm}->{dbh}->commit;
826         eval { $self->{mm}->{dbh}->do('PRAGMA optimize') };
827         my $xdb = $self->{xdb};
828         if ($newest && $xdb) {
829                 my $cur = $xdb->get_metadata('last_commit');
830                 if (need_update($self, $sync, $cur, $newest)) {
831                         $xdb->set_metadata('last_commit', $newest);
832                 }
833         }
834         if ($stk) { # all done if $stk is passed
835                 # let SearchView know a full --reindex was done so it can
836                 # generate ->has_threadid-dependent links
837                 if ($xdb && $sync->{reindex} && !ref($sync->{reindex})) {
838                         my $n = $xdb->get_metadata('has_threadid');
839                         $xdb->set_metadata('has_threadid', '1') if $n ne '1';
840                 }
841                 $self->{oidx}->rethread_done($sync->{-opt}); # all done
842         }
843         commit_txn_lazy($self);
844         $sync->{ibx}->git->cleanup;
845         my $nr = ${$sync->{nr}};
846         idx_release($self, $nr);
847         # let another process do some work...
848         if (my $pr = $sync->{-opt}->{-progress}) {
849                 $pr->("indexed $nr/$sync->{ntodo}\n") if $nr;
850         }
851         if (!$stk && !$sync->{quit}) { # more to come
852                 begin_txn_lazy($self);
853                 $self->{mm}->{dbh}->begin_work;
854         }
855 }
856
857 # only for v1
858 sub process_stack {
859         my ($self, $sync, $stk) = @_;
860         my $git = $sync->{ibx}->git;
861         my $max = $self->{batch_bytes};
862         my $nr = 0;
863         $sync->{nr} = \$nr;
864         $sync->{max} = \$max;
865         $sync->{sidx} = $self;
866         $sync->{latest_cmt} = \(my $latest_cmt);
867
868         $self->{mm}->{dbh}->begin_work;
869         if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
870                 warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
871                 for my $oid (@leftovers) {
872                         last if $sync->{quit};
873                         $oid = unpack('H*', $oid);
874                         $git->cat_async($oid, \&unindex_both, $sync);
875                 }
876         }
877         if ($sync->{max_size} = $sync->{-opt}->{max_size}) {
878                 $sync->{index_oid} = \&index_both;
879         }
880         while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) {
881                 my $arg = { %$sync, cur_cmt => $cur_cmt, oid => $oid };
882                 last if $sync->{quit};
883                 if ($f eq 'm') {
884                         $arg->{autime} = $at;
885                         $arg->{cotime} = $ct;
886                         if ($sync->{max_size}) {
887                                 $git->check_async($oid, \&check_size, $arg);
888                         } else {
889                                 $git->cat_async($oid, \&index_both, $arg);
890                         }
891                         v1_checkpoint($self, $sync) if $max <= 0;
892                 } elsif ($f eq 'd') {
893                         $git->cat_async($oid, \&unindex_both, $arg);
894                 }
895         }
896         v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk);
897 }
898
899 sub log2stack ($$$) {
900         my ($sync, $git, $range) = @_;
901         my $D = $sync->{D}; # OID_BIN => NR (if reindexing, undef otherwise)
902         my ($add, $del);
903         if ($sync->{ibx}->version == 1) {
904                 my $path = $hex.'{2}/'.$hex.'{38}';
905                 $add = qr!\A:000000 100644 \S+ ($OID) A\t$path$!;
906                 $del = qr!\A:100644 000000 ($OID) \S+ D\t$path$!;
907         } else {
908                 $del = qr!\A:\d{6} 100644 $OID ($OID) [AM]\td$!;
909                 $add = qr!\A:\d{6} 100644 $OID ($OID) [AM]\tm$!;
910         }
911
912         # Count the new files so they can be added newest to oldest
913         # and still have numbers increasing from oldest to newest
914         my @cmd = qw(log --raw -r --pretty=tformat:%at-%ct-%H
915                         --no-notes --no-color --no-renames --no-abbrev);
916         for my $k (qw(since until)) {
917                 my $v = $sync->{-opt}->{$k} // next;
918                 next if !$sync->{-opt}->{reindex};
919                 push @cmd, "--$k=$v";
920         }
921         my $fh = $git->popen(@cmd, $range);
922         my ($at, $ct, $stk, $cmt, $l);
923         while (defined($l = <$fh>)) {
924                 return if $sync->{quit};
925                 if ($l =~ /\A([0-9]+)-([0-9]+)-($OID)$/o) {
926                         ($at, $ct, $cmt) = ($1 + 0, $2 + 0, $3);
927                         $stk //= PublicInbox::IdxStack->new($cmt);
928                 } elsif ($l =~ /$del/) {
929                         my $oid = $1;
930                         if ($D) { # reindex case
931                                 $D->{pack('H*', $oid)}++;
932                         } else { # non-reindex case:
933                                 $stk->push_rec('d', $at, $ct, $oid, $cmt);
934                         }
935                 } elsif ($l =~ /$add/) {
936                         my $oid = $1;
937                         if ($D) {
938                                 my $oid_bin = pack('H*', $oid);
939                                 my $nr = --$D->{$oid_bin};
940                                 delete($D->{$oid_bin}) if $nr <= 0;
941                                 # nr < 0 (-1) means it never existed
942                                 next if $nr >= 0;
943                         }
944                         $stk->push_rec('m', $at, $ct, $oid, $cmt);
945                 }
946         }
947         close $fh or die "git log failed: \$?=$?";
948         $stk //= PublicInbox::IdxStack->new;
949         $stk->read_prepare;
950 }
951
952 sub prepare_stack ($$) {
953         my ($sync, $range) = @_;
954         my $git = $sync->{ibx}->git;
955
956         if (index($range, '..') < 0) {
957                 # don't show annoying git errors to users who run -index
958                 # on empty inboxes
959                 $git->qx(qw(rev-parse -q --verify), "$range^0");
960                 return PublicInbox::IdxStack->new->read_prepare if $?;
961         }
962         $sync->{D} = $sync->{reindex} ? {} : undef; # OID_BIN => NR
963         log2stack($sync, $git, $range);
964 }
965
966 # --is-ancestor requires git 1.8.0+
967 sub is_ancestor ($$$) {
968         my ($git, $cur, $tip) = @_;
969         return 0 unless $git->check($cur);
970         my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
971                 qw(merge-base --is-ancestor), $cur, $tip ];
972         my $pid = spawn($cmd);
973         waitpid($pid, 0) == $pid or die join(' ', @$cmd) .' did not finish';
974         $? == 0;
975 }
976
977 sub need_update ($$$$) {
978         my ($self, $sync, $cur, $new) = @_;
979         my $git = $self->{ibx}->git;
980         $cur //= ''; # XS Search::Xapian ->get_metadata doesn't give undef
981
982         # don't rewind if --{since,until,before,after} are in use
983         return if $cur ne '' &&
984                 grep(defined, @{$sync->{-opt}}{qw(since until)}) &&
985                 is_ancestor($git, $new, $cur);
986
987         return 1 if $cur ne '' && !is_ancestor($git, $cur, $new);
988         my $range = $cur eq '' ? $new : "$cur..$new";
989         chomp(my $n = $git->qx(qw(rev-list --count), $range));
990         ($n eq '' || $n > 0);
991 }
992
993 # The last git commit we indexed with Xapian or SQLite (msgmap)
994 # This needs to account for cases where Xapian or SQLite is
995 # out-of-date with respect to the other.
996 sub _last_x_commit {
997         my ($self, $mm) = @_;
998         my $lm = $mm->last_commit || '';
999         my $lx = '';
1000         if (need_xapian($self)) {
1001                 $lx = $self->{xdb}->get_metadata('last_commit') || '';
1002         } else {
1003                 $lx = $lm;
1004         }
1005         # Use last_commit from msgmap if it is older or unset
1006         if (!$lm || ($lx && $lm && is_ancestor($self->{ibx}->git, $lm, $lx))) {
1007                 $lx = $lm;
1008         }
1009         $lx;
1010 }
1011
1012 sub reindex_from ($$) {
1013         my ($reindex, $last_commit) = @_;
1014         return $last_commit unless $reindex;
1015         ref($reindex) eq 'HASH' ? $reindex->{from} : '';
1016 }
1017
1018 sub quit_cb ($) {
1019         my ($sync) = @_;
1020         sub {
1021                 # we set {-opt}->{quit} too, so ->index_sync callers
1022                 # can abort multi-inbox loops this way
1023                 $sync->{quit} = $sync->{-opt}->{quit} = 1;
1024                 warn "gracefully quitting\n";
1025         }
1026 }
1027
1028 # indexes all unindexed messages (v1 only)
1029 sub _index_sync {
1030         my ($self, $opt) = @_;
1031         my $tip = $opt->{ref} || 'HEAD';
1032         my $ibx = $self->{ibx};
1033         local $self->{current_info} = "$ibx->{inboxdir}";
1034         $self->{batch_bytes} = $opt->{batch_size} // $BATCH_BYTES;
1035         $ibx->git->batch_prepare;
1036         my $pr = $opt->{-progress};
1037         my $sync = { reindex => $opt->{reindex}, -opt => $opt, ibx => $ibx };
1038         my $quit = quit_cb($sync);
1039         local $SIG{QUIT} = $quit;
1040         local $SIG{INT} = $quit;
1041         local $SIG{TERM} = $quit;
1042         my $xdb = $self->begin_txn_lazy;
1043         $self->{oidx}->rethread_prepare($opt);
1044         my $mm = _msgmap_init($self);
1045         if ($sync->{reindex}) {
1046                 my $last = $mm->last_commit;
1047                 if ($last) {
1048                         $tip = $last;
1049                 } else {
1050                         # somebody just blindly added --reindex when indexing
1051                         # for the first time, allow it:
1052                         undef $sync->{reindex};
1053                 }
1054         }
1055         my $last_commit = _last_x_commit($self, $mm);
1056         my $lx = reindex_from($sync->{reindex}, $last_commit);
1057         my $range = $lx eq '' ? $tip : "$lx..$tip";
1058         $pr->("counting changes\n\t$range ... ") if $pr;
1059         my $stk = prepare_stack($sync, $range);
1060         $sync->{ntodo} = $stk ? $stk->num_records : 0;
1061         $pr->("$sync->{ntodo}\n") if $pr; # continue previous line
1062         process_stack($self, $sync, $stk) if !$sync->{quit};
1063 }
1064
1065 sub DESTROY {
1066         # order matters for unlocking
1067         $_[0]->{xdb} = undef;
1068         $_[0]->{lockfh} = undef;
1069 }
1070
1071 sub _begin_txn {
1072         my ($self) = @_;
1073         my $xdb = $self->{xdb} || idx_acquire($self);
1074         $self->{oidx}->begin_lazy if $self->{oidx};
1075         $xdb->begin_transaction if $xdb;
1076         $self->{txn} = 1;
1077         $xdb;
1078 }
1079
1080 sub begin_txn_lazy {
1081         my ($self) = @_;
1082         $self->with_umask(\&_begin_txn, $self) if !$self->{txn};
1083 }
1084
1085 # store 'indexlevel=medium' in v2 shard=0 and v1 (only one shard)
1086 # This metadata is read by Admin::detect_indexlevel:
1087 sub set_metadata_once {
1088         my ($self) = @_;
1089
1090         return if $self->{shard}; # only continue if undef or 0, not >0
1091         my $xdb = $self->{xdb};
1092
1093         if (delete($self->{-set_has_threadid_once})) {
1094                 $xdb->set_metadata('has_threadid', '1');
1095         }
1096         if (delete($self->{-set_indexlevel_once})) {
1097                 my $level = $xdb->get_metadata('indexlevel');
1098                 if (!$level || $level ne 'medium') {
1099                         $xdb->set_metadata('indexlevel', 'medium');
1100                 }
1101         }
1102         if (delete($self->{-set_skip_docdata_once})) {
1103                 $xdb->get_metadata('skip_docdata') or
1104                         $xdb->set_metadata('skip_docdata', '1');
1105         }
1106 }
1107
1108 sub _commit_txn {
1109         my ($self) = @_;
1110         if (my $eidx = $self->{eidx}) {
1111                 $eidx->git->async_wait_all;
1112                 $eidx->{transact_bytes} = 0;
1113         }
1114         if (my $xdb = $self->{xdb}) {
1115                 set_metadata_once($self);
1116                 $xdb->commit_transaction;
1117         }
1118         $self->{oidx}->commit_lazy if $self->{oidx};
1119 }
1120
1121 sub commit_txn_lazy {
1122         my ($self) = @_;
1123         delete($self->{txn}) and
1124                 $self->with_umask(\&_commit_txn, $self);
1125 }
1126
1127 sub eidx_shard_new {
1128         my ($class, $eidx, $shard) = @_;
1129         my $self = bless {
1130                 eidx => $eidx,
1131                 xpfx => $eidx->{xpfx},
1132                 indexlevel => $eidx->{indexlevel},
1133                 -skip_docdata => 1,
1134                 shard => $shard,
1135                 creat => 1,
1136         }, $class;
1137         $self->{-set_indexlevel_once} = 1 if $self->{indexlevel} eq 'medium';
1138         $self;
1139 }
1140
1141 1;