]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/SearchIdx.pm
search: disable batching in newer versions of Xapian, for now
[public-inbox.git] / lib / PublicInbox / SearchIdx.pm
1 # Copyright (C) 2015 all contributors <meta@public-inbox.org>
2 # License: AGPLv3 or later (https://www.gnu.org/licenses/agpl-3.0.txt)
3 # based on notmuch, but with no concept of folders, files or flags
4 #
5 # Indexes mail with Xapian and our (SQLite-based) ::Msgmap for use
6 # with the web and NNTP interfaces.  This index maintains thread
7 # relationships for use by Mail::Thread.  This writes to the search
8 # index.
9 package PublicInbox::SearchIdx;
10 use strict;
11 use warnings;
12 use Fcntl qw(:flock :DEFAULT);
13 use Email::MIME;
14 use Email::MIME::ContentType;
15 $Email::MIME::ContentType::STRICT_PARAMS = 0;
16 use base qw(PublicInbox::Search);
17 use PublicInbox::MID qw/mid_clean id_compress mid_mime/;
18 use PublicInbox::MsgIter;
19 require PublicInbox::Git;
20 *xpfx = *PublicInbox::Search::xpfx;
21
22 use constant MAX_MID_SIZE => 244; # max term size - 1 in Xapian
23 use constant {
24         PERM_UMASK => 0,
25         OLD_PERM_GROUP => 1,
26         OLD_PERM_EVERYBODY => 2,
27         PERM_GROUP => 0660,
28         PERM_EVERYBODY => 0664,
29 };
30
31 # XXX temporary hack...
32 my $xap_ver = ((Search::Xapian::major_version << 16) |
33                  (Search::Xapian::minor_version << 8 ) |
34                   Search::Xapian::revision());
35 our $XAP_LOCK_BROKEN = $xap_ver >= 0x010216; # >= 1.2.22
36
37 sub new {
38         my ($class, $git_dir, $writable) = @_;
39         my $dir = PublicInbox::Search->xdir($git_dir);
40         require Search::Xapian::WritableDatabase;
41         my $flag = Search::Xapian::DB_OPEN;
42         my $self = bless { git_dir => $git_dir }, $class;
43         my $perm = $self->_git_config_perm;
44         my $umask = _umask_for($perm);
45         $self->{umask} = $umask;
46         $self->{lock_path} = "$git_dir/ssoma.lock";
47         $self->{xdb} = $self->with_umask(sub {
48                 if ($writable == 1) {
49                         require File::Path;
50                         File::Path::mkpath($dir);
51                         $self->{batch_size} = 100 unless $XAP_LOCK_BROKEN;
52                         $flag = Search::Xapian::DB_CREATE_OR_OPEN;
53                         _lock_acquire($self);
54                 }
55                 Search::Xapian::WritableDatabase->new($dir, $flag);
56         });
57         $self;
58 }
59
60 sub _xdb_release {
61         my ($self) = @_;
62         my $xdb = delete $self->{xdb};
63         $xdb->commit_transaction;
64         $xdb->close;
65 }
66
67 sub _xdb_acquire {
68         my ($self, $more) = @_;
69         my $dir = PublicInbox::Search->xdir($self->{git_dir});
70         my $flag = Search::Xapian::DB_OPEN;
71         my $xdb = Search::Xapian::WritableDatabase->new($dir, $flag);
72         $xdb->begin_transaction if $more;
73         $self->{xdb} = $xdb;
74 }
75
76 sub _lock_acquire {
77         my ($self) = @_;
78         sysopen(my $lockfh, $self->{lock_path}, O_WRONLY|O_CREAT) or
79                 die "failed to open lock $self->{lock_path}: $!\n";
80         flock($lockfh, LOCK_EX) or die "lock failed: $!\n";
81         $self->{lockfh} = $lockfh;
82 }
83
84 sub _lock_release {
85         my ($self) = @_;
86         my $lockfh = delete $self->{lockfh};
87         flock($lockfh, LOCK_UN) or die "unlock failed: $!\n";
88         close $lockfh or die "close failed: $!\n";
89 }
90
91 sub add_val {
92         my ($doc, $col, $num) = @_;
93         $num = Search::Xapian::sortable_serialise($num);
94         $doc->add_value($col, $num);
95 }
96
97 sub add_message {
98         my ($self, $mime, $bytes, $num, $blob) = @_; # mime = Email::MIME object
99         my $db = $self->{xdb};
100
101         my ($doc_id, $old_tid);
102         my $mid = mid_clean(mid_mime($mime));
103         my $ct_msg = $mime->header('Content-Type') || 'text/plain';
104
105         eval {
106                 die 'Message-ID too long' if length($mid) > MAX_MID_SIZE;
107                 my $smsg = $self->lookup_message($mid);
108                 if ($smsg) {
109                         # convert a ghost to a regular message
110                         # it will also clobber any existing regular message
111                         $doc_id = $smsg->doc_id;
112                         $old_tid = $smsg->thread_id;
113                 }
114                 $smsg = PublicInbox::SearchMsg->new($mime);
115                 my $doc = $smsg->{doc};
116                 $doc->add_term(xpfx('mid') . $mid);
117
118                 my $subj = $smsg->subject;
119                 if ($subj ne '') {
120                         my $path = $self->subject_path($subj);
121                         $doc->add_term(xpfx('path') . id_compress($path));
122                 }
123
124                 add_val($doc, &PublicInbox::Search::TS, $smsg->ts);
125
126                 defined($num) and
127                         add_val($doc, &PublicInbox::Search::NUM, $num);
128
129                 defined($bytes) and
130                         add_val($doc, &PublicInbox::Search::BYTES, $bytes);
131
132                 add_val($doc, &PublicInbox::Search::LINES,
133                                 $mime->body_raw =~ tr!\n!\n!);
134
135                 my $tg = $self->term_generator;
136
137                 $tg->set_document($doc);
138                 $tg->index_text($subj, 1, 'S') if $subj;
139                 $tg->increase_termpos;
140                 $tg->index_text($subj) if $subj;
141                 $tg->increase_termpos;
142
143                 $tg->index_text($smsg->from);
144                 $tg->increase_termpos;
145
146                 msg_iter($mime, sub {
147                         my ($part, $depth, @idx) = @{$_[0]};
148                         my $ct = $part->content_type || $ct_msg;
149
150                         # account for filter bugs...
151                         $ct =~ m!\btext/plain\b!i or return;
152
153                         my (@orig, @quot);
154                         my $body = $part->body;
155                         $part->body_set('');
156                         my @lines = split(/\n/, $body);
157                         while (defined(my $l = shift @lines)) {
158                                 if ($l =~ /^\s*>/) {
159                                         push @quot, $l;
160                                 } else {
161                                         push @orig, $l;
162                                 }
163                         }
164                         if (@quot) {
165                                 $tg->index_text(join("\n", @quot), 0);
166                                 @quot = ();
167                                 $tg->increase_termpos;
168                         }
169                         if (@orig) {
170                                 $tg->index_text(join("\n", @orig));
171                                 @orig = ();
172                                 $tg->increase_termpos;
173                         }
174                 });
175
176                 link_message($self, $smsg, $old_tid);
177                 $doc->set_data($smsg->to_doc_data($blob));
178                 if (defined $doc_id) {
179                         $db->replace_document($doc_id, $doc);
180                 } else {
181                         $doc_id = $db->add_document($doc);
182                 }
183         };
184
185         if ($@) {
186                 warn "failed to index message <$mid>: $@\n";
187                 return undef;
188         }
189         $doc_id;
190 }
191
192 # returns deleted doc_id on success, undef on missing
193 sub remove_message {
194         my ($self, $mid) = @_;
195         my $db = $self->{xdb};
196         my $doc_id;
197         $mid = mid_clean($mid);
198
199         eval {
200                 $doc_id = $self->find_unique_doc_id('mid', $mid);
201                 $db->delete_document($doc_id) if defined $doc_id;
202         };
203
204         if ($@) {
205                 warn "failed to remove message <$mid>: $@\n";
206                 return undef;
207         }
208         $doc_id;
209 }
210
211 sub term_generator { # write-only
212         my ($self) = @_;
213
214         my $tg = $self->{term_generator};
215         return $tg if $tg;
216
217         $tg = Search::Xapian::TermGenerator->new;
218         $tg->set_stemmer($self->stemmer);
219
220         $self->{term_generator} = $tg;
221 }
222
223 # increments last_thread_id counter
224 # returns a 64-bit integer represented as a hex string
225 sub next_thread_id {
226         my ($self) = @_;
227         my $db = $self->{xdb};
228         my $last_thread_id = int($db->get_metadata('last_thread_id') || 0);
229
230         $db->set_metadata('last_thread_id', ++$last_thread_id);
231
232         $last_thread_id;
233 }
234
235 sub link_message {
236         my ($self, $smsg, $old_tid) = @_;
237         my $doc = $smsg->{doc};
238         my $mid = $smsg->mid;
239         my $mime = $smsg->mime;
240         my $hdr = $mime->header_obj;
241         my $refs = $hdr->header_raw('References');
242         my @refs = $refs ? ($refs =~ /<([^>]+)>/g) : ();
243         if (my $irt = $hdr->header_raw('In-Reply-To')) {
244                 # last References should be $irt
245                 # we will de-dupe later
246                 push @refs, mid_clean($irt);
247         }
248
249         my $tid;
250         if (@refs) {
251                 my %uniq = ($mid => 1);
252                 my @orig_refs = @refs;
253                 @refs = ();
254
255                 # prevent circular references via References: here:
256                 foreach my $ref (@orig_refs) {
257                         if (length($ref) > MAX_MID_SIZE) {
258                                 warn "References: <$ref> too long, ignoring\n";
259                         }
260                         next if $uniq{$ref};
261                         $uniq{$ref} = 1;
262                         push @refs, $ref;
263                 }
264         }
265         if (@refs) {
266                 $smsg->{references} = '<'.join('> <', @refs).'>';
267
268                 # first ref *should* be the thread root,
269                 # but we can never trust clients to do the right thing
270                 my $ref = shift @refs;
271                 $tid = $self->_resolve_mid_to_tid($ref);
272                 $self->merge_threads($tid, $old_tid) if defined $old_tid;
273
274                 # the rest of the refs should point to this tid:
275                 foreach $ref (@refs) {
276                         my $ptid = $self->_resolve_mid_to_tid($ref);
277                         merge_threads($self, $tid, $ptid);
278                 }
279         } else {
280                 $tid = $self->next_thread_id;
281         }
282         $doc->add_term(xpfx('thread') . $tid);
283 }
284
285 sub index_blob {
286         my ($self, $git, $mime, $bytes, $num, $blob) = @_;
287         $self->add_message($mime, $bytes, $num, $blob);
288 }
289
290 sub unindex_blob {
291         my ($self, $git, $mime) = @_;
292         my $mid = eval { mid_clean(mid_mime($mime)) };
293         $self->remove_message($mid) if defined $mid;
294 }
295
296 sub index_mm {
297         my ($self, $git, $mime) = @_;
298         $self->{mm}->mid_insert(mid_clean(mid_mime($mime)));
299 }
300
301 sub unindex_mm {
302         my ($self, $git, $mime) = @_;
303         $self->{mm}->mid_delete(mid_clean(mid_mime($mime)));
304 }
305
306 sub index_mm2 {
307         my ($self, $git, $mime, $bytes, $blob) = @_;
308         my $num = $self->{mm}->num_for(mid_clean(mid_mime($mime)));
309         index_blob($self, $git, $mime, $bytes, $num, $blob);
310 }
311
312 sub unindex_mm2 {
313         my ($self, $git, $mime) = @_;
314         $self->{mm}->mid_delete(mid_clean(mid_mime($mime)));
315         unindex_blob($self, $git, $mime);
316 }
317
318 sub index_both {
319         my ($self, $git, $mime, $bytes, $blob) = @_;
320         my $num = index_mm($self, $git, $mime);
321         index_blob($self, $git, $mime, $bytes, $num, $blob);
322 }
323
324 sub unindex_both {
325         my ($self, $git, $mime) = @_;
326         unindex_blob($self, $git, $mime);
327         unindex_mm($self, $git, $mime);
328 }
329
330 sub do_cat_mail {
331         my ($git, $blob, $sizeref) = @_;
332         my $mime = eval {
333                 my $str = $git->cat_file($blob, $sizeref);
334                 # fixup bugs from import:
335                 $$str =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
336                 Email::MIME->new($str);
337         };
338         $@ ? undef : $mime;
339 }
340
341 sub index_sync {
342         my ($self, $opts) = @_;
343         with_umask($self, sub { $self->_index_sync($opts) });
344 }
345
346 sub rlog {
347         my ($self, $range, $add_cb, $del_cb, $batch_cb) = @_;
348         my $hex = '[a-f0-9]';
349         my $h40 = $hex .'{40}';
350         my $addmsg = qr!^:000000 100644 \S+ ($h40) A\t${hex}{2}/${hex}{38}$!;
351         my $delmsg = qr!^:100644 000000 ($h40) \S+ D\t${hex}{2}/${hex}{38}$!;
352         my $git = PublicInbox::Git->new($self->{git_dir});
353         my $log = $git->popen(qw/log --reverse --no-notes --no-color
354                                 --raw -r --no-abbrev/, $range);
355         my $latest;
356         my $bytes;
357         my $max = $self->{batch_size}; # may be undef
358         local $/ = "\n";
359         my $line;
360         while (defined($line = <$log>)) {
361                 if ($line =~ /$addmsg/o) {
362                         my $blob = $1;
363                         my $mime = do_cat_mail($git, $blob, \$bytes) or next;
364                         $add_cb->($self, $git, $mime, $bytes, $blob);
365                 } elsif ($line =~ /$delmsg/o) {
366                         my $blob = $1;
367                         my $mime = do_cat_mail($git, $blob) or next;
368                         $del_cb->($self, $git, $mime);
369                 } elsif ($line =~ /^commit ($h40)/o) {
370                         if (defined $max && --$max <= 0) {
371                                 $max = $self->{batch_size};
372                                 $batch_cb->($latest, 1);
373                         }
374                         $latest = $1;
375                 }
376         }
377         $batch_cb->($latest, 0);
378 }
379
380 # indexes all unindexed messages
381 sub _index_sync {
382         my ($self, $opts) = @_;
383         my $tip = $opts->{ref} || 'HEAD';
384         my $mm = $self->{mm} = eval {
385                 require PublicInbox::Msgmap;
386                 PublicInbox::Msgmap->new($self->{git_dir}, 1);
387         };
388         my $xdb = $self->{xdb};
389         $xdb->begin_transaction;
390         my $reindex = $opts->{reindex};
391         my $mkey = 'last_commit';
392         my $last_commit = $xdb->get_metadata($mkey);
393         my $lx = $last_commit;
394         if ($reindex) {
395                 $lx = '';
396                 $mkey = undef if $last_commit ne '';
397         }
398         my $dbh;
399         my $cb = sub {
400                 my ($commit, $more) = @_;
401                 $xdb->set_metadata($mkey, $commit) if $mkey && $commit;
402                 if ($dbh) {
403                         $mm->last_commit($commit) if $commit;
404                         $dbh->commit;
405                 }
406                 if ($XAP_LOCK_BROKEN) {
407                         $xdb->commit_transaction if !$more;
408                 } else {
409                         $xdb = undef;
410                         _xdb_release($self);
411                         _lock_release($self);
412                 }
413                 # let another process do some work...
414                 if (!$XAP_LOCK_BROKEN) {
415                         _lock_acquire($self);
416                         $dbh->begin_work if $dbh && $more;
417                         $xdb = _xdb_acquire($self, $more);
418                 }
419         };
420
421         my $range = $lx eq '' ? $tip : "$lx..$tip";
422         if ($mm) {
423                 $dbh = $mm->{dbh};
424                 $dbh->begin_work;
425                 my $lm = $mm->last_commit || '';
426                 if ($lm eq $lx) {
427                         # Common case is the indexes are synced,
428                         # we only need to run git-log once:
429                         rlog($self, $range, *index_both, *unindex_both, $cb);
430                 } else {
431                         # Uncommon case, msgmap and xapian are out-of-sync
432                         # do not care for performance (but git is fast :>)
433                         # This happens if we have to reindex Xapian since
434                         # msgmap is a frozen format and our Xapian format
435                         # is evolving.
436                         my $r = $lm eq '' ? $tip : "$lm..$tip";
437
438                         # first, ensure msgmap is up-to-date:
439                         my $mkey_prev = $mkey;
440                         $mkey = undef; # ignore xapian, for now
441                         rlog($self, $r, *index_mm, *unindex_mm, $cb);
442
443                         # now deal with Xapian
444                         $mkey = $mkey_prev;
445                         $dbh = undef;
446                         rlog($self, $range, *index_mm2, *unindex_mm2, $cb);
447                 }
448         } else {
449                 # user didn't install DBD::SQLite and DBI
450                 rlog($self, $range, *index_blob, *unindex_blob, $cb);
451         }
452 }
453
454 # this will create a ghost as necessary
455 sub _resolve_mid_to_tid {
456         my ($self, $mid) = @_;
457
458         my $smsg = $self->lookup_message($mid) || $self->create_ghost($mid);
459         $smsg->thread_id;
460 }
461
462 sub create_ghost {
463         my ($self, $mid) = @_;
464
465         my $tid = $self->next_thread_id;
466         my $doc = Search::Xapian::Document->new;
467         $doc->add_term(xpfx('mid') . $mid);
468         $doc->add_term(xpfx('thread') . $tid);
469         $doc->add_term(xpfx('type') . 'ghost');
470
471         my $smsg = PublicInbox::SearchMsg->wrap($doc, $mid);
472         $self->{xdb}->add_document($doc);
473
474         $smsg;
475 }
476
477 sub merge_threads {
478         my ($self, $winner_tid, $loser_tid) = @_;
479         return if $winner_tid == $loser_tid;
480         my ($head, $tail) = $self->find_doc_ids('thread', $loser_tid);
481         my $thread_pfx = xpfx('thread');
482         my $db = $self->{xdb};
483
484         for (; $head != $tail; $head->inc) {
485                 my $docid = $head->get_docid;
486                 my $doc = $db->get_document($docid);
487                 $doc->remove_term($thread_pfx . $loser_tid);
488                 $doc->add_term($thread_pfx . $winner_tid);
489                 $db->replace_document($docid, $doc);
490         }
491 }
492
493 sub _read_git_config_perm {
494         my ($self) = @_;
495         my @cmd = qw(config core.sharedRepository);
496         my $fh = PublicInbox::Git->new($self->{git_dir})->popen(@cmd);
497         local $/ = "\n";
498         my $perm = <$fh>;
499         chomp $perm if defined $perm;
500         $perm;
501 }
502
503 sub _git_config_perm {
504         my $self = shift;
505         my $perm = scalar @_ ? $_[0] : _read_git_config_perm($self);
506         return PERM_GROUP if (!defined($perm) || $perm eq '');
507         return PERM_UMASK if ($perm eq 'umask');
508         return PERM_GROUP if ($perm eq 'group');
509         if ($perm =~ /\A(?:all|world|everybody)\z/) {
510                 return PERM_EVERYBODY;
511         }
512         return PERM_GROUP if ($perm =~ /\A(?:true|yes|on|1)\z/);
513         return PERM_UMASK if ($perm =~ /\A(?:false|no|off|0)\z/);
514
515         my $i = oct($perm);
516         return PERM_UMASK if ($i == PERM_UMASK);
517         return PERM_GROUP if ($i == OLD_PERM_GROUP);
518         return PERM_EVERYBODY if ($i == OLD_PERM_EVERYBODY);
519
520         if (($i & 0600) != 0600) {
521                 die "core.sharedRepository mode invalid: ".
522                     sprintf('%.3o', $i) . "\nOwner must have permissions\n";
523         }
524         ($i & 0666);
525 }
526
527 sub _umask_for {
528         my ($perm) = @_; # _git_config_perm return value
529         my $rv = $perm;
530         return umask if $rv == 0;
531
532         # set +x bit if +r or +w were set
533         $rv |= 0100 if ($rv & 0600);
534         $rv |= 0010 if ($rv & 0060);
535         $rv |= 0001 if ($rv & 0006);
536         (~$rv & 0777);
537 }
538
539 sub with_umask {
540         my ($self, $cb) = @_;
541         my $old = umask $self->{umask};
542         my $rv = eval { $cb->() };
543         my $err = $@;
544         umask $old;
545         die $err if $@;
546         $rv;
547 }
548
549 sub DESTROY {
550         # order matters for unlocking
551         $_[0]->{xdb} = undef;
552         $_[0]->{lockfh} = undef;
553 }
554
555 1;