use base qw(PublicInbox::Search);
use PublicInbox::MID qw/mid_clean id_compress mid_mime/;
use PublicInbox::MsgIter;
+use Carp qw(croak);
require PublicInbox::Git;
*xpfx = *PublicInbox::Search::xpfx;
PERM_EVERYBODY => 0664,
};
-# XXX temporary hack...
-my $xap_ver = ((Search::Xapian::major_version << 16) |
- (Search::Xapian::minor_version << 8 ) |
- Search::Xapian::revision());
-our $XAP_LOCK_BROKEN = $xap_ver >= 0x010216; # >= 1.2.22
-
sub new {
- my ($class, $git_dir, $writable) = @_;
- my $dir = PublicInbox::Search->xdir($git_dir);
+ my ($class, $git_dir, $creat) = @_;
require Search::Xapian::WritableDatabase;
- my $flag = Search::Xapian::DB_OPEN;
my $self = bless { git_dir => $git_dir }, $class;
my $perm = $self->_git_config_perm;
my $umask = _umask_for($perm);
$self->{umask} = $umask;
$self->{lock_path} = "$git_dir/ssoma.lock";
$self->{git} = PublicInbox::Git->new($git_dir);
- $self->{xdb} = $self->with_umask(sub {
- if ($writable == 1) {
- require File::Path;
- File::Path::mkpath($dir);
- $self->{batch_size} = 100 unless $XAP_LOCK_BROKEN;
- $flag = Search::Xapian::DB_CREATE_OR_OPEN;
- _lock_acquire($self);
- }
- Search::Xapian::WritableDatabase->new($dir, $flag);
- });
+ $self->{creat} = ($creat || 0) == 1;
$self;
}
sub _xdb_release {
my ($self) = @_;
- my $xdb = delete $self->{xdb};
- $xdb->commit_transaction;
+ my $xdb = delete $self->{xdb} or croak 'not acquired';
$xdb->close;
+ _lock_release($self) if $self->{creat};
+ undef;
}
sub _xdb_acquire {
- my ($self, $more) = @_;
+ my ($self) = @_;
+ croak 'already acquired' if $self->{xdb};
my $dir = PublicInbox::Search->xdir($self->{git_dir});
my $flag = Search::Xapian::DB_OPEN;
- my $xdb = Search::Xapian::WritableDatabase->new($dir, $flag);
- $xdb->begin_transaction if $more;
- $self->{xdb} = $xdb;
+ if ($self->{creat}) {
+ require File::Path;
+ _lock_acquire($self);
+ File::Path::mkpath($dir);
+ $self->{batch_size} = 100;
+ $flag = Search::Xapian::DB_CREATE_OR_OPEN;
+ }
+ $self->{xdb} = Search::Xapian::WritableDatabase->new($dir, $flag);
}
+# we only acquire the flock if creating or reindexing;
+# PublicInbox::Import already has the lock on its own.
sub _lock_acquire {
my ($self) = @_;
+ croak 'already locked' if $self->{lockfh};
sysopen(my $lockfh, $self->{lock_path}, O_WRONLY|O_CREAT) or
die "failed to open lock $self->{lock_path}: $!\n";
flock($lockfh, LOCK_EX) or die "lock failed: $!\n";
sub _lock_release {
my ($self) = @_;
- my $lockfh = delete $self->{lockfh};
+ my $lockfh = delete $self->{lockfh} or croak 'not locked';
flock($lockfh, LOCK_UN) or die "unlock failed: $!\n";
close $lockfh or die "close failed: $!\n";
}
}
sub rlog {
- my ($self, $range, $add_cb, $del_cb, $batch_cb) = @_;
+ my ($self, $log, $add_cb, $del_cb, $batch_cb) = @_;
my $hex = '[a-f0-9]';
my $h40 = $hex .'{40}';
my $addmsg = qr!^:000000 100644 \S+ ($h40) A\t${hex}{2}/${hex}{38}$!;
my $delmsg = qr!^:100644 000000 ($h40) \S+ D\t${hex}{2}/${hex}{38}$!;
my $git = $self->{git};
- my $log = $git->popen(qw/log --reverse --no-notes --no-color
- --raw -r --no-abbrev/, $range);
my $latest;
my $bytes;
my $max = $self->{batch_size}; # may be undef
$batch_cb->($latest, 0);
}
+sub _msgmap_init {
+ my ($self) = @_;
+ $self->{mm} = eval {
+ require PublicInbox::Msgmap;
+ PublicInbox::Msgmap->new($self->{git_dir}, 1);
+ };
+}
+
+sub _git_log {
+ my ($self, $range) = @_;
+ $self->{git}->popen(qw/log --reverse --no-notes --no-color
+ --raw -r --no-abbrev/, $range);
+}
+
# indexes all unindexed messages
sub _index_sync {
my ($self, $opts) = @_;
my $tip = $opts->{ref} || 'HEAD';
- my $mm = $self->{mm} = eval {
- require PublicInbox::Msgmap;
- PublicInbox::Msgmap->new($self->{git_dir}, 1);
- };
- my $xdb = $self->{xdb};
- $xdb->begin_transaction;
my $reindex = $opts->{reindex};
- my $mkey = 'last_commit';
- my $last_commit = $xdb->get_metadata($mkey);
- my $lx = $last_commit;
- if ($reindex) {
- $lx = '';
- $mkey = undef if $last_commit ne '';
- }
- my $dbh;
+ my ($mkey, $last_commit, $lx, $xlog);
+ my $xdb = _xdb_acquire($self);
+ $xdb->begin_transaction;
+ do {
+ $xlog = undef;
+ $mkey = 'last_commit';
+ $last_commit = $xdb->get_metadata('last_commit');
+ $lx = $last_commit;
+ if ($reindex) {
+ $lx = '';
+ $mkey = undef if $last_commit ne '';
+ }
+ $xdb->cancel_transaction;
+ $xdb = _xdb_release($self);
+
+ # ensure we leak no FDs to "git log"
+ my $range = $lx eq '' ? $tip : "$lx..$tip";
+ $xlog = _git_log($self, $range);
+
+ $xdb = _xdb_acquire($self);
+ $xdb->begin_transaction;
+ } while ($xdb->get_metadata('last_commit') ne $last_commit);
+
+ my $mm = _msgmap_init($self);
+ my $dbh = $mm->{dbh} if $mm;
my $cb = sub {
my ($commit, $more) = @_;
- $xdb->set_metadata($mkey, $commit) if $mkey && $commit;
if ($dbh) {
$mm->last_commit($commit) if $commit;
$dbh->commit;
}
- if ($XAP_LOCK_BROKEN) {
- $xdb->commit_transaction if !$more;
- } else {
- $xdb = undef;
- _xdb_release($self);
- _lock_release($self);
- }
- # let another process do some work...
- if (!$XAP_LOCK_BROKEN) {
- _lock_acquire($self);
- $dbh->begin_work if $dbh && $more;
- $xdb = _xdb_acquire($self, $more);
+ $xdb->set_metadata($mkey, $commit) if $mkey && $commit;
+ $xdb->commit_transaction;
+ $xdb = _xdb_release($self);
+ # let another process do some work... <
+ if ($more) {
+ $xdb = _xdb_acquire($self);
+ $xdb->begin_transaction;
+ $dbh->begin_work if $dbh;
}
};
- my $range = $lx eq '' ? $tip : "$lx..$tip";
if ($mm) {
- $dbh = $mm->{dbh};
$dbh->begin_work;
my $lm = $mm->last_commit || '';
if ($lm eq $lx) {
# Common case is the indexes are synced,
# we only need to run git-log once:
- rlog($self, $range, *index_both, *unindex_both, $cb);
+ rlog($self, $xlog, *index_both, *unindex_both, $cb);
} else {
# Uncommon case, msgmap and xapian are out-of-sync
# do not care for performance (but git is fast :>)
# first, ensure msgmap is up-to-date:
my $mkey_prev = $mkey;
$mkey = undef; # ignore xapian, for now
- rlog($self, $r, *index_mm, *unindex_mm, $cb);
+ my $mlog = _git_log($self, $r);
+ rlog($self, $mlog, *index_mm, *unindex_mm, $cb);
+ $mlog = undef;
# now deal with Xapian
$mkey = $mkey_prev;
$dbh = undef;
- rlog($self, $range, *index_mm2, *unindex_mm2, $cb);
+ $xdb = _xdb_acquire($self);
+ $xdb->begin_transaction;
+ rlog($self, $xlog, *index_mm2, *unindex_mm2, $cb);
}
} else {
# user didn't install DBD::SQLite and DBI
- rlog($self, $range, *index_blob, *unindex_blob, $cb);
+ rlog($self, $xlog, *index_blob, *unindex_blob, $cb);
}
}