#
# Indexes mail with Xapian and our (SQLite-based) ::Msgmap for use
# with the web and NNTP interfaces. This index maintains thread
-# relationships for use by Mail::Thread. This writes to the search
-# index.
+# relationships for use by PublicInbox::SearchThread.
+# This writes to the search index.
package PublicInbox::SearchIdx;
use strict;
use warnings;
use base qw(PublicInbox::Search);
use PublicInbox::MID qw/mid_clean id_compress mid_mime/;
use PublicInbox::MsgIter;
+use Carp qw(croak);
+use POSIX qw(strftime);
require PublicInbox::Git;
*xpfx = *PublicInbox::Search::xpfx;
PERM_EVERYBODY => 0664,
};
-# XXX temporary hack...
-my $xap_ver = ((Search::Xapian::major_version << 16) |
- (Search::Xapian::minor_version << 8 ) |
- Search::Xapian::revision());
-our $XAP_LOCK_BROKEN = $xap_ver >= 0x010216; # >= 1.2.22
-
sub new {
- my ($class, $git_dir, $writable) = @_;
- my $dir = PublicInbox::Search->xdir($git_dir);
+ my ($class, $inbox, $creat) = @_;
+ my $git_dir = $inbox;
+ my $altid;
+ if (ref $inbox) {
+ $git_dir = $inbox->{mainrepo};
+ $altid = $inbox->{altid};
+ if ($altid) {
+ require PublicInbox::AltId;
+ $altid = [ map {
+ PublicInbox::AltId->new($inbox, $_);
+ } @$altid ];
+ }
+ }
require Search::Xapian::WritableDatabase;
- my $flag = Search::Xapian::DB_OPEN;
- my $self = bless { git_dir => $git_dir }, $class;
+ my $self = bless { git_dir => $git_dir, -altid => $altid }, $class;
my $perm = $self->_git_config_perm;
my $umask = _umask_for($perm);
$self->{umask} = $umask;
$self->{lock_path} = "$git_dir/ssoma.lock";
$self->{git} = PublicInbox::Git->new($git_dir);
- $self->{xdb} = $self->with_umask(sub {
- if ($writable == 1) {
- require File::Path;
- File::Path::mkpath($dir);
- $self->{batch_size} = 100 unless $XAP_LOCK_BROKEN;
- $flag = Search::Xapian::DB_CREATE_OR_OPEN;
- _lock_acquire($self);
- }
- Search::Xapian::WritableDatabase->new($dir, $flag);
- });
+ $self->{creat} = ($creat || 0) == 1;
$self;
}
sub _xdb_release {
my ($self) = @_;
- my $xdb = delete $self->{xdb};
- $xdb->commit_transaction;
+ my $xdb = delete $self->{xdb} or croak 'not acquired';
$xdb->close;
+ _lock_release($self) if $self->{creat};
+ undef;
}
sub _xdb_acquire {
- my ($self, $more) = @_;
+ my ($self) = @_;
+ croak 'already acquired' if $self->{xdb};
my $dir = PublicInbox::Search->xdir($self->{git_dir});
my $flag = Search::Xapian::DB_OPEN;
- my $xdb = Search::Xapian::WritableDatabase->new($dir, $flag);
- $xdb->begin_transaction if $more;
- $self->{xdb} = $xdb;
+ if ($self->{creat}) {
+ require File::Path;
+ _lock_acquire($self);
+ File::Path::mkpath($dir);
+ $self->{batch_size} = 100;
+ $flag = Search::Xapian::DB_CREATE_OR_OPEN;
+ }
+ $self->{xdb} = Search::Xapian::WritableDatabase->new($dir, $flag);
}
+# we only acquire the flock if creating or reindexing;
+# PublicInbox::Import already has the lock on its own.
sub _lock_acquire {
my ($self) = @_;
+ croak 'already locked' if $self->{lockfh};
sysopen(my $lockfh, $self->{lock_path}, O_WRONLY|O_CREAT) or
die "failed to open lock $self->{lock_path}: $!\n";
flock($lockfh, LOCK_EX) or die "lock failed: $!\n";
sub _lock_release {
my ($self) = @_;
- my $lockfh = delete $self->{lockfh};
+ my $lockfh = delete $self->{lockfh} or croak 'not locked';
flock($lockfh, LOCK_UN) or die "unlock failed: $!\n";
close $lockfh or die "close failed: $!\n";
}
-sub add_val {
+sub add_val ($$$) {
my ($doc, $col, $num) = @_;
$num = Search::Xapian::sortable_serialise($num);
$doc->add_value($col, $num);
}
+sub add_values ($$$) {
+ my ($smsg, $bytes, $num) = @_;
+
+ my $ts = $smsg->ts;
+ my $doc = $smsg->{doc};
+ add_val($doc, &PublicInbox::Search::TS, $ts);
+
+ defined($num) and add_val($doc, &PublicInbox::Search::NUM, $num);
+
+ defined($bytes) and add_val($doc, &PublicInbox::Search::BYTES, $bytes);
+
+ add_val($doc, &PublicInbox::Search::LINES,
+ $smsg->{mime}->body_raw =~ tr!\n!\n!);
+
+ my $yyyymmdd = strftime('%Y%m%d', gmtime($ts));
+ add_val($doc, PublicInbox::Search::YYYYMMDD, $yyyymmdd);
+}
+
+sub index_users ($$) {
+ my ($tg, $smsg) = @_;
+
+ my $from = $smsg->from;
+ my $to = $smsg->to;
+ my $cc = $smsg->cc;
+
+ $tg->index_text($from, 1, 'A'); # A - author
+ $tg->increase_termpos;
+ $tg->index_text($to, 1, 'XTO') if $to ne '';
+ $tg->increase_termpos;
+ $tg->index_text($cc, 1, 'XCC') if $cc ne '';
+ $tg->increase_termpos;
+}
+
+sub index_body ($$$) {
+ my ($tg, $lines, $inc) = @_;
+ $tg->index_text(join("\n", @$lines), $inc, $inc ? 'XNQ' : 'XQUOT');
+ @$lines = ();
+ $tg->increase_termpos;
+}
+
sub add_message {
my ($self, $mime, $bytes, $num, $blob) = @_; # mime = Email::MIME object
my $db = $self->{xdb};
my ($doc_id, $old_tid);
my $mid = mid_clean(mid_mime($mime));
- my $ct_msg = $mime->header('Content-Type') || 'text/plain';
eval {
die 'Message-ID too long' if length($mid) > MAX_MID_SIZE;
if ($smsg) {
# convert a ghost to a regular message
# it will also clobber any existing regular message
- $doc_id = $smsg->doc_id;
+ $doc_id = $smsg->{doc_id};
$old_tid = $smsg->thread_id;
}
$smsg = PublicInbox::SearchMsg->new($mime);
$doc->add_term(xpfx('path') . id_compress($path));
}
- add_val($doc, &PublicInbox::Search::TS, $smsg->ts);
-
- defined($num) and
- add_val($doc, &PublicInbox::Search::NUM, $num);
-
- defined($bytes) and
- add_val($doc, &PublicInbox::Search::BYTES, $bytes);
-
- add_val($doc, &PublicInbox::Search::LINES,
- $mime->body_raw =~ tr!\n!\n!);
+ add_values($smsg, $bytes, $num);
my $tg = $self->term_generator;
$tg->set_document($doc);
$tg->index_text($subj, 1, 'S') if $subj;
$tg->increase_termpos;
- $tg->index_text($subj) if $subj;
- $tg->increase_termpos;
- $tg->index_text($smsg->from);
- $tg->increase_termpos;
+ index_users($tg, $smsg);
msg_iter($mime, sub {
my ($part, $depth, @idx) = @{$_[0]};
- my $ct = $part->content_type || $ct_msg;
+ my $ct = $part->content_type || 'text/plain';
+ my $fn = $part->filename;
+ if (defined $fn && $fn ne '') {
+ $tg->index_text($fn, 1, 'XFN');
+ }
- # account for filter bugs...
- $ct =~ m!\btext/plain\b!i or return;
+ return if $ct =~ m!\btext/x?html\b!i;
+
+ my $s = eval { $part->body_str };
+ if ($@) {
+ if ($ct =~ m!\btext/plain\b!i) {
+ # Try to assume UTF-8 because Alpine
+ # seems to do wacky things and set
+ # charset=X-UNKNOWN
+ $part->charset_set('UTF-8');
+ $s = eval { $part->body_str };
+ $s = $part->body if $@;
+ }
+ }
+ defined $s or return;
my (@orig, @quot);
my $body = $part->body;
- $part->body_set('');
my @lines = split(/\n/, $body);
while (defined(my $l = shift @lines)) {
- if ($l =~ /^\s*>/) {
+ if ($l =~ /^>/) {
+ index_body($tg, \@orig, 1) if @orig;
push @quot, $l;
} else {
+ index_body($tg, \@quot, 0) if @quot;
push @orig, $l;
}
}
- if (@quot) {
- $tg->index_text(join("\n", @quot), 0);
- @quot = ();
- $tg->increase_termpos;
- }
- if (@orig) {
- $tg->index_text(join("\n", @orig));
- @orig = ();
- $tg->increase_termpos;
- }
+ index_body($tg, \@quot, 0) if @quot;
+ index_body($tg, \@orig, 1) if @orig;
});
link_message($self, $smsg, $old_tid);
+ $tg->index_text($mid, 1, 'XMID');
$doc->set_data($smsg->to_doc_data($blob));
+
+ if (my $altid = $self->{-altid}) {
+ foreach my $alt (@$altid) {
+ my $id = $alt->mid2alt($mid);
+ next unless defined $id;
+ $doc->add_term($alt->{xprefix} . $id);
+ }
+ }
if (defined $doc_id) {
$db->replace_document($doc_id, $doc);
} else {
my ($self, $smsg, $old_tid) = @_;
my $doc = $smsg->{doc};
my $mid = $smsg->mid;
- my $mime = $smsg->mime;
+ my $mime = $smsg->{mime};
my $hdr = $mime->header_obj;
my $refs = $hdr->header_raw('References');
my @refs = $refs ? ($refs =~ /<([^>]+)>/g) : ();
- if (my $irt = $hdr->header_raw('In-Reply-To')) {
- # last References should be $irt
- # we will de-dupe later
- push @refs, mid_clean($irt);
+ my $irt = $hdr->header_raw('In-Reply-To');
+ if (defined $irt) {
+ $irt = mid_clean($irt);
+ $irt = undef if $mid eq $irt;
}
my $tid;
my @orig_refs = @refs;
@refs = ();
+ if (defined $irt) {
+ # to check MAX_MID_SIZE
+ push @orig_refs, $irt;
+
+ # below, we will ensure IRT (if specified)
+ # is the last References
+ $uniq{$irt} = 1;
+ }
+
# prevent circular references via References: here:
foreach my $ref (@orig_refs) {
if (length($ref) > MAX_MID_SIZE) {
push @refs, $ref;
}
}
+
+ # last References should be IRT, but some mail clients do things
+ # out of order, so trust IRT over References iff IRT exists
+ push @refs, $irt if defined $irt;
+
if (@refs) {
$smsg->{references} = '<'.join('> <', @refs).'>';
}
sub rlog {
- my ($self, $range, $add_cb, $del_cb, $batch_cb) = @_;
+ my ($self, $log, $add_cb, $del_cb, $batch_cb) = @_;
my $hex = '[a-f0-9]';
my $h40 = $hex .'{40}';
my $addmsg = qr!^:000000 100644 \S+ ($h40) A\t${hex}{2}/${hex}{38}$!;
my $delmsg = qr!^:100644 000000 ($h40) \S+ D\t${hex}{2}/${hex}{38}$!;
my $git = $self->{git};
- my $log = $git->popen(qw/log --reverse --no-notes --no-color
- --raw -r --no-abbrev/, $range);
my $latest;
my $bytes;
my $max = $self->{batch_size}; # may be undef
$batch_cb->($latest, 0);
}
+sub _msgmap_init {
+ my ($self) = @_;
+ $self->{mm} = eval {
+ require PublicInbox::Msgmap;
+ PublicInbox::Msgmap->new($self->{git_dir}, 1);
+ };
+}
+
+sub _git_log {
+ my ($self, $range) = @_;
+ $self->{git}->popen(qw/log --reverse --no-notes --no-color
+ --raw -r --no-abbrev/, $range);
+}
+
# indexes all unindexed messages
sub _index_sync {
my ($self, $opts) = @_;
my $tip = $opts->{ref} || 'HEAD';
- my $mm = $self->{mm} = eval {
- require PublicInbox::Msgmap;
- PublicInbox::Msgmap->new($self->{git_dir}, 1);
- };
- my $xdb = $self->{xdb};
- $xdb->begin_transaction;
my $reindex = $opts->{reindex};
- my $mkey = 'last_commit';
- my $last_commit = $xdb->get_metadata($mkey);
- my $lx = $last_commit;
- if ($reindex) {
- $lx = '';
- $mkey = undef if $last_commit ne '';
- }
- my $dbh;
+ my ($mkey, $last_commit, $lx, $xlog);
+ $self->{git}->batch_prepare;
+ my $xdb = _xdb_acquire($self);
+ $xdb->begin_transaction;
+ do {
+ $xlog = undef;
+ $mkey = 'last_commit';
+ $last_commit = $xdb->get_metadata('last_commit');
+ $lx = $last_commit;
+ if ($reindex) {
+ $lx = '';
+ $mkey = undef if $last_commit ne '';
+ }
+ $xdb->cancel_transaction;
+ $xdb = _xdb_release($self);
+
+ # ensure we leak no FDs to "git log"
+ my $range = $lx eq '' ? $tip : "$lx..$tip";
+ $xlog = _git_log($self, $range);
+
+ $xdb = _xdb_acquire($self);
+ $xdb->begin_transaction;
+ } while ($xdb->get_metadata('last_commit') ne $last_commit);
+
+ my $mm = _msgmap_init($self);
+ my $dbh = $mm->{dbh} if $mm;
+ my $mm_only;
my $cb = sub {
my ($commit, $more) = @_;
- $xdb->set_metadata($mkey, $commit) if $mkey && $commit;
if ($dbh) {
$mm->last_commit($commit) if $commit;
$dbh->commit;
}
- if ($XAP_LOCK_BROKEN) {
- $xdb->commit_transaction if !$more;
- } else {
- $xdb = undef;
- _xdb_release($self);
- _lock_release($self);
+ if (!$mm_only) {
+ $xdb->set_metadata($mkey, $commit) if $mkey && $commit;
+ $xdb->commit_transaction;
+ $xdb = _xdb_release($self);
}
- # let another process do some work...
- if (!$XAP_LOCK_BROKEN) {
- _lock_acquire($self);
- $dbh->begin_work if $dbh && $more;
- $xdb = _xdb_acquire($self, $more);
+ # let another process do some work... <
+ if ($more) {
+ if (!$mm_only) {
+ $xdb = _xdb_acquire($self);
+ $xdb->begin_transaction;
+ }
+ $dbh->begin_work if $dbh;
}
};
- my $range = $lx eq '' ? $tip : "$lx..$tip";
if ($mm) {
- $dbh = $mm->{dbh};
$dbh->begin_work;
my $lm = $mm->last_commit || '';
if ($lm eq $lx) {
# Common case is the indexes are synced,
# we only need to run git-log once:
- rlog($self, $range, *index_both, *unindex_both, $cb);
+ rlog($self, $xlog, *index_both, *unindex_both, $cb);
} else {
# Uncommon case, msgmap and xapian are out-of-sync
# do not care for performance (but git is fast :>)
# first, ensure msgmap is up-to-date:
my $mkey_prev = $mkey;
$mkey = undef; # ignore xapian, for now
- rlog($self, $r, *index_mm, *unindex_mm, $cb);
+ my $mlog = _git_log($self, $r);
+ $mm_only = 1;
+ rlog($self, $mlog, *index_mm, *unindex_mm, $cb);
+ $mm_only = $mlog = undef;
# now deal with Xapian
$mkey = $mkey_prev;
$dbh = undef;
- rlog($self, $range, *index_mm2, *unindex_mm2, $cb);
+ rlog($self, $xlog, *index_mm2, *unindex_mm2, $cb);
}
} else {
# user didn't install DBD::SQLite and DBI
- rlog($self, $range, *index_blob, *unindex_blob, $cb);
+ rlog($self, $xlog, *index_blob, *unindex_blob, $cb);
}
}