]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/ExtSearchIdx.pm
index: fix --no-fsync flag propagation to extindex
[public-inbox.git] / lib / PublicInbox / ExtSearchIdx.pm
index b502482303797ada7b57e6f6d4c04952a2130950..e7fdae48ed79ac4f9eecdd14caac8cf8ea98976d 100644 (file)
@@ -18,6 +18,8 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::ExtSearch PublicInbox::Lock);
 use Carp qw(croak carp);
+use Sys::Hostname qw(hostname);
+use POSIX qw(strftime);
 use PublicInbox::Search;
 use PublicInbox::SearchIdx qw(crlf_adjust prepare_stack is_ancestor
        is_bad_blob);
@@ -28,13 +30,11 @@ use PublicInbox::V2Writable;
 use PublicInbox::InboxWritable;
 use PublicInbox::ContentHash qw(content_hash);
 use PublicInbox::Eml;
-use File::Spec;
 use PublicInbox::DS qw(now);
 use DBI qw(:sql_types); # SQL_BLOB
 
 sub new {
        my (undef, $dir, $opt) = @_;
-       $dir = File::Spec->canonpath($dir);
        my $l = $opt->{indexlevel} // 'full';
        $l !~ $PublicInbox::SearchIdx::INDEXLEVELS and
                die "invalid indexlevel=$l\n";
@@ -54,30 +54,14 @@ sub new {
        }, __PACKAGE__;
        $self->{shards} = $self->count_shards || nproc_shards($opt->{creat});
        my $oidx = PublicInbox::OverIdx->new("$self->{xpfx}/over.sqlite3");
-       $oidx->{-no_fsync} = 1 if $opt->{-no_fsync};
+       $self->{-no_fsync} = $oidx->{-no_fsync} = 1 if !$opt->{fsync};
        $self->{oidx} = $oidx;
        $self
 }
 
 sub attach_inbox {
        my ($self, $ibx) = @_;
-       $ibx = PublicInbox::InboxWritable->new($ibx);
-       my $key = $ibx->eidx_key;
-       if (!$ibx->over || !$ibx->mm) {
-               warn "W: skipping $key (unindexed)\n";
-               return;
-       }
-       if (!defined($ibx->uidvalidity)) {
-               warn "W: skipping $key (no UIDVALIDITY)\n";
-               return;
-       }
-       my $ibxdir = File::Spec->canonpath($ibx->{inboxdir});
-       if ($ibxdir ne $ibx->{inboxdir}) {
-               warn "W: `$ibx->{inboxdir}' canonicalized to `$ibxdir'\n";
-               $ibx->{inboxdir} = $ibxdir;
-       }
-       $ibx = PublicInbox::InboxWritable->new($ibx);
-       $self->{ibx_map}->{$key} //= do {
+       $self->{ibx_map}->{$ibx->eidx_key} //= do {
                push @{$self->{ibx_list}}, $ibx;
                $ibx;
        }
@@ -281,31 +265,36 @@ sub last_commits {
        $heads;
 }
 
