]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/SearchIdx.pm
www: favor reading more from SQLite, and less from Xapian
[public-inbox.git] / lib / PublicInbox / SearchIdx.pm
1 # Copyright (C) 2015-2018 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3 # based on notmuch, but with no concept of folders, files or flags
4 #
5 # Indexes mail with Xapian and our (SQLite-based) ::Msgmap for use
6 # with the web and NNTP interfaces.  This index maintains thread
7 # relationships for use by PublicInbox::SearchThread.
8 # This writes to the search index.
9 package PublicInbox::SearchIdx;
10 use strict;
11 use warnings;
12 use base qw(PublicInbox::Search PublicInbox::Lock);
13 use PublicInbox::MIME;
14 use PublicInbox::InboxWritable;
15 use PublicInbox::MID qw/mid_clean id_compress mid_mime mids references/;
16 use PublicInbox::MsgIter;
17 use Carp qw(croak);
18 use POSIX qw(strftime);
19 use PublicInbox::OverIdx;
20 require PublicInbox::Git;
21 use Compress::Zlib qw(compress);
22
23 use constant {
24         BATCH_BYTES => 10_000_000,
25         DEBUG => !!$ENV{DEBUG},
26 };
27
28 my %GIT_ESC = (
29         a => "\a",
30         b => "\b",
31         f => "\f",
32         n => "\n",
33         r => "\r",
34         t => "\t",
35         v => "\013",
36 );
37
38 sub git_unquote ($) {
39         my ($s) = @_;
40         return $s unless ($s =~ /\A"(.*)"\z/);
41         $s = $1;
42         $s =~ s/\\([abfnrtv])/$GIT_ESC{$1}/g;
43         $s =~ s/\\([0-7]{1,3})/chr(oct($1))/ge;
44         $s;
45 }
46
47 sub new {
48         my ($class, $ibx, $creat, $part) = @_;
49         my $mainrepo = $ibx; # for "public-inbox-index" w/o entry in config
50         my $git_dir = $mainrepo;
51         my ($altid, $git);
52         my $version = 1;
53         if (ref $ibx) {
54                 $mainrepo = $ibx->{mainrepo};
55                 $altid = $ibx->{altid};
56                 $version = $ibx->{version} || 1;
57                 if ($altid) {
58                         require PublicInbox::AltId;
59                         $altid = [ map {
60                                 PublicInbox::AltId->new($ibx, $_);
61                         } @$altid ];
62                 }
63         } else { # v1
64                 $ibx = { mainrepo => $git_dir, version => 1 };
65         }
66         $ibx = PublicInbox::InboxWritable->new($ibx);
67         require Search::Xapian::WritableDatabase;
68         my $self = bless {
69                 mainrepo => $mainrepo,
70                 -inbox => $ibx,
71                 git => $ibx->git,
72                 -altid => $altid,
73                 version => $version,
74         }, $class;
75         $ibx->umask_prepare;
76         if ($version == 1) {
77                 $self->{lock_path} = "$mainrepo/ssoma.lock";
78                 my $dir = $self->xdir;
79                 $self->{over_ro} = $self->{over} =
80                                 PublicInbox::OverIdx->new("$dir/over.sqlite3");
81         } elsif ($version == 2) {
82                 defined $part or die "partition is required for v2\n";
83                 # partition is a number
84                 $self->{partition} = $part;
85                 $self->{lock_path} = undef;
86         } else {
87                 die "unsupported inbox version=$version\n";
88         }
89         $self->{creat} = ($creat || 0) == 1;
90         $self;
91 }
92
93 sub _xdb_release {
94         my ($self) = @_;
95         my $xdb = delete $self->{xdb} or croak 'not acquired';
96         $xdb->close;
97         $self->lock_release if $self->{creat};
98         undef;
99 }
100
101 sub _xdb_acquire {
102         my ($self) = @_;
103         croak 'already acquired' if $self->{xdb};
104         my $dir = $self->xdir;
105         my $flag = Search::Xapian::DB_OPEN;
106         if ($self->{creat}) {
107                 require File::Path;
108                 $self->lock_acquire;
109                 File::Path::mkpath($dir);
110                 $flag = Search::Xapian::DB_CREATE_OR_OPEN;
111         }
112         $self->{xdb} = Search::Xapian::WritableDatabase->new($dir, $flag);
113 }
114
115 sub add_val ($$$) {
116         my ($doc, $col, $num) = @_;
117         $num = Search::Xapian::sortable_serialise($num);
118         $doc->add_value($col, $num);
119 }
120
121 sub index_users ($$) {
122         my ($tg, $smsg) = @_;
123
124         my $from = $smsg->from;
125         my $to = $smsg->to;
126         my $cc = $smsg->cc;
127
128         $tg->index_text($from, 1, 'A'); # A - author
129         $tg->increase_termpos;
130         $tg->index_text($to, 1, 'XTO') if $to ne '';
131         $tg->increase_termpos;
132         $tg->index_text($cc, 1, 'XCC') if $cc ne '';
133         $tg->increase_termpos;
134 }
135
136 sub index_diff_inc ($$$$) {
137         my ($tg, $text, $pfx, $xnq) = @_;
138         if (@$xnq) {
139                 $tg->index_text(join("\n", @$xnq), 1, 'XNQ');
140                 $tg->increase_termpos;
141                 @$xnq = ();
142         }
143         $tg->index_text($text, 1, $pfx);
144         $tg->increase_termpos;
145 }
146
147 sub index_old_diff_fn {
148         my ($tg, $seen, $fa, $fb, $xnq) = @_;
149
150         # no renames or space support for traditional diffs,
151         # find the number of leading common paths to strip:
152         my @fa = split('/', $fa);
153         my @fb = split('/', $fb);
154         while (scalar(@fa) && scalar(@fb)) {
155                 $fa = join('/', @fa);
156                 $fb = join('/', @fb);
157                 if ($fa eq $fb) {
158                         unless ($seen->{$fa}++) {
159                                 index_diff_inc($tg, $fa, 'XDFN', $xnq);
160                         }
161                         return 1;
162                 }
163                 shift @fa;
164                 shift @fb;
165         }
166         0;
167 }
168
169 sub index_diff ($$$) {
170         my ($tg, $lines, $doc) = @_;
171         my %seen;
172         my $in_diff;
173         my @xnq;
174         my $xnq = \@xnq;
175         foreach (@$lines) {
176                 if ($in_diff && s/^ //) { # diff context
177                         index_diff_inc($tg, $_, 'XDFCTX', $xnq);
178                 } elsif (/^-- $/) { # email signature begins
179                         $in_diff = undef;
180                 } elsif (m!^diff --git ("?a/.+) ("?b/.+)\z!) {
181                         my ($fa, $fb) = ($1, $2);
182                         my $fn = (split('/', git_unquote($fa), 2))[1];
183                         $seen{$fn}++ or index_diff_inc($tg, $fn, 'XDFN', $xnq);
184                         $fn = (split('/', git_unquote($fb), 2))[1];
185                         $seen{$fn}++ or index_diff_inc($tg, $fn, 'XDFN', $xnq);
186                         $in_diff = 1;
187                 # traditional diff:
188                 } elsif (m/^diff -(.+) (\S+) (\S+)$/) {
189                         my ($opt, $fa, $fb) = ($1, $2, $3);
190                         push @xnq, $_;
191                         # only support unified:
192                         next unless $opt =~ /[uU]/;
193                         $in_diff = index_old_diff_fn($tg, \%seen, $fa, $fb,
194                                                         $xnq);
195                 } elsif (m!^--- ("?a/.+)!) {
196                         my $fn = (split('/', git_unquote($1), 2))[1];
197                         $seen{$fn}++ or index_diff_inc($tg, $fn, 'XDFN', $xnq);
198                         $in_diff = 1;
199                 } elsif (m!^\+\+\+ ("?b/.+)!)  {
200                         my $fn = (split('/', git_unquote($1), 2))[1];
201                         $seen{$fn}++ or index_diff_inc($tg, $fn, 'XDFN', $xnq);
202                         $in_diff = 1;
203                 } elsif (/^--- (\S+)/) {
204                         $in_diff = $1;
205                         push @xnq, $_;
206                 } elsif (defined $in_diff && /^\+\+\+ (\S+)/) {
207                         $in_diff = index_old_diff_fn($tg, \%seen, $in_diff, $1,
208                                                         $xnq);
209                 } elsif ($in_diff && s/^\+//) { # diff added
210                         index_diff_inc($tg, $_, 'XDFB', $xnq);
211                 } elsif ($in_diff && s/^-//) { # diff removed
212                         index_diff_inc($tg, $_, 'XDFA', $xnq);
213                 } elsif (m!^index ([a-f0-9]+)\.\.([a-f0-9]+)!) {
214                         my ($ba, $bb) = ($1, $2);
215                         index_git_blob_id($doc, 'XDFPRE', $ba);
216                         index_git_blob_id($doc, 'XDFPOST', $bb);
217                         $in_diff = 1;
218                 } elsif (/^@@ (?:\S+) (?:\S+) @@\s*$/) {
219                         # traditional diff w/o -p
220                 } elsif (/^@@ (?:\S+) (?:\S+) @@\s*(\S+.*)$/) {
221                         # hunk header context
222                         index_diff_inc($tg, $1, 'XDFHH', $xnq);
223                 # ignore the following lines:
224                 } elsif (/^(?:dis)similarity index/ ||
225                                 /^(?:old|new) mode/ ||
226                                 /^(?:deleted|new) file mode/ ||
227                                 /^(?:copy|rename) (?:from|to) / ||
228                                 /^(?:dis)?similarity index / ||
229                                 /^\\ No newline at end of file/ ||
230                                 /^Binary files .* differ/) {
231                         push @xnq, $_;
232                 } elsif ($_ eq '') {
233                         $in_diff = undef;
234                 } else {
235                         push @xnq, $_;
236                         warn "non-diff line: $_\n" if DEBUG && $_ ne '';
237                         $in_diff = undef;
238                 }
239         }
240
241         $tg->index_text(join("\n", @xnq), 1, 'XNQ');
242         $tg->increase_termpos;
243 }
244
245 sub index_body ($$$) {
246         my ($tg, $lines, $doc) = @_;
247         my $txt = join("\n", @$lines);
248         if ($doc) {
249                 # does it look like a diff?
250                 if ($txt =~ /^(?:diff|---|\+\+\+) /ms) {
251                         $txt = undef;
252                         index_diff($tg, $lines, $doc);
253                 } else {
254                         $tg->index_text($txt, 1, 'XNQ');
255                 }
256         } else {
257                 $tg->index_text($txt, 0, 'XQUOT');
258         }
259         $tg->increase_termpos;
260         @$lines = ();
261 }
262
263 sub add_message {
264         # mime = Email::MIME object
265         my ($self, $mime, $bytes, $num, $oid, $mid0) = @_;
266         my $doc_id;
267         my $mids = mids($mime->header_obj);
268         $mid0 = $mids->[0] unless defined $mid0; # v1 compatibility
269         unless (defined $num) { # v1
270                 my $mm = $self->_msgmap_init;
271                 $num = $mm->mid_insert($mid0) || $mm->num_for($mid0);
272         }
273         eval {
274                 my $smsg = PublicInbox::SearchMsg->new($mime);
275                 my $doc = $smsg->{doc};
276                 my $subj = $smsg->subject;
277                 my $xpath;
278                 if ($subj ne '') {
279                         $xpath = $self->subject_path($subj);
280                         $xpath = id_compress($xpath);
281                 }
282
283                 $smsg->{lines} = $mime->body_raw =~ tr!\n!\n!;
284                 defined $bytes or $bytes = length($mime->as_string);
285                 $smsg->{bytes} = $bytes;
286
287                 add_val($doc, PublicInbox::Search::TS(), $smsg->ts);
288                 my @ds = gmtime($smsg->ds);
289                 my $yyyymmdd = strftime('%Y%m%d', @ds);
290                 add_val($doc, PublicInbox::Search::YYYYMMDD(), $yyyymmdd);
291                 my $dt = strftime('%Y%m%d%H%M%S', @ds);
292                 add_val($doc, PublicInbox::Search::DT(), $dt);
293                 my @vals = ($smsg->{ts}, $smsg->{ds});
294
295                 my $tg = $self->term_generator;
296
297                 $tg->set_document($doc);
298                 $tg->index_text($subj, 1, 'S') if $subj;
299                 $tg->increase_termpos;
300
301                 index_users($tg, $smsg);
302
303                 msg_iter($mime, sub {
304                         my ($part, $depth, @idx) = @{$_[0]};
305                         my $ct = $part->content_type || 'text/plain';
306                         my $fn = $part->filename;
307                         if (defined $fn && $fn ne '') {
308                                 $tg->index_text($fn, 1, 'XFN');
309                         }
310
311                         return if $ct =~ m!\btext/x?html\b!i;
312
313                         my $s = eval { $part->body_str };
314                         if ($@) {
315                                 if ($ct =~ m!\btext/plain\b!i) {
316                                         # Try to assume UTF-8 because Alpine
317                                         # seems to do wacky things and set
318                                         # charset=X-UNKNOWN
319                                         $part->charset_set('UTF-8');
320                                         $s = eval { $part->body_str };
321                                         $s = $part->body if $@;
322                                 }
323                         }
324                         defined $s or return;
325
326                         my (@orig, @quot);
327                         my $body = $part->body;
328                         my @lines = split(/\n/, $body);
329                         while (defined(my $l = shift @lines)) {
330                                 if ($l =~ /^>/) {
331                                         index_body($tg, \@orig, $doc) if @orig;
332                                         push @quot, $l;
333                                 } else {
334                                         index_body($tg, \@quot, 0) if @quot;
335                                         push @orig, $l;
336                                 }
337                         }
338                         index_body($tg, \@quot, 0) if @quot;
339                         index_body($tg, \@orig, $doc) if @orig;
340                 });
341
342                 # populates smsg->references for smsg->to_doc_data
343                 my $refs = parse_references($smsg, $mid0, $mids);
344                 my $data = $smsg->to_doc_data($oid, $mid0);
345                 foreach my $mid (@$mids) {
346                         $tg->index_text($mid, 1, 'XM');
347                 }
348                 $doc->set_data($data);
349                 if (my $altid = $self->{-altid}) {
350                         foreach my $alt (@$altid) {
351                                 my $pfx = $alt->{xprefix};
352                                 foreach my $mid (@$mids) {
353                                         my $id = $alt->mid2alt($mid);
354                                         next unless defined $id;
355                                         $doc->add_boolean_term($pfx . $id);
356                                 }
357                         }
358                 }
359
360                 $self->delete_article($num) if defined $num; # for reindexing
361
362                 utf8::encode($data);
363                 $data = compress($data);
364                 push @vals, $num, $mids, $refs, $xpath, $data;
365                 $self->{over}->add_over(\@vals);
366                 $doc->add_boolean_term('Q' . $_) foreach @$mids;
367                 $doc->add_boolean_term('XNUM' . $num) if defined $num;
368                 $doc_id = $self->{xdb}->add_document($doc);
369         };
370
371         if ($@) {
372                 warn "failed to index message <".join('> <',@$mids).">: $@\n";
373                 return undef;
374         }
375         $doc_id;
376 }
377
378 # returns begin and end PostingIterator
379 sub find_doc_ids {
380         my ($self, $termval) = @_;
381         my $db = $self->{xdb};
382
383         ($db->postlist_begin($termval), $db->postlist_end($termval));
384 }
385
386 sub batch_do {
387         my ($self, $termval, $cb) = @_;
388         my $batch_size = 1000; # don't let @ids grow too large to avoid OOM
389         while (1) {
390                 my ($head, $tail) = $self->find_doc_ids($termval);
391                 return if $head == $tail;
392                 my @ids;
393                 for (; $head != $tail && @ids < $batch_size; $head->inc) {
394                         push @ids, $head->get_docid;
395                 }
396                 $cb->(\@ids);
397         }
398 }
399
400 sub remove_message {
401         my ($self, $mid) = @_;
402         my $db = $self->{xdb};
403         my $called;
404         $mid = mid_clean($mid);
405
406         eval {
407                 batch_do($self, 'Q' . $mid, sub {
408                         my ($ids) = @_;
409                         $db->delete_document($_) for @$ids;
410                         $called = 1;
411                 });
412         };
413         if ($@) {
414                 warn "failed to remove message <$mid>: $@\n";
415         } elsif (!$called) {
416                 warn "cannot remove non-existent <$mid>\n";
417         }
418 }
419
420 sub delete_article {
421         my ($self, $num) = @_;
422         my $ndel = 0;
423         batch_do($self, 'XNUM' . $num, sub {
424                 my ($ids) = @_;
425                 $ndel += scalar @$ids;
426                 $self->{xdb}->delete_document($_) for @$ids;
427         });
428 }
429
430 # MID is a hint in V2
431 sub remove_by_oid {
432         my ($self, $oid, $mid) = @_;
433         my $db = $self->{xdb};
434
435         # XXX careful, we cannot use batch_do here since we conditionally
436         # delete documents based on other factors, so we cannot call
437         # find_doc_ids twice.
438         my ($head, $tail) = $self->find_doc_ids('Q' . $mid);
439         return if $head == $tail;
440
441         # there is only ONE element in @delete unless we
442         # have bugs in our v2writable deduplication check
443         my @delete;
444         my @over_del;
445         for (; $head != $tail; $head->inc) {
446                 my $docid = $head->get_docid;
447                 my $doc = $db->get_document($docid);
448                 my $smsg = PublicInbox::SearchMsg->wrap($doc, $mid);
449                 $smsg->load_expand;
450                 if ($smsg->{blob} eq $oid) {
451                         push(@delete, $docid);
452                         push(@over_del, $smsg->num);
453                 }
454         }
455         $db->delete_document($_) foreach @delete;
456         $self->{over}->remove_oid($oid, $mid);
457         scalar(@delete);
458 }
459
460 sub term_generator { # write-only
461         my ($self) = @_;
462
463         my $tg = $self->{term_generator};
464         return $tg if $tg;
465
466         $tg = Search::Xapian::TermGenerator->new;
467         $tg->set_stemmer($self->stemmer);
468
469         $self->{term_generator} = $tg;
470 }
471
472 sub parse_references ($$$) {
473         my ($smsg, $mid0, $mids) = @_;
474         my $mime = $smsg->{mime};
475         my $hdr = $mime->header_obj;
476         my $refs = references($hdr);
477         push(@$refs, @$mids) if scalar(@$mids) > 1;
478         return $refs if scalar(@$refs) == 0;
479
480         # prevent circular references here:
481         my %seen = ( $mid0 => 1 );
482         my @keep;
483         foreach my $ref (@$refs) {
484                 if (length($ref) > PublicInbox::MID::MAX_MID_SIZE) {
485                         warn "References: <$ref> too long, ignoring\n";
486                         next;
487                 }
488                 next if $seen{$ref}++;
489                 push @keep, $ref;
490         }
491         $smsg->{references} = '<'.join('> <', @keep).'>' if @keep;
492         \@keep;
493 }
494
495 sub index_git_blob_id {
496         my ($doc, $pfx, $objid) = @_;
497
498         my $len = length($objid);
499         for (my $len = length($objid); $len >= 7; ) {
500                 $doc->add_term($pfx.$objid);
501                 $objid = substr($objid, 0, --$len);
502         }
503 }
504
505 sub unindex_blob {
506         my ($self, $mime) = @_;
507         my $mid = eval { mid_clean(mid_mime($mime)) };
508         $self->remove_message($mid) if defined $mid;
509 }
510
511 sub index_mm {
512         my ($self, $mime) = @_;
513         my $mid = mid_clean(mid_mime($mime));
514         my $mm = $self->{mm};
515         my $num = $mm->mid_insert($mid);
516         return $num if defined $num;
517
518         # fallback to num_for since filters like RubyLang set the number
519         $mm->num_for($mid);
520 }
521
522 sub unindex_mm {
523         my ($self, $mime) = @_;
524         $self->{mm}->mid_delete(mid_clean(mid_mime($mime)));
525 }
526
527 sub index_mm2 {
528         my ($self, $mime, $bytes, $blob) = @_;
529         my $num = $self->{mm}->num_for(mid_clean(mid_mime($mime)));
530         add_message($self, $mime, $bytes, $num, $blob);
531 }
532
533 sub unindex_mm2 {
534         my ($self, $mime) = @_;
535         $self->{mm}->mid_delete(mid_clean(mid_mime($mime)));
536         unindex_blob($self, $mime);
537 }
538
539 sub index_both {
540         my ($self, $mime, $bytes, $blob) = @_;
541         my $num = index_mm($self, $mime);
542         add_message($self, $mime, $bytes, $num, $blob);
543 }
544
545 sub unindex_both {
546         my ($self, $mime) = @_;
547         unindex_blob($self, $mime);
548         unindex_mm($self, $mime);
549 }
550
551 sub do_cat_mail {
552         my ($git, $blob, $sizeref) = @_;
553         my $mime = eval {
554                 my $str = $git->cat_file($blob, $sizeref);
555                 # fixup bugs from import:
556                 $$str =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
557                 PublicInbox::MIME->new($str);
558         };
559         $@ ? undef : $mime;
560 }
561
562 sub index_sync {
563         my ($self, $opts) = @_;
564         $self->{-inbox}->with_umask(sub { $self->_index_sync($opts) })
565 }
566
567 sub batch_adjust ($$$$) {
568         my ($max, $bytes, $batch_cb, $latest) = @_;
569         $$max -= $bytes;
570         if ($$max <= 0) {
571                 $$max = BATCH_BYTES;
572                 $batch_cb->($latest, 1);
573         }
574 }
575
576 # only for v1
577 sub rlog {
578         my ($self, $log, $add_cb, $del_cb, $batch_cb) = @_;
579         my $hex = '[a-f0-9]';
580         my $h40 = $hex .'{40}';
581         my $addmsg = qr!^:000000 100644 \S+ ($h40) A\t${hex}{2}/${hex}{38}$!;
582         my $delmsg = qr!^:100644 000000 ($h40) \S+ D\t${hex}{2}/${hex}{38}$!;
583         my $git = $self->{git};
584         my $latest;
585         my $bytes;
586         my $max = BATCH_BYTES;
587         local $/ = "\n";
588         my $line;
589         while (defined($line = <$log>)) {
590                 if ($line =~ /$addmsg/o) {
591                         my $blob = $1;
592                         my $mime = do_cat_mail($git, $blob, \$bytes) or next;
593                         batch_adjust(\$max, $bytes, $batch_cb, $latest);
594                         $add_cb->($self, $mime, $bytes, $blob);
595                 } elsif ($line =~ /$delmsg/o) {
596                         my $blob = $1;
597                         my $mime = do_cat_mail($git, $blob, \$bytes) or next;
598                         batch_adjust(\$max, $bytes, $batch_cb, $latest);
599                         $del_cb->($self, $mime);
600                 } elsif ($line =~ /^commit ($h40)/o) {
601                         $latest = $1;
602                 }
603         }
604         $batch_cb->($latest, 0);
605 }
606
607 sub _msgmap_init {
608         my ($self) = @_;
609         die "BUG: _msgmap_init is only for v1\n" if $self->{version} != 1;
610         $self->{mm} ||= eval {
611                 require PublicInbox::Msgmap;
612                 PublicInbox::Msgmap->new($self->{mainrepo}, 1);
613         };
614 }
615
616 sub _git_log {
617         my ($self, $range) = @_;
618         $self->{git}->popen(qw/log --reverse --no-notes --no-color
619                                 --raw -r --no-abbrev/, $range);
620 }
621
622 # indexes all unindexed messages
623 sub _index_sync {
624         my ($self, $opts) = @_;
625         my $tip = $opts->{ref} || 'HEAD';
626         my $reindex = $opts->{reindex};
627         my ($mkey, $last_commit, $lx, $xlog);
628         $self->{git}->batch_prepare;
629         my $xdb = $self->begin_txn_lazy;
630         do {
631                 $xlog = undef;
632                 $mkey = 'last_commit';
633                 $last_commit = $xdb->get_metadata('last_commit');
634                 $lx = $last_commit;
635                 if ($reindex) {
636                         $lx = '';
637                         $mkey = undef if $last_commit ne '';
638                 }
639                 $self->{over}->rollback_lazy;
640                 $self->{over}->disconnect;
641                 delete $self->{txn};
642                 $xdb->cancel_transaction;
643                 $xdb = _xdb_release($self);
644
645                 # ensure we leak no FDs to "git log"
646                 my $range = $lx eq '' ? $tip : "$lx..$tip";
647                 $xlog = _git_log($self, $range);
648
649                 $xdb = $self->begin_txn_lazy;
650         } while ($xdb->get_metadata('last_commit') ne $last_commit);
651
652         my $mm = _msgmap_init($self);
653         my $dbh = $mm->{dbh} if $mm;
654         my $mm_only;
655         my $cb = sub {
656                 my ($commit, $more) = @_;
657                 if ($dbh) {
658                         $mm->last_commit($commit) if $commit;
659                         $dbh->commit;
660                 }
661                 if (!$mm_only) {
662                         $xdb->set_metadata($mkey, $commit) if $mkey && $commit;
663                         $self->commit_txn_lazy;
664                 }
665                 # let another process do some work... <
666                 if ($more) {
667                         if (!$mm_only) {
668                                 $xdb = $self->begin_txn_lazy;
669                         }
670                         $dbh->begin_work if $dbh;
671                 }
672         };
673
674         if ($mm) {
675                 $dbh->begin_work;
676                 my $lm = $mm->last_commit || '';
677                 if ($lm eq $lx) {
678                         # Common case is the indexes are synced,
679                         # we only need to run git-log once:
680                         rlog($self, $xlog, *index_both, *unindex_both, $cb);
681                 } else {
682                         # Uncommon case, msgmap and xapian are out-of-sync
683                         # do not care for performance (but git is fast :>)
684                         # This happens if we have to reindex Xapian since
685                         # msgmap is a frozen format and our Xapian format
686                         # is evolving.
687                         my $r = $lm eq '' ? $tip : "$lm..$tip";
688
689                         # first, ensure msgmap is up-to-date:
690                         my $mkey_prev = $mkey;
691                         $mkey = undef; # ignore xapian, for now
692                         my $mlog = _git_log($self, $r);
693                         $mm_only = 1;
694                         rlog($self, $mlog, *index_mm, *unindex_mm, $cb);
695                         $mm_only = $mlog = undef;
696
697                         # now deal with Xapian
698                         $mkey = $mkey_prev;
699                         $dbh = undef;
700                         rlog($self, $xlog, *index_mm2, *unindex_mm2, $cb);
701                 }
702         } else {
703                 # user didn't install DBD::SQLite and DBI
704                 rlog($self, $xlog, *add_message, *unindex_blob, $cb);
705         }
706 }
707
708 sub DESTROY {
709         # order matters for unlocking
710         $_[0]->{xdb} = undef;
711         $_[0]->{lockfh} = undef;
712 }
713
714 # remote_* subs are only used by SearchIdxPart
715 sub remote_commit {
716         my ($self) = @_;
717         if (my $w = $self->{w}) {
718                 print $w "commit\n" or die "failed to write commit: $!";
719         } else {
720                 $self->commit_txn_lazy;
721         }
722 }
723
724 sub remote_close {
725         my ($self) = @_;
726         if (my $w = delete $self->{w}) {
727                 my $pid = delete $self->{pid} or die "no process to wait on\n";
728                 print $w "close\n" or die "failed to write to pid:$pid: $!\n";
729                 close $w or die "failed to close pipe for pid:$pid: $!\n";
730                 waitpid($pid, 0) == $pid or die "remote process did not finish";
731                 $? == 0 or die ref($self)." pid:$pid exited with: $?";
732         } else {
733                 die "transaction in progress $self\n" if $self->{txn};
734                 $self->_xdb_release if $self->{xdb};
735         }
736 }
737
738 sub remote_remove {
739         my ($self, $oid, $mid) = @_;
740         if (my $w = $self->{w}) {
741                 # triggers remove_by_oid in a partition
742                 print $w "D $oid $mid\n" or die "failed to write remove $!";
743         } else {
744                 $self->begin_txn_lazy;
745                 $self->remove_by_oid($oid, $mid);
746         }
747 }
748
749 sub begin_txn_lazy {
750         my ($self) = @_;
751         return if $self->{txn};
752         my $xdb = $self->{xdb} || $self->_xdb_acquire;
753         $self->{over}->begin_lazy;
754         $xdb->begin_transaction;
755         $self->{txn} = 1;
756         $xdb;
757 }
758
759 sub commit_txn_lazy {
760         my ($self) = @_;
761         delete $self->{txn} or return;
762         $self->{xdb}->commit_transaction;
763         $self->{over}->commit_lazy;
764 }
765
766 sub worker_done {
767         my ($self) = @_;
768         die "$$ $0 xdb not released\n" if $self->{xdb};
769         die "$$ $0 still in transaction\n" if $self->{txn};
770 }
771
772 1;