]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/ExtSearchIdx.pm
wwwstream: show relative coderepo URLs correctly
[public-inbox.git] / lib / PublicInbox / ExtSearchIdx.pm
index d529573551c6fd8b8f6e89ae3b901f0880e28186..f04e044382fca72cef422fa1411b5dde23cefeab 100644 (file)
@@ -18,6 +18,8 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::ExtSearch PublicInbox::Lock);
 use Carp qw(croak carp);
+use Sys::Hostname qw(hostname);
+use POSIX qw(strftime);
 use PublicInbox::Search;
 use PublicInbox::SearchIdx qw(crlf_adjust prepare_stack is_ancestor
        is_bad_blob);
@@ -28,12 +30,11 @@ use PublicInbox::V2Writable;
 use PublicInbox::InboxWritable;
 use PublicInbox::ContentHash qw(content_hash);
 use PublicInbox::Eml;
-use File::Spec;
+use PublicInbox::DS qw(now);
 use DBI qw(:sql_types); # SQL_BLOB
 
 sub new {
        my (undef, $dir, $opt) = @_;
-       $dir = File::Spec->canonpath($dir);
        my $l = $opt->{indexlevel} // 'full';
        $l !~ $PublicInbox::SearchIdx::INDEXLEVELS and
                die "invalid indexlevel=$l\n";
@@ -60,7 +61,6 @@ sub new {
 
 sub attach_inbox {
        my ($self, $ibx) = @_;
-       $ibx = PublicInbox::InboxWritable->new($ibx);
        my $key = $ibx->eidx_key;
        if (!$ibx->over || !$ibx->mm) {
                warn "W: skipping $key (unindexed)\n";
@@ -70,12 +70,6 @@ sub attach_inbox {
                warn "W: skipping $key (no UIDVALIDITY)\n";
                return;
        }
-       my $ibxdir = File::Spec->canonpath($ibx->{inboxdir});
-       if ($ibxdir ne $ibx->{inboxdir}) {
-               warn "W: `$ibx->{inboxdir}' canonicalized to `$ibxdir'\n";
-               $ibx->{inboxdir} = $ibxdir;
-       }
-       $ibx = PublicInbox::InboxWritable->new($ibx);
        $self->{ibx_map}->{$key} //= do {
                push @{$self->{ibx_list}}, $ibx;
                $ibx;
@@ -287,9 +281,7 @@ sub _sync_inbox ($$$) {
        my $v = $ibx->version;
        my $ekey = $ibx->eidx_key;
        if ($v == 2) {
-               my $epoch_max;
-               defined($ibx->git_dir_latest(\$epoch_max)) or return;
-               $sync->{epoch_max} = $epoch_max;
+               $sync->{epoch_max} = $ibx->max_git_epoch // return;
                sync_prepare($self, $sync); # or return # TODO: once MiscIdx is stable
        } elsif ($v == 1) {
                my $uv = $ibx->uidvalidity;
@@ -518,9 +510,93 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
        $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
 }
 
+sub checkpoint_due ($) {
+       my ($sync) = @_;
+       ${$sync->{need_checkpoint}} || (now() > $sync->{next_check});
+}
+
+sub host_ident () {
+       # I've copied FS images and only changed the hostname before,
+       # so prepend hostname.  Use `state' since these a BOFH can change
+       # these while this process is running and we always want to be
+       # able to release locks taken by this process.
+       state $retval = hostname . '-' . do {
+               my $m; # machine-id(5) is systemd
+               if (open(my $fh, '<', '/etc/machine-id')) { $m = <$fh> }
+               # (g)hostid(1) is in GNU coreutils, kern.hostid is most BSDs
+               chomp($m ||= `{ sysctl -n kern.hostid ||
+                               hostid || ghostid; } 2>/dev/null`
+                       || "no-machine-id-or-hostid-on-$^O");
+               $m;
+       };
+}
+
+sub eidxq_release {
+       my ($self) = @_;
+       my $expect = delete($self->{-eidxq_locked}) or return;
+       my ($owner_pid, undef) = split(/-/, $expect);
+       return if $owner_pid != $$; # shards may fork
+       my $oidx = $self->{oidx};
+       $oidx->begin_lazy;
+       my $cur = $oidx->eidx_meta('eidxq_lock') // '';
+       if ($cur eq $expect) {
+               $oidx->eidx_meta('eidxq_lock', '');
+               return 1;
+       } elsif ($cur ne '') {
+               warn "E: eidxq_lock($expect) stolen by $cur\n";
+       } else {
+               warn "E: eidxq_lock($expect) released by another process\n";
+       }
+       undef;
+}
+
+sub DESTROY {
+       my ($self) = @_;
+       eidxq_release($self) and $self->{oidx}->commit_lazy;
+}
+
+sub _eidxq_take ($) {
+       my ($self) = @_;
+       my $val = "$$-${\time}-$>-".host_ident;
+       $self->{oidx}->eidx_meta('eidxq_lock', $val);
+       $self->{-eidxq_locked} = $val;
+}
+
+sub eidxq_lock_acquire ($) {
+       my ($self) = @_;
+       my $oidx = $self->{oidx};
+       $oidx->begin_lazy;
+       my $cur = $oidx->eidx_meta('eidxq_lock') || return _eidxq_take($self);
+       if (my $locked = $self->{-eidxq_locked}) { # be lazy
+               return $locked if $locked eq $cur;
+       }
+       my ($pid, $time, $euid, $ident) = split(/-/, $cur, 4);
+       my $t = strftime('%Y-%m-%d %k:%M:%S', gmtime($time));
+       if ($euid == $> && $ident eq host_ident) {
+               if (kill(0, $pid)) {
+                       warn <<EOM; return;
+I: PID:$pid (re)indexing Xapian since $t, it will continue our work
+EOM
+               }
+               if ($!{ESRCH}) {
+                       warn "I: eidxq_lock is stale ($cur), clobbering\n";
+                       return _eidxq_take($self);
+               }
+               warn "E: kill(0, $pid) failed: $!\n"; # fall-through:
+       }
+       my $fn = $oidx->dbh->sqlite_db_filename;
+       warn <<EOF;
+W: PID:$pid, UID:$euid on $ident is indexing Xapian since $t
+W: If this is unexpected, delete `eidxq_lock' from the `eidx_meta' table:
+W:     sqlite3 $fn 'DELETE FROM eidx_meta WHERE key = "eidxq_lock"'
+EOF
+       undef;
+}
+
 sub eidxq_process ($$) { # for reindexing
        my ($self, $sync) = @_;
 
+       return unless eidxq_lock_acquire($self);
        my $dbh = $self->{oidx}->dbh;
        my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
        ${$sync->{nr}} = 0;
@@ -531,13 +607,16 @@ sub eidxq_process ($$) { # for reindexing
                my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
                $pr->("Xapian indexing $min..$max (total=$tot)\n");
        }
-       my %id2pos;
-       my $pos = 0;
-       $id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
-       $sync->{id2pos} = \%id2pos;
-
-       my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
-       my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
+       $sync->{id2pos} //= do {
+               my %id2pos;
+               my $pos = 0;
+               $id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
+               \%id2pos;
+       };
+       my ($del, $iter);
+restart:
+       $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
+       $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
        $iter->execute;
        while (defined(my $docid = $iter->fetchrow_array)) {
                last if $sync->{quit};
@@ -549,8 +628,12 @@ sub eidxq_process ($$) { # for reindexing
                $del->execute($docid);
                ++${$sync->{nr}};
 
-               # this is only for SIGUSR1, shards do their own accounting:
-               reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
+               if (checkpoint_due($sync)) {
+                       $dbh = $del = $iter = undef;
+                       reindex_checkpoint($self, $sync); # release lock
+                       $dbh = $self->{oidx}->dbh;
+                       goto restart;
+               }
        }
        $self->git->async_wait_all;
        $pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr;
@@ -589,16 +672,28 @@ sub reindex_unseen ($$$$) {
 sub _reindex_check_unseen ($$$) {
        my ($self, $sync, $ibx) = @_;
        my $ibx_id = $ibx->{-ibx_id};
-       my ($beg, $end) = (1, 1000);
+       my $slice = 1000;
+       my ($beg, $end) = (1, $slice);
 
        # first, check if we missed any messages in target $ibx
-       my $inx3 = $self->{oidx}->dbh->prepare(<<'');
-SELECT DISTINCT(docid) FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
-
        my $msgs;
+       my $pr = $sync->{-opt}->{-progress};
+       my $ekey = $ibx->eidx_key;
+       $sync->{-regen_fmt} = "$ekey checking unseen %u/".$ibx->over->max."\n";
+       ${$sync->{nr}} = 0;
+
        while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) {
+               ${$sync->{nr}} = $beg;
                $beg = $msgs->[-1]->{num} + 1;
-               $end = $beg + 1000;
+               $end = $beg + $slice;
+               if (checkpoint_due($sync)) {
+                       reindex_checkpoint($self, $sync); # release lock
+               }
+
+               my $inx3 = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1);
+SELECT DISTINCT(docid) FROM xref3 WHERE
+ibx_id = ? AND xnum = ? AND oidbin = ?
+
                for my $xsmsg (@$msgs) {
                        my $oidbin = pack('H*', $xsmsg->{blob});
                        $inx3->bind_param(1, $ibx_id);
@@ -623,49 +718,69 @@ SELECT DISTINCT(docid) FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
 
 sub _reindex_check_stale ($$$) {
        my ($self, $sync, $ibx) = @_;
-
-       # now, check if there's stale xrefs
-       my $get_xnum = $self->{oidx}->dbh->prepare(<<'');
-SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? ORDER BY docid ASC
-
-       $get_xnum->execute($ibx->{-ibx_id});
-       my $del_xref3 = $self->{oidx}->dbh->prepare(<<'');
+       my $min = 0;
+       my $pr = $sync->{-opt}->{-progress};
+       my $fetching;
+       my $ekey = $ibx->eidx_key;
+       $sync->{-regen_fmt} =
+                       "$ekey check stale/missing %u/".$ibx->over->max."\n";
+       ${$sync->{nr}} = 0;
+       do {
+               if (checkpoint_due($sync)) {
+                       reindex_checkpoint($self, $sync); # release lock
+               }
+               # now, check if there's stale xrefs
+               my $iter = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1);
+SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? AND docid > ?
+ORDER BY docid,xnum ASC LIMIT 10000
+
+               $iter->execute($ibx->{-ibx_id}, $min);
+               $fetching = undef;
+
+               while (my ($docid, $xnum, $oidbin) = $iter->fetchrow_array) {
+                       return if $sync->{quit};
+                       ${$sync->{nr}} = $xnum;
+
+                       $fetching = $min = $docid;
+                       my $smsg = $ibx->over->get_art($xnum);
+                       my $oidhex = unpack('H*', $oidbin);
+                       my $err;
+                       if (!$smsg) {
+                               $err = 'stale';
+                       } elsif ($smsg->{blob} ne $oidhex) {
+                               $err = "mismatch (!= $smsg->{blob})";
+                       } else {
+                               next; # likely, all good
+                       }
+                       # current_info already has eidx_key
+                       warn "$xnum:$oidhex (#$docid): $err\n";
+                       my $del = $self->{oidx}->dbh->prepare_cached(<<'');
 DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
 
-       while (my ($docid, $xnum, $oidbin) = $get_xnum->fetchrow_array) {
-               last if $sync->{quit};
-               my $smsg = $ibx->over->get_art($xnum);
-               my $oidhex = unpack('H*', $oidbin);
-               my $err;
-               if (!$smsg) {
-                       $err = 'stale';
-               } elsif ($smsg->{blob} ne $oidhex) {
-                       $err = "mismatch (!= $smsg->{blob})";
-               } else {
-                       next; # likely, all good
-               }
-               warn $ibx->eidx_key . ":$xnum:$oidhex (#$docid): $err\n";
-               $del_xref3->bind_param(1, $ibx->{-ibx_id});
-               $del_xref3->bind_param(2, $xnum);
-               $del_xref3->bind_param(3, $oidbin, SQL_BLOB);
-               $del_xref3->execute;
-
-               # get_xref3 over-fetches, but this is a rare path:
-               my $xr3 = $self->{oidx}->get_xref3($docid);
-               my $idx = $self->idx_shard($docid);
-               if (scalar(@$xr3) == 0) { # all gone
-                       $self->{oidx}->delete_by_num($docid);
-                       $self->{oidx}->eidxq_del($docid);
-                       $idx->shard_remove($docid);
-               } else { # enqueue for reindex of remaining messages
-                       $idx->shard_remove_eidx_info($docid, $ibx->eidx_key);
-                       $self->{oidx}->eidxq_add($docid); # yes, add
+                       $del->bind_param(1, $ibx->{-ibx_id});
+                       $del->bind_param(2, $xnum);
+                       $del->bind_param(3, $oidbin, SQL_BLOB);
+                       $del->execute;
+
+                       # get_xref3 over-fetches, but this is a rare path:
+                       my $xr3 = $self->{oidx}->get_xref3($docid);
+                       my $idx = $self->idx_shard($docid);
+                       if (scalar(@$xr3) == 0) { # all gone
+                               $self->{oidx}->delete_by_num($docid);
+                               $self->{oidx}->eidxq_del($docid);
+                               $idx->shard_remove($docid);
+                       } else { # enqueue for reindex of remaining messages
+                               $idx->shard_remove_eidx_info($docid,
+                                                       $ibx->eidx_key);
+                               $self->{oidx}->eidxq_add($docid); # yes, add
+                       }
                }
-       }
+       } while (defined $fetching);
 }
 
 sub _reindex_inbox ($$$) {
        my ($self, $sync, $ibx) = @_;
+       local $self->{current_info} = $ibx->eidx_key;
        _reindex_check_unseen($self, $sync, $ibx);
        _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
        delete @$ibx{qw(over mm search git)}; # won't need these for a bit
@@ -674,6 +789,12 @@ sub _reindex_inbox ($$$) {
 sub eidx_reindex {
        my ($self, $sync) = @_;
 
+       # acquire eidxq_lock early because full reindex takes forever
+       # and incremental -extindex processes can run during our checkpoints
+       if (!eidxq_lock_acquire($self)) {
+               warn "E: aborting --reindex\n";
+               return;
+       }
        for my $ibx (@{$self->{ibx_list}}) {
                _reindex_inbox($self, $sync, $ibx);
                last if $sync->{quit};
@@ -694,6 +815,8 @@ sub eidx_sync { # main entry point
        $self->{oidx}->rethread_prepare($opt);
        my $sync = {
                need_checkpoint => \(my $need_checkpoint = 0),
+               check_intvl => 10,
+               next_check => now() + 10,
                -opt => $opt,
                # DO NOT SET {reindex} here, it's incompatible with reused
                # V2Writable code, reindex is totally different here
@@ -722,6 +845,7 @@ sub eidx_sync { # main entry point
        $self->{oidx}->rethread_done($opt) unless $sync->{quit};
        eidxq_process($self, $sync) unless $sync->{quit};
 
+       eidxq_release($self);
        PublicInbox::V2Writable::done($self);
 }