+sub _ibx_index_reject ($) {
+       my ($ibx) = @_;
+       $ibx->mm // return 'unindexed, no msgmap.sqlite3';
+       $ibx->uidvalidity // return 'no UIDVALIDITY';
+       $ibx->over // return 'unindexed, no over.sqlite3';
+       undef;
+}
+
 sub _sync_inbox ($$$) {
        my ($self, $sync, $ibx) = @_;
+       my $ekey = $ibx->eidx_key;
+       if (defined(my $err = _ibx_index_reject($ibx))) {
+               return "W: skipping $ekey ($err)";
+       }
        $sync->{ibx} = $ibx;
        $sync->{nr} = \(my $nr = 0);
        my $v = $ibx->version;
-       my $ekey = $ibx->eidx_key;
        if ($v == 2) {
-               my $epoch_max;
-               defined($ibx->git_dir_latest(\$epoch_max)) or return;
-               $sync->{epoch_max} = $epoch_max;
+               $sync->{epoch_max} = $ibx->max_git_epoch // return;
                sync_prepare($self, $sync); # or return # TODO: once MiscIdx is stable
        } elsif ($v == 1) {
                my $uv = $ibx->uidvalidity;
                my $lc = $self->{oidx}->eidx_meta("lc-v1:$ekey//$uv");
-               my $head = $ibx->mm->last_commit;
-               unless (defined $head) {
-                       warn "E: $ibx->{inboxdir} is not indexed\n";
-                       return;
-               }
+               my $head = $ibx->mm->last_commit //
+                       return "E: $ibx->{inboxdir} is not indexed";
                my $stk = prepare_stack($sync, $lc ? "$lc..$head" : $head);
                my $unit = { stack => $stk, git => $ibx->git };
                push @{$sync->{todo}}, $unit;
        } else {
-               warn "E: $ekey unsupported inbox version (v$v)\n";
-               return;
+               return "E: $ekey unsupported inbox version (v$v)";
        }
        for my $unit (@{delete($sync->{todo}) // []}) {
                last if $sync->{quit};
@@ -313,6 +302,7 @@ sub _sync_inbox ($$$) {
        }
        $self->{midx}->index_ibx($ibx) unless $sync->{quit};
        $ibx->git->cleanup; # done with this inbox, now
+       undef;
 }
 
 sub gc_unref_doc ($$$$) {
@@ -403,6 +393,32 @@ sub _ibx_for ($$$) {
        $self->{ibx_list}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped"
 }
 
+sub _fd_constrained ($) {
+       my ($self) = @_;
+       $self->{-fd_constrained} //= do {
+               my $soft;
+               if (eval { require BSD::Resource; 1 }) {
+                       my $NOFILE = BSD::Resource::RLIMIT_NOFILE();
+                       ($soft, undef) = BSD::Resource::getrlimit($NOFILE);
+               } else {
+                       chomp($soft = `sh -c 'ulimit -n'`);
+               }
+               if (defined($soft)) {
+                       my $want = scalar(@{$self->{ibx_list}}) + 64; # estimate
+                       my $ret = $want > $soft;
+                       if ($ret) {
+                               warn <<EOF;
+RLIMIT_NOFILE=$soft insufficient (want: $want), will close DB handles early
+EOF
+                       }
+                       $ret;
+               } else {
+                       warn "Unable to determine RLIMIT_NOFILE: $@\n";
+                       1;
+               }
+       };
+}
+
 sub _reindex_finalize ($$$) {
        my ($req, $smsg, $eml) = @_;
        my $sync = $req->{sync};
@@ -439,11 +455,16 @@ sub _reindex_finalize ($$$) {
                my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty";
                $x->{num} = delete($x->{xnum}) // die '{xnum} unset';
                $ibx = _ibx_for($self, $sync, $x);
-               my $e = $ibx->over->get_art($x->{num});
-               $e->{blob} eq $x->{blob} or die <<EOF;
+               if (my $over = $ibx->over) {
+                       my $e = $over->get_art($x->{num});
+                       $e->{blob} eq $x->{blob} or die <<EOF;
 $x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
 EOF
-               push @todo, $ibx, $e;
+                       push @todo, $ibx, $e;
+                       $over->dbh_close if _fd_constrained($self);
+               } else {
+                       die "$ibx->{inboxdir}: over.sqlite3 unusable: $!\n";
+               }
        }
        undef $by_chash;
        while (my ($ibx, $e) = splice(@todo, 0, 2)) {
@@ -524,9 +545,88 @@ sub checkpoint_due ($) {
        ${$sync->{need_checkpoint}} || (now() > $sync->{next_check});
 }
 
+sub host_ident () {
+       # I've copied FS images and only changed the hostname before,
+       # so prepend hostname.  Use `state' since these a BOFH can change
+       # these while this process is running and we always want to be
+       # able to release locks taken by this process.
+       state $retval = hostname . '-' . do {
+               my $m; # machine-id(5) is systemd
+               if (open(my $fh, '<', '/etc/machine-id')) { $m = <$fh> }
+               # (g)hostid(1) is in GNU coreutils, kern.hostid is most BSDs
+               chomp($m ||= `{ sysctl -n kern.hostid ||
+                               hostid || ghostid; } 2>/dev/null`
+                       || "no-machine-id-or-hostid-on-$^O");
+               $m;
+       };
+}
+
+sub eidxq_release {
+       my ($self) = @_;
+       my $expect = delete($self->{-eidxq_locked}) or return;
+       my ($owner_pid, undef) = split(/-/, $expect);
+       return if $owner_pid != $$; # shards may fork
+       my $oidx = $self->{oidx};
+       $oidx->begin_lazy;
+       my $cur = $oidx->eidx_meta('eidxq_lock') // '';
+       if ($cur eq $expect) {
+               $oidx->eidx_meta('eidxq_lock', '');
+               return 1;
+       } elsif ($cur ne '') {
+               warn "E: eidxq_lock($expect) stolen by $cur\n";
+       } else {
+               warn "E: eidxq_lock($expect) released by another process\n";
+       }
+       undef;
+}
+
+sub DESTROY {
+       my ($self) = @_;
+       eidxq_release($self) and $self->{oidx}->commit_lazy;
+}
+
+sub _eidxq_take ($) {
+       my ($self) = @_;
+       my $val = "$$-${\time}-$>-".host_ident;
+       $self->{oidx}->eidx_meta('eidxq_lock', $val);
+       $self->{-eidxq_locked} = $val;
+}
+
+sub eidxq_lock_acquire ($) {
+       my ($self) = @_;
+       my $oidx = $self->{oidx};
+       $oidx->begin_lazy;
+       my $cur = $oidx->eidx_meta('eidxq_lock') || return _eidxq_take($self);
+       if (my $locked = $self->{-eidxq_locked}) { # be lazy
+               return $locked if $locked eq $cur;
+       }
+       my ($pid, $time, $euid, $ident) = split(/-/, $cur, 4);
+       my $t = strftime('%Y-%m-%d %k:%M:%S', gmtime($time));
+       if ($euid == $> && $ident eq host_ident) {
+               if (kill(0, $pid)) {
+                       warn <<EOM; return;
+I: PID:$pid (re)indexing Xapian since $t, it will continue our work
+EOM
+               }
+               if ($!{ESRCH}) {
+                       warn "I: eidxq_lock is stale ($cur), clobbering\n";
+                       return _eidxq_take($self);
+               }
+               warn "E: kill(0, $pid) failed: $!\n"; # fall-through:
+       }
+       my $fn = $oidx->dbh->sqlite_db_filename;
+       warn <<EOF;
+W: PID:$pid, UID:$euid on $ident is indexing Xapian since $t
+W: If this is unexpected, delete `eidxq_lock' from the `eidx_meta' table:
+W:     sqlite3 $fn 'DELETE FROM eidx_meta WHERE key = "eidxq_lock"'
+EOF
+       undef;
+}
+
 sub eidxq_process ($$) { # for reindexing
        my ($self, $sync) = @_;
 
+       return unless eidxq_lock_acquire($self);
        my $dbh = $self->{oidx}->dbh;
        my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
        ${$sync->{nr}} = 0;
@@ -710,15 +810,26 @@ DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
 
 sub _reindex_inbox ($$$) {
        my ($self, $sync, $ibx) = @_;
-       local $self->{current_info} = $ibx->eidx_key;
-       _reindex_check_unseen($self, $sync, $ibx);
-       _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
+       my $ekey = $ibx->eidx_key;
+       local $self->{current_info} = $ekey;
+       if (defined(my $err = _ibx_index_reject($ibx))) {
+               warn "W: cannot reindex $ekey ($err)\n";
+       } else {
+               _reindex_check_unseen($self, $sync, $ibx);
+               _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
+       }
        delete @$ibx{qw(over mm search git)}; # won't need these for a bit
 }
 
 sub eidx_reindex {
        my ($self, $sync) = @_;
 
+       # acquire eidxq_lock early because full reindex takes forever
+       # and incremental -extindex processes can run during our checkpoints
+       if (!eidxq_lock_acquire($self)) {
+               warn "E: aborting --reindex\n";
+               return;
+       }
        for my $ibx (@{$self->{ibx_list}}) {
                _reindex_inbox($self, $sync, $ibx);
                last if $sync->{quit};
@@ -764,11 +875,14 @@ sub eidx_sync { # main entry point
        # don't use $_ here, it'll get clobbered by reindex_checkpoint
        for my $ibx (@{$self->{ibx_list}}) {
                last if $sync->{quit};
-               _sync_inbox($self, $sync, $ibx);
+               my $err = _sync_inbox($self, $sync, $ibx);
+               delete @$ibx{qw(mm over)};
+               warn $err, "\n" if defined($err);
        }
        $self->{oidx}->rethread_done($opt) unless $sync->{quit};
        eidxq_process($self, $sync) unless $sync->{quit};
 
+       eidxq_release($self);
        PublicInbox::V2Writable::done($self);
 }
 
@@ -850,7 +964,6 @@ sub idx_init { # similar to V2Writable
                PublicInbox::V2Writable::write_alternates($info_dir, $mode, $o);
        }
        $self->parallel_init($self->{indexlevel});
-       $self->umask_prepare;
        $self->with_umask(\&_idx_init, $self, $opt);
        $self->{oidx}->begin_lazy;
        $self->{oidx}->eidx_prep;
@@ -859,7 +972,6 @@ sub idx_init { # similar to V2Writable
 
 no warnings 'once';
 *done = \&PublicInbox::V2Writable::done;
-*umask_prepare = \&PublicInbox::InboxWritable::umask_prepare;
 *with_umask = \&PublicInbox::InboxWritable::with_umask;
 *parallel_init = \&PublicInbox::V2Writable::parallel_init;
 *nproc_shards = \&PublicInbox::V2Writable::nproc_shards;