]> Sergey Matveev's repositories - public-inbox.git/commitdiff
v2writable: support reindexing Xapian
authorEric Wong (Contractor, The Linux Foundation) <e@80x24.org>
Wed, 21 Mar 2018 09:04:50 +0000 (09:04 +0000)
committerEric Wong (Contractor, The Linux Foundation) <e@80x24.org>
Thu, 22 Mar 2018 00:19:17 +0000 (00:19 +0000)
This still requires a msgmap.sqlite3 file to exist, but
it allows us to tweak Xapian indexing rules and reindex
the Xapian database online while -watch is running.

lib/PublicInbox/Msgmap.pm
lib/PublicInbox/SearchIdx.pm
lib/PublicInbox/SearchIdxSkeleton.pm
lib/PublicInbox/V2Writable.pm
script/public-inbox-index

index 78922d3693c9647e0b11c3dc7fc300ce1a985147..128330506b151d16afe7cbde83429a06820f4891 100644 (file)
@@ -24,9 +24,8 @@ sub new {
        new_file($class, "$d/msgmap.sqlite3", $writable);
 }
 
-sub new_file {
-       my ($class, $f, $writable) = @_;
-
+sub dbh_new {
+       my ($f, $writable) = @_;
        my $dbh = DBI->connect("dbi:SQLite:dbname=$f",'','', {
                AutoCommit => 1,
                RaiseError => 1,
@@ -35,6 +34,13 @@ sub new_file {
                sqlite_use_immediate_transaction => 1,
        });
        $dbh->do('PRAGMA case_sensitive_like = ON');
+       $dbh;
+}
+
+sub new_file {
+       my ($class, $f, $writable) = @_;
+
+       my $dbh = dbh_new($f, $writable);
        my $self = bless { dbh => $dbh }, $class;
 
        if ($writable) {
@@ -49,12 +55,13 @@ sub new_file {
 # used to keep track of used numeric mappings for v2 reindex
 sub tmp_clone {
        my ($self) = @_;
-       my ($fh, $fn) = tempfile(EXLOCK => 0);
+       my ($fh, $fn) = tempfile('msgmap-XXXXXXXX', EXLOCK => 0, TMPDIR => 1);
        $self->{dbh}->sqlite_backup_to_file($fn);
        my $tmp = ref($self)->new_file($fn, 1);
        $tmp->{dbh}->do('PRAGMA synchronous = OFF');
        $tmp->{tmp_name} = $fn; # SQLite won't work if unlinked, apparently
-       $fh = undef;
+       $tmp->{pid} = $$;
+       close $fh or die "failed to close $fn: $!";
        $tmp;
 }
 
@@ -205,7 +212,28 @@ sub mid_set {
 sub DESTROY {
        my ($self) = @_;
        delete $self->{dbh};
-       unlink $self->{tmp_name} if defined $self->{tmp_name};
+       my $f = delete $self->{tmp_name};
+       if (defined $f && $self->{pid} == $$) {
+               unlink $f or warn "failed to unlink $f: $!\n";
+       }
+}
+
+sub atfork_parent {
+       my ($self) = @_;
+       my $f = $self->{tmp_name} or die "not a temporary clone\n";
+       delete $self->{dbh} and die "tmp_clone dbh not prepared for parent";
+       my $dbh = $self->{dbh} = dbh_new($f, 1);
+       $dbh->do('PRAGMA synchronous = OFF');
+}
+
+sub atfork_prepare {
+       my ($self) = @_;
+       my $f = $self->{tmp_name} or die "not a temporary clone\n";
+       $self->{pid} == $$ or
+               die "BUG: atfork_prepare not called from $self->{pid}\n";
+       $self->{dbh} or die "temporary clone not open\n";
+       # must clobber prepared statements
+       %$self = (tmp_name => $f, pid => $$);
 }
 
 1;
index ef723a4b23161362a9da9ca033f32d8832d5bfe0..7ac16ec283bf6c059c2f7ac14a12706da5f6f319 100644 (file)
@@ -352,7 +352,7 @@ sub add_message {
 
                # populates smsg->references for smsg->to_doc_data
                my $refs = parse_references($smsg);
-               $mid0 = $mids->[0] unless defined $mid0;
+               $mid0 = $mids->[0] unless defined $mid0; # v1 compatibility
                my $data = $smsg->to_doc_data($oid, $mid0);
                foreach my $mid (@$mids) {
                        $tg->index_text($mid, 1, 'XM');
@@ -369,10 +369,12 @@ sub add_message {
                        }
                }
 
+               $self->delete_article($num) if defined $num; # for reindexing
                if ($skel) {
                        push @values, $mids, $xpath, $data;
                        $skel->index_skeleton(\@values);
                        $doc->add_boolean_term('Q' . $_) foreach @$mids;
+                       $doc->add_boolean_term('XNUM' . $num) if defined $num;
                        $doc_id = $self->{xdb}->add_document($doc);
                } else {
                        $doc_id = link_and_save($self, $doc, $mids, $refs,
@@ -421,6 +423,16 @@ sub remove_message {
        }
 }
 
+sub delete_article {
+       my ($self, $num) = @_;
+       my $ndel = 0;
+       batch_do($self, 'XNUM' . $num, sub {
+               my ($ids) = @_;
+               $ndel += scalar @$ids;
+               $self->{xdb}->delete_document($_) for @$ids;
+       });
+}
+
 # MID is a hint in V2
 sub remove_by_oid {
        my ($self, $oid, $mid) = @_;
index 78a17303530e5e23ab47c8c78e9d2ee8b280a462..4f158169185f612bd4fc13bb9f1f5ba162e26f1b 100644 (file)
@@ -134,6 +134,7 @@ sub index_skeleton_real ($$) {
        $smsg->load_from_data($doc_data);
        my $num = $values->[PublicInbox::Search::NUM];
        my @refs = ($smsg->references =~ /<([^>]+)>/g);
+       $self->delete_article($num) if defined $num; # for reindexing
        $self->link_and_save($doc, $mids, \@refs, $num, $xpath);
 }
 
index 46bfebbd3c92fe5369a18ac31ecf91c25a0ce601..550a74d49fd1b9b8e78b3ed275ee6cd21e434bb0 100644 (file)
@@ -359,6 +359,23 @@ sub git_init {
        $git_dir
 }
 
+sub git_dir_latest {
+       my ($self, $max) = @_;
+       $$max = -1;
+       my $pfx = "$self->{-inbox}->{mainrepo}/git";
+       return unless -d $pfx;
+       my $latest;
+       opendir my $dh, $pfx or die "opendir $pfx: $!\n";
+       while (defined(my $git_dir = readdir($dh))) {
+               $git_dir =~ m!\A(\d+)\.git\z! or next;
+               if ($1 > $$max) {
+                       $$max = $1;
+                       $latest = "$pfx/$git_dir";
+               }
+       }
+       $latest;
+}
+
 sub importer {
        my ($self) = @_;
        my $im = $self->{im};
@@ -375,20 +392,9 @@ sub importer {
                        return $self->import_init($git, 0);
                }
        }
-       my $latest;
-       my $max = -1;
        my $new = 0;
-       my $pfx = "$self->{-inbox}->{mainrepo}/git";
-       if (-d $pfx) {
-               foreach my $git_dir (glob("$pfx/*.git")) {
-                       $git_dir =~ m!/(\d+)\.git\z! or next;
-                       my $n = $1;
-                       if ($n > $max) {
-                               $max = $n;
-                               $latest = $git_dir;
-                       }
-               }
-       }
+       my $max;
+       my $latest = git_dir_latest($self, \$max);
        if (defined $latest) {
                my $git = PublicInbox::Git->new($latest);
                my $packed_bytes = $git->packed_bytes;
@@ -466,6 +472,8 @@ sub lookup_content {
 
 sub atfork_child {
        my ($self) = @_;
+       my $fh = delete $self->{reindex_pipe};
+       close $fh if $fh;
        if (my $parts = $self->{idx_parts}) {
                $_->atfork_child foreach @$parts;
        }
@@ -474,4 +482,104 @@ sub atfork_child {
        }
 }
 
+sub mark_deleted {
+       my ($self, $D, $git, $oid) = @_;
+       my $msgref = $git->cat_file($oid);
+       my $mime = PublicInbox::MIME->new($$msgref);
+       my $mids = mids($mime->header_obj);
+       my $cid = content_id($mime);
+       foreach my $mid (@$mids) {
+               $D->{$mid.$cid} = 1;
+       }
+}
+
+sub reindex_oid {
+       my ($self, $mm_tmp, $D, $git, $oid) = @_;
+       my $len;
+       my $msgref = $git->cat_file($oid, \$len);
+       my $mime = PublicInbox::MIME->new($$msgref);
+       my $mids = mids($mime->header_obj);
+       my $cid = content_id($mime);
+
+       # get the NNTP article number we used before, highest number wins
+       # and gets deleted from mm_tmp;
+       my $mid0;
+       my $num = -1;
+       my $del = 0;
+       foreach my $mid (@$mids) {
+               $del += (delete $D->{$mid.$cid} || 0);
+               my $n = $mm_tmp->num_for($mid);
+               if (defined $n && $n > $num) {
+                       $mid0 = $mid;
+                       $num = $n;
+               }
+       }
+       if (!defined($mid0) || $del) {
+               return if (!defined($mid0) && $del); # expected for deletes
+
+               my $id = '<' . join('> <', @$mids) . '>';
+               defined($mid0) or
+                       warn "Skipping $id, no article number found\n";
+               if ($del && defined($mid0)) {
+                       warn "$id was deleted $del " .
+                               "time(s) but mapped to article #$num\n";
+               }
+               return;
+
+       }
+       $mm_tmp->mid_delete($mid0) or
+               die "failed to delete <$mid0> for article #$num\n";
+
+       my $nparts = $self->{partitions};
+       my $part = $num % $nparts;
+       my $idx = $self->idx_part($part);
+       $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime);
+       my $n = $self->{transact_bytes} += $len;
+       if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
+               $git->cleanup;
+               $mm_tmp->atfork_prepare;
+               $self->done; # release lock
+               # allow -watch or -mda to write...
+               $self->idx_init; # reacquire lock
+               $mm_tmp->atfork_parent;
+       }
+}
+
+sub reindex {
+       my ($self) = @_;
+       my $ibx = $self->{-inbox};
+       my $pfx = "$ibx->{mainrepo}/git";
+       my $max_git;
+       my $latest = git_dir_latest($self, \$max_git);
+       return unless defined $latest;
+       my @cmd = qw(log --raw -r --pretty=tformat:%h
+                       --no-notes --no-color --no-abbrev);
+       my $head = $ibx->{ref_head} || 'refs/heads/master';
+       $self->idx_init; # acquire lock
+       my $x40 = qr/[a-f0-9]{40}/;
+       my $mm_tmp = $self->{skel}->{mm}->tmp_clone;
+       my $D = {};
+
+       # work backwards through history
+       for (my $cur = $max_git; $cur >= 0; $cur--) {
+               die "already reindexing!\n" if delete $self->{reindex_pipe};
+               my $cmt;
+               my $git_dir = "$pfx/$cur.git";
+               my $git = PublicInbox::Git->new($git_dir);
+               my $fh = $self->{reindex_pipe} = $git->popen(@cmd, $head);
+               while (<$fh>) {
+                       if (/\A$x40$/o) {
+                               chomp($cmt = $_);
+                       } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\tm$/o) {
+                               $self->reindex_oid($mm_tmp, $D, $git, $1);
+                       } elsif (m!\A:\d{6} 100644 $x40 ($x40) [AM]\t_/D$!o) {
+                               $self->mark_deleted($D, $git, $1);
+                       }
+               }
+               delete $self->{reindex_pipe};
+       }
+       my ($min, $max) = $mm_tmp->minmax;
+       defined $max and die "leftover article numbers at $min..$max\n";
+}
+
 1;
index 1debbaacc27881fe9020ce1c3fa26d3055f98025..cea357381307fe35b09e628fd185ccd7f1f0c219 100755 (executable)
@@ -31,6 +31,9 @@ my @dirs;
 sub resolve_repo_dir {
        my ($cd) = @_;
        my $prefix = defined $cd ? $cd : './';
+       if (-d $prefix && -f "$prefix/inbox.lock") { # v2
+               return abs_path($prefix);
+       }
 
        my @cmd = qw(git rev-parse --git-dir);
        my $cmd = join(' ', @cmd);
@@ -75,14 +78,26 @@ foreach my $k (keys %$config) {
 }
 
 foreach my $dir (@dirs) {
+       if (!ref($dir) && -f "$dir/inbox.lock") { # v2
+               my $ibx = { mainrepo => $dir, name => 'unnamed' };
+               $dir = PublicInbox::Inbox->new($ibx);
+       }
        index_dir($dir);
 }
 
 sub index_dir {
-       my ($git_dir) = @_;
-       if (!ref $git_dir && ! -d $git_dir) {
-               die "$git_dir does not appear to be a git repository\n";
+       my ($repo) = @_;
+       if (!ref $repo && ! -d $repo) {
+               die "$repo does not appear to be an inbox repository\n";
+       }
+       if (ref($repo) && ($repo->{version} || 1) == 2) {
+               eval { require PublicInbox::V2Writable };
+               die "v2 requirements not met: $@\n" if $@;
+               my $v2w = PublicInbox::V2Writable->new($repo);
+               $v2w->reindex;
+               $v2w->done;
+       } else {
+               my $s = PublicInbox::SearchIdx->new($repo, 1);
+               $s->index_sync({ reindex => $reindex });
        }
-       my $s = PublicInbox::SearchIdx->new($git_dir, 1);
-       $s->index_sync({ reindex => $reindex });
 }