lib/PublicInbox/Search.pm
lib/PublicInbox/SearchIdx.pm
lib/PublicInbox/SearchIdxPart.pm
-lib/PublicInbox/SearchIdxThread.pm
+lib/PublicInbox/SearchIdxSkeleton.pm
lib/PublicInbox/SearchMsg.pm
lib/PublicInbox/SearchThread.pm
lib/PublicInbox/SearchView.pm
}
warn "v2 repo with $parts found in $dir\n";
$self->{xdb} = $xdb;
- $self->{skel} = Search::Xapian::Database->new("$dir/all");
+ $self->{skel} = Search::Xapian::Database->new("$dir/skel");
} else {
$self->{xdb} = Search::Xapian::Database->new($self->xdir);
}
my ($doc_id, $old_tid);
my $mid = mid_clean(mid_mime($mime));
- my $threader = $self->{threader};
+ my $skel = $self->{skeleton};
eval {
die 'Message-ID too long' if length($mid) > MAX_MID_SIZE;
# convert a ghost to a regular message
# it will also clobber any existing regular message
$doc_id = $smsg->{doc_id};
- $old_tid = $smsg->thread_id unless $threader;
+ $old_tid = $smsg->thread_id unless $skel;
}
$smsg = PublicInbox::SearchMsg->new($mime);
my $doc = $smsg->{doc};
# populates smsg->references for smsg->to_doc_data
my $refs = parse_references($smsg);
my $data = $smsg->to_doc_data($blob);
- if ($threader) {
+ if ($skel) {
push @values, $mid, $xpath, $data;
- $threader->thread_msg(\@values);
+ $skel->index_skeleton(\@values);
} else {
link_message($self, $smsg, $refs, $old_tid);
}
$_[0]->{lockfh} = undef;
}
-# remote_* subs are only used by SearchIdxPart and SearchIdxThread:
+# remote_* subs are only used by SearchIdxPart and SearchIdxSkeleton
sub remote_commit {
my ($self) = @_;
print { $self->{w} } "commit\n" or die "failed to write commit: $!";
use base qw(PublicInbox::SearchIdx);
sub new {
- my ($class, $v2writable, $part, $threader) = @_;
+ my ($class, $v2writable, $part, $skel) = @_;
my $self = $class->SUPER::new($v2writable->{-inbox}, 1, $part);
- $self->{threader} = $threader;
+ $self->{skeleton} = $skel;
my ($r, $w);
pipe($r, $w) or die "pipe failed: $!\n";
binmode $r, ':raw';
# Copyright (C) 2018 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-package PublicInbox::SearchIdxThread;
+package PublicInbox::SearchIdxSkeleton;
use strict;
use warnings;
use base qw(PublicInbox::SearchIdx);
sub new {
my ($class, $v2writable) = @_;
- my $self = $class->SUPER::new($v2writable->{-inbox}, 1, 'all');
+ my $self = $class->SUPER::new($v2writable->{-inbox}, 1, 'skel');
# create the DB:
$self->_xdb_acquire;
$self->_xdb_release;
$v2writable->atfork_child;
$v2writable = undef;
close $w;
- eval { thread_worker_loop($self, $r) };
- die "thread worker died: $@\n" if $@;
+ eval { skeleton_worker_loop($self, $r) };
+ die "skeleton worker died: $@\n" if $@;
exit;
}
$self->{w} = $w;
$w->autoflush(1);
# lock on only exists in parent, not in worker
- my $l = $self->{lock_path} = $self->xdir . '/thread.lock';
+ my $l = $self->{lock_path} = $self->xdir . '/pi-v2-skeleton.lock';
open my $fh, '>>', $l or die "failed to create $l: $!\n";
$self;
}
-sub thread_worker_loop {
+sub skeleton_worker_loop {
my ($self, $r) = @_;
- $0 = 'pi-v2-threader';
+ $0 = 'pi-v2-skeleton';
my $msg;
my $xdb = $self->_xdb_acquire;
$xdb->begin_transaction;
$xdb->begin_transaction;
$txn = 1;
}
- eval { $self->thread_msg_real($msg) };
+ eval { index_skeleton_real($self, $msg) };
warn "failed to index message <$msg->[-1]>: $@\n" if $@;
}
}
}
# called by a partition worker
-sub thread_msg {
+sub index_skeleton {
my ($self, $values) = @_;
my $w = $self->{w};
my $err;
die "print failed: $err\n" if $err;
}
-sub thread_msg_real {
+# values: [ TS, NUM, BYTES, LINES, MID, XPATH, doc_data ]
+sub index_skeleton_real ($$) {
my ($self, $values) = @_;
my $doc_data = pop @$values;
my $xpath = pop @$values;
use warnings;
use Fcntl qw(:flock :DEFAULT);
use PublicInbox::SearchIdxPart;
-use PublicInbox::SearchIdxThread;
+use PublicInbox::SearchIdxSkeleton;
use PublicInbox::MIME;
use PublicInbox::Git;
use PublicInbox::Import;
my ($len, $msgref) = @{$im->{last_object}};
$self->idx_init;
- my $num = $self->{all}->index_mm($mime, 1);
+ my $num = $self->{skel}->index_mm($mime, 1);
my $nparts = $self->{partitions};
my $part = $num % $nparts;
my $idx = $self->idx_part($part);
my ($self) = @_;
return if $self->{idx_parts};
- # first time initialization, first we create the threader pipe:
- my $all = $self->{all} = PublicInbox::SearchIdxThread->new($self);
+ # first time initialization, first we create the skeleton pipe:
+ my $skel = $self->{skel} = PublicInbox::SearchIdxSkeleton->new($self);
# need to create all parts before initializing msgmap FD
my $max = $self->{partitions} - 1;
my $idx = $self->{idx_parts} = [];
for my $i (0..$max) {
- push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $all);
+ push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $skel);
}
# Now that all subprocesses are up, we can open the FD for SQLite:
- $all->_msgmap_init->{dbh}->begin_work;
+ $skel->_msgmap_init->{dbh}->begin_work;
}
sub remove {
sub searchidx_checkpoint {
my ($self, $more) = @_;
- # order matters, we can only close {all} after all partitions
- # are done because the partitions also write to {all}
+ # order matters, we can only close {skel} after all partitions
+ # are done because the partitions also write to {skel}
if (my $parts = $self->{idx_parts}) {
foreach my $idx (@$parts) {
delete $self->{idx_parts} unless $more;
}
- if (my $all = $self->{all}) {
- $all->{mm}->{dbh}->commit;
+ if (my $skel = $self->{skel}) {
+ $skel->{mm}->{dbh}->commit;
if ($more) {
- $all->{mm}->{dbh}->begin_work;
+ $skel->{mm}->{dbh}->begin_work;
+ }
+ $skel->remote_commit;
+ unless ($more) {
+ $skel->remote_close;
+ delete $self->{skel};
}
- $all->remote_commit;
- $all->remote_close unless $more;
- delete $self->{all} unless $more;
}
$self->{transact_bytes} = 0;
}