lib/PublicInbox/NNTP.pm
lib/PublicInbox/NNTPD.pm
lib/PublicInbox/NewsWWW.pm
+lib/PublicInbox/Over.pm
+lib/PublicInbox/OverIdx.pm
+lib/PublicInbox/OverIdxFork.pm
lib/PublicInbox/ParentPipe.pm
lib/PublicInbox/ProcessPipe.pm
lib/PublicInbox/Qspawn.pm
lib/PublicInbox/Search.pm
lib/PublicInbox/SearchIdx.pm
lib/PublicInbox/SearchIdxPart.pm
-lib/PublicInbox/SearchIdxSkeleton.pm
lib/PublicInbox/SearchMsg.pm
lib/PublicInbox/SearchThread.pm
lib/PublicInbox/SearchView.pm
t/msgmap.t
t/nntp.t
t/nntpd.t
+t/over.t
t/plack.t
t/precheck.t
t/psgi_attach.t
sub recent {
my ($self, $opts) = @_;
- my $qs = '';
- my $srch = search($self);
- if (!$opts->{offset}) {
- # this complicated bit cuts /$INBOX/ loading time by
- # over 400ms on my system:
- my ($min, $max) = mm($self)->minmax;
- my $n = $max - $opts->{limit};
- $n = $min if $n < $min;
- for (; $qs eq '' && $n >= $min; --$n) {
- my $smsg = $srch->lookup_article($n) or next;
- $qs = strftime('d:%Y%m%d..', gmtime($smsg->ts));
- }
- }
- $srch->query($qs, $opts);
+ search($self)->query('', $opts);
}
1;
sub new_file {
my ($class, $f, $writable) = @_;
+ return if !$writable && !-r $f;
my $dbh = dbh_new($f, $writable);
my $self = bless { dbh => $dbh }, $class;
};
return '.' unless @srch;
- $ts .= '..';
- my $opts = { asc => 1, limit => 1000, offset => 0 };
+ my $opts = { limit => 1000, offset => 0 };
long_response($self, 0, long_response_limit, sub {
my ($i) = @_;
my $srch = $srch[0];
- my $res = $srch->query_ts($ts, $opts);
- my $msgs = $res->{msgs};
+ my $msgs = $srch->query_ts($ts, $opts);
if (my $nr = scalar @$msgs) {
more($self, '<' .
join(">\r\n<", map { $_->mid } @$msgs ).
defined $mid or return $err;
}
found:
- my $smsg = $ng->search->lookup_article($n) or return $err;
+ my $smsg = $ng->search->{over_ro}->get_art($n) or return $err;
my $msg = $ng->msg_by_smsg($smsg) or return $err;
my $s = Email::Simple->new($msg);
if ($set_headers) {
sub search_header_for {
my ($srch, $num, $field) = @_;
- my $smsg = $srch->lookup_article($num) or return;
- $smsg->$field;
+ my $smsg = $srch->{over_ro}->get_art($num) or return;
+ return PublicInbox::SearchMsg::date($smsg) if $field eq 'date';
+ $smsg->{$field};
}
sub hdr_searchmsg ($$$$) {
my $off = 0;
long_response($self, $beg, $end, sub {
my ($i) = @_;
- my $res = $srch->query_xover($beg, $end, $off);
- my $msgs = $res->{msgs};
+ my $msgs = $srch->query_xover($beg, $end, $off);
my $nr = scalar @$msgs or return;
$off += $nr;
my $tmp = '';
$smsg->{subject},
$smsg->{from},
PublicInbox::SearchMsg::date($smsg),
- '<'.PublicInbox::SearchMsg::mid($smsg).'>',
+ "<$smsg->{mid}>",
$smsg->{references},
- PublicInbox::SearchMsg::bytes($smsg),
- PublicInbox::SearchMsg::lines($smsg));
+ $smsg->{bytes},
+ $smsg->{lines});
utf8::encode($s);
$s
}
if ($range && $range =~ /\A<(.+)>\z/) {
my ($ng, $n) = mid_lookup($self, $1);
defined $n or return r430;
- my $smsg = $ng->search->lookup_article($n) or return r430;
+ my $smsg = $ng->search->{over_ro}->get_art($n) or return r430;
more($self, '224 Overview information follows (multi-line)');
# Only set article number column if it's the current group
my $off = 0;
long_response($self, $beg, $end, sub {
my ($i) = @_;
- my $res = $srch->query_xover($beg, $end, $off);
- my $msgs = $res->{msgs};
+ my $msgs = $srch->query_xover($beg, $end, $off);
my $nr = scalar @$msgs or return;
$off += $nr;
# OVERVIEW.FMT
more($self, join("\r\n", map {
- over_line(PublicInbox::SearchMsg::num($_), $_);
+ over_line($_->{num}, $_);
} @$msgs));
# -1 to adjust for implicit increment in long_response
--- /dev/null
+# Copyright (C) 2018 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# for XOVER, OVER in NNTP, and feeds/homepage/threads in PSGI
+# Unlike Msgmap, this is an _UNSTABLE_ database which can be
+# tweaked/updated over time and rebuilt.
+package PublicInbox::Over;
+use strict;
+use warnings;
+use DBI;
+use DBD::SQLite;
+use PublicInbox::SearchMsg;
+use Compress::Zlib qw(uncompress);
+
+sub dbh_new {
+ my ($self) = @_;
+ my $ro = ref($self) eq 'PublicInbox::Over';
+ my $dbh = DBI->connect("dbi:SQLite:dbname=$self->{filename}",'','', {
+ AutoCommit => 1,
+ RaiseError => 1,
+ PrintError => 0,
+ ReadOnly => $ro,
+ sqlite_use_immediate_transaction => 1,
+ });
+ $dbh->{sqlite_unicode} = 1;
+ $dbh;
+}
+
+# constructor; $f is the path to the SQLite database file.
+# The handle is not opened here -- see connect() for lazy opening
+sub new {
+ my ($class, $f) = @_;
+ bless { filename => $f }, $class;
+}
+
+sub disconnect { $_[0]->{dbh} = undef }
+
+sub connect { $_[0]->{dbh} ||= $_[0]->dbh_new }
+
+# bless a selected row hashref into PublicInbox::SearchMsg and,
+# if the row carries deflated doc data ("ddd" column), inflate it
+# and populate the remaining message fields from it
+sub load_from_row {
+ my ($smsg) = @_;
+ bless $smsg, 'PublicInbox::SearchMsg';
+ if (defined(my $data = delete $smsg->{ddd})) {
+ $data = uncompress($data);
+ utf8::decode($data);
+ $smsg->load_from_data($data);
+ }
+ $smsg
+}
+
+# shared query helper: append LIMIT/OFFSET taken from $opts to $sql,
+# run the query with @args bind values, and bless every resulting row
+# via load_from_row.  limit defaults to 1000 when unset or 0;
+# offset defaults to 0.  Returns an arrayref of SearchMsg objects
+sub do_get {
+ my ($self, $sql, $opts, @args) = @_;
+ my $dbh = $self->connect;
+ my $lim = (($opts->{limit} || 0) + 0) || 1000;
+ my $off = (($opts->{offset} || 0) + 0) || 0;
+ $sql .= "LIMIT $lim OFFSET $off";
+ my $msgs = $dbh->selectall_arrayref($sql, { Slice => {} }, @args);
+ load_from_row($_) for @$msgs;
+ $msgs
+}
+
+# for the NNTP server ([X]OVER/HDR): fetch rows whose article number
+# falls in [$beg, $end], ascending; $off lets callers resume across
+# batches (do_get applies the implicit LIMIT)
+sub query_xover {
+ my ($self, $beg, $end, $off) = @_;
+ do_get($self, <<'', { offset => $off }, $beg, $end);
+SELECT * FROM over WHERE num >= ? AND num <= ?
+ORDER BY num ASC
+
+}
+
+# messages with timestamp >= $ts, oldest-first (used by the NNTP
+# server).  "num > 0" restricts results to real messages; ghost
+# entries are stored with negative article numbers (see OverIdx)
+sub query_ts {
+ my ($self, $ts, $opts) = @_;
+ do_get($self, <<'', $opts, $ts);
+SELECT * FROM over WHERE num > 0 AND ts >= ?
+ORDER BY ts ASC
+
+}
+
+# load an entire thread given the Message-ID of any message in it:
+# resolve the thread id (tid) and subject id (sid) for $mid first,
+# then pull every non-ghost row matching either, oldest-first.
+# Returns { total => $count, msgs => \@SearchMsg }
+sub get_thread {
+ my ($self, $mid, $opts) = @_;
+ my $dbh = $self->connect;
+ my ($tid, $sid) = $dbh->selectrow_array(<<'', undef, $mid);
+SELECT tid,sid FROM over
+LEFT JOIN id2num ON over.num = id2num.num
+LEFT JOIN msgid ON id2num.id = msgid.id
+WHERE msgid.mid = ? AND over.num > 0
+LIMIT 1
+
+ my $cond = 'FROM over WHERE (tid = ? OR sid = ?) AND num > 0';
+ my $msgs = do_get($self, <<"", $opts, $tid, $sid);
+SELECT * $cond
+ORDER BY ts ASC
+
+ my $nr = $dbh->selectrow_array(<<"", undef, $tid, $sid);
+SELECT COUNT(num) $cond
+
+ { total => $nr, msgs => $msgs };
+}
+
+# newest-first listing of all real (non-ghost) messages, for the
+# WWW homepage and feeds.  Returns { total => $count, msgs => \@msgs }.
+# Uses $self->{dbh} directly for the count; do_get above will have
+# established the connection
+sub recent {
+ my ($self, $opts) = @_;
+ my $msgs = do_get($self, <<'', $opts);
+SELECT * FROM over WHERE num > 0
+ORDER BY ts DESC
+
+ my $nr = $self->{dbh}->selectrow_array(<<'');
+SELECT COUNT(num) FROM over WHERE num > 0
+
+ { total => $nr, msgs => $msgs };
+}
+
+# lookup a single message by NNTP article number;
+# returns a SearchMsg on success, undef if not found
+sub get_art {
+ my ($self, $num) = @_;
+ my $dbh = $self->connect;
+ my $smsg = $dbh->selectrow_hashref(<<'', undef, $num);
+SELECT * from OVER where num = ? LIMIT 1
+
+ return load_from_row($smsg) if $smsg;
+ undef;
+}
+
+1;
--- /dev/null
+# Copyright (C) 2018 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# for XOVER, OVER in NNTP, and feeds/homepage/threads in PSGI
+# Unlike Msgmap, this is an _UNSTABLE_ database which can be
+# tweaked/updated over time and rebuilt.
+package PublicInbox::OverIdx;
+use strict;
+use warnings;
+use base qw(PublicInbox::Over);
+use IO::Handle;
+use DBI qw(:sql_types); # SQL_BLOB
+
+sub dbh_new {
+ my ($self) = @_;
+ my $dbh = $self->SUPER::dbh_new;
+ $dbh->do('PRAGMA synchronous = OFF'); # commit_fsync instead
+ $dbh->do('PRAGMA journal_mode = TRUNCATE');
+ $dbh->do('PRAGMA cache_size = 80000');
+ create_tables($dbh);
+ $dbh;
+}
+
+sub commit_fsync {
+ my $fn = $_[0]->{filename};
+ if (open my $fh, '+<', $fn) {
+ $fh->sync;
+ close $fh;
+ }
+}
+
+sub get_counter ($$) {
+ my ($dbh, $key) = @_;
+ my $sth = $dbh->prepare_cached(<<'', undef, 1);
+SELECT val FROM counter WHERE key = ? LIMIT 1
+
+ $sth->execute($key);
+ $sth->fetchrow_array;
+}
+
+sub adj_counter ($$$) {
+ my ($self, $key, $op) = @_;
+ my $dbh = $self->{dbh};
+ my $sth = $dbh->prepare_cached(<<"");
+UPDATE counter SET val = val $op 1 WHERE key = ?
+
+ $sth->execute($key);
+
+ get_counter($dbh, $key);
+}
+
+sub next_tid { adj_counter($_[0], 'thread', '+') }
+sub next_ghost_num { adj_counter($_[0], 'ghost', '-') }
+
+# get-or-create helper for lookup tables (msgid, subject):
+# INSERT OR IGNORE the value, then either fetch the existing row id
+# (insert was a no-op) or return the id we just created
+sub id_for ($$$$$) {
+ my ($self, $tbl, $id_col, $val_col, $val) = @_;
+ my $dbh = $self->{dbh};
+ my $in = $dbh->prepare_cached(<<"")->execute($val);
+INSERT OR IGNORE INTO $tbl ($val_col) VALUES (?)
+
+ if ($in == 0) {
+ my $sth = $dbh->prepare_cached(<<"", undef, 1);
+SELECT $id_col FROM $tbl WHERE $val_col = ? LIMIT 1
+
+ $sth->execute($val);
+ $sth->fetchrow_array;
+ } else {
+ $dbh->last_insert_id(undef, undef, $tbl, $id_col);
+ }
+}
+
+sub sid {
+ my ($self, $path) = @_;
+ return unless defined $path && $path ne '';
+ id_for($self, 'subject', 'sid', 'path' => $path);
+}
+
+sub mid2id {
+ my ($self, $mid) = @_;
+ id_for($self, 'msgid', 'id', 'mid' => $mid);
+}
+
+sub delete_by_num {
+ my ($self, $num) = @_;
+ my $dbh = $self->{dbh};
+ foreach (qw(over id2num)) {
+ $dbh->prepare_cached(<<"")->execute($num);
+DELETE FROM $_ WHERE num = ?
+
+ }
+}
+
+# iterate over every row (this includes ghosts) mapped to Message-ID
+# $mid, invoking $cb->($smsg) with only the requested $cols (plus
+# 'num').  Iteration stops early when $cb returns false.  Rows are
+# walked in batches of $lim, ordered by num, starting from the ghost
+# counter (the most negative num in use) so ghosts come first
+sub each_by_mid {
+ my ($self, $mid, $cols, $cb) = @_;
+ my $dbh = $self->{dbh};
+
+=over
+ I originally wanted to stuff everything into a single query:
+
+ SELECT over.* FROM over
+ LEFT JOIN id2num ON over.num = id2num.num
+ LEFT JOIN msgid ON msgid.id = id2num.id
+ WHERE msgid.mid = ? AND over.num >= ?
+ ORDER BY over.num ASC
+ LIMIT 1000
+
+ But it's faster broken out (and we're always in a
+ transaction for subroutines in this file)
+=cut
+
+ my $sth = $dbh->prepare_cached(<<'', undef, 1);
+SELECT id FROM msgid WHERE mid = ? LIMIT 1
+
+ $sth->execute($mid);
+ my $id = $sth->fetchrow_array;
+ defined $id or return;
+
+ push(@$cols, 'num');
+ $cols = join(',', map { $_ } @$cols);
+ my $lim = 10;
+ my $prev = get_counter($dbh, 'ghost');
+ while (1) {
+ $sth = $dbh->prepare_cached(<<"", undef, 1);
+SELECT num FROM id2num WHERE id = ? AND num >= ?
+ORDER BY num ASC
+LIMIT $lim
+
+ $sth->execute($id, $prev);
+ my $nums = $sth->fetchall_arrayref;
+ my $nr = scalar(@$nums) or return;
+ $prev = $nums->[-1]->[0];
+
+ $sth = $dbh->prepare_cached(<<"", undef, 1);
+SELECT $cols FROM over WHERE over.num = ? LIMIT 1
+
+ foreach (@$nums) {
+ $sth->execute($_->[0]);
+ my $smsg = $sth->fetchrow_hashref;
+ $cb->(PublicInbox::Over::load_from_row($smsg)) or
+ return;
+ }
+ # a short batch means we exhausted the id2num rows
+ return if $nr != $lim;
+ }
+}
+
+# map a Message-ID to its thread id (tid), merging threads if the
+# mid already appears under several tids; this will create a ghost
+# (placeholder row with a fresh tid) as necessary when the mid is
+# not yet known
+sub resolve_mid_to_tid {
+ my ($self, $mid) = @_;
+ my $tid;
+ each_by_mid($self, $mid, ['tid'], sub {
+ my ($smsg) = @_;
+ my $cur_tid = $smsg->{tid};
+ if (defined $tid) {
+ merge_threads($self, $tid, $cur_tid);
+ } else {
+ $tid = $cur_tid;
+ }
+ 1;
+ });
+ defined $tid ? $tid : create_ghost($self, $mid);
+}
+
+# insert a placeholder ("ghost") row for a Message-ID we have only
+# seen referenced, not received.  Ghosts use negative article
+# numbers (from the decrementing 'ghost' counter) so they never
+# collide with real messages; returns the new thread id
+sub create_ghost {
+ my ($self, $mid) = @_;
+ my $id = $self->mid2id($mid);
+ my $num = $self->next_ghost_num;
+ $num < 0 or die "ghost num is non-negative: $num\n";
+ my $tid = $self->next_tid;
+ my $dbh = $self->{dbh};
+ $dbh->prepare_cached(<<'')->execute($num, $tid);
+INSERT INTO over (num, tid) VALUES (?,?)
+
+ $dbh->prepare_cached(<<'')->execute($id, $num);
+INSERT INTO id2num (id, num) VALUES (?,?)
+
+ $tid;
+}
+
+# collapse two threads into one by repointing every row of the
+# losing tid at the winning tid; no-op when they already match
+sub merge_threads {
+ my ($self, $winner_tid, $loser_tid) = @_;
+ return if $winner_tid == $loser_tid;
+ my $dbh = $self->{dbh};
+ $dbh->prepare_cached(<<'')->execute($winner_tid, $loser_tid);
+UPDATE over SET tid = ? WHERE tid = ?
+
+}
+
+# determine the thread id for a message given its References
+# Message-IDs ($refs) and any tid already associated with it
+# ($old_tid, may be undef).  All referenced mids end up merged into
+# one thread; with no refs at all, reuse $old_tid or mint a new tid
+sub link_refs {
+ my ($self, $refs, $old_tid) = @_;
+ my $tid;
+
+ if (@$refs) {
+ # first ref *should* be the thread root,
+ # but we can never trust clients to do the right thing
+ my $ref = $refs->[0];
+ $tid = resolve_mid_to_tid($self, $ref);
+ merge_threads($self, $tid, $old_tid) if defined $old_tid;
+
+ # the rest of the refs should point to this tid:
+ foreach my $i (1..$#$refs) {
+ $ref = $refs->[$i];
+ my $ptid = resolve_mid_to_tid($self, $ref);
+ merge_threads($self, $tid, $ptid);
+ }
+ } else {
+ $tid = defined $old_tid ? $old_tid : $self->next_tid;
+ }
+ $tid;
+}
+
+# index one message into the overview DB.  $values is:
+# [ $ts, $num, \@mids, \@refs, $xpath, $ddd ] where $ts is the
+# timestamp, $num the article number, @mids all Message-IDs of the
+# message, @refs the referenced Message-IDs, $xpath the subject-path
+# key, and $ddd the deflated doc data (stored as a BLOB).
+# Vivifies any matching ghost rows (negative num), merges threads,
+# then inserts the over row and its id2num mappings
+sub add_over {
+ my ($self, $values) = @_;
+ my ($ts, $num, $mids, $refs, $xpath, $ddd) = @$values;
+ my $old_tid;
+ my $vivified = 0;
+
+ $self->begin_lazy;
+ $self->delete_by_num($num); # for reindexing
+ foreach my $mid (@$mids) {
+ my $v = 0;
+ each_by_mid($self, $mid, ['tid'], sub {
+ my ($cur) = @_;
+ my $cur_tid = $cur->{tid};
+ my $n = $cur->{num};
+ die "num must not be zero for $mid" if !$n;
+ $old_tid = $cur_tid unless defined $old_tid;
+ if ($n > 0) { # regular mail
+ merge_threads($self, $old_tid, $cur_tid);
+ } elsif ($n < 0) { # ghost
+ link_refs($self, $refs, $old_tid);
+ $self->delete_by_num($n);
+ $v++;
+ }
+ 1;
+ });
+ $v > 1 and warn "BUG: vivified multiple ($v) ghosts for $mid\n";
+ $vivified += $v;
+ }
+ # a vivified ghost already carries the right tid via link_refs above
+ my $tid = $vivified ? $old_tid : link_refs($self, $refs, $old_tid);
+ my $sid = $self->sid($xpath);
+ my $dbh = $self->{dbh};
+ my $sth = $dbh->prepare_cached(<<'');
+INSERT INTO over (num, tid, sid, ts, ddd)
+VALUES (?,?,?,?,?)
+
+ my $n = 0;
+ my @v = ($num, $tid, $sid, $ts);
+ foreach (@v) { $sth->bind_param(++$n, $_) }
+ # bind ddd explicitly as a BLOB so SQLite stores it unmangled
+ $sth->bind_param(++$n, $ddd, SQL_BLOB);
+ $sth->execute;
+ $sth = $dbh->prepare_cached(<<'');
+INSERT INTO id2num (id, num) VALUES (?,?)
+
+ foreach my $mid (@$mids) {
+ my $id = $self->mid2id($mid);
+ $sth->execute($id, $num);
+ }
+}
+
+sub delete_articles {
+ my ($self, $nums) = @_;
+ my $dbh = $self->connect;
+ $self->delete_by_num($_) foreach @$nums;
+}
+
+sub remove_oid {
+ my ($self, $oid, $mid) = @_;
+ $self->begin_lazy;
+ each_by_mid($self, $mid, ['ddd'], sub {
+ my ($smsg) = @_;
+ $self->delete_by_num($smsg->{num}) if $smsg->{blob} eq $oid;
+ 1;
+ });
+}
+
+sub create_tables {
+ my ($dbh) = @_;
+
+ $dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS over (
+ num INTEGER NOT NULL,
+ tid INTEGER NOT NULL,
+ sid INTEGER,
+ ts INTEGER,
+ ddd VARBINARY, /* doc-data-deflated */
+ UNIQUE (num)
+)
+
+ $dbh->do('CREATE INDEX IF NOT EXISTS idx_tid ON over (tid)');
+ $dbh->do('CREATE INDEX IF NOT EXISTS idx_sid ON over (sid)');
+ $dbh->do('CREATE INDEX IF NOT EXISTS idx_ts ON over (ts)');
+
+ $dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS counter (
+ key VARCHAR(8) PRIMARY KEY NOT NULL,
+ val INTEGER DEFAULT 0,
+ UNIQUE (key)
+)
+
+ $dbh->do("INSERT OR IGNORE INTO counter (key) VALUES ('thread')");
+ $dbh->do("INSERT OR IGNORE INTO counter (key) VALUES ('ghost')");
+
+ $dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS subject (
+ sid INTEGER PRIMARY KEY AUTOINCREMENT,
+ path VARCHAR(40) NOT NULL,
+ UNIQUE (path)
+)
+
+ $dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS id2num (
+ id INTEGER NOT NULL,
+ num INTEGER NOT NULL,
+ UNIQUE (id, num)
+)
+
+ # performance critical:
+ $dbh->do('CREATE INDEX IF NOT EXISTS idx_inum ON id2num (num)');
+ $dbh->do('CREATE INDEX IF NOT EXISTS idx_id ON id2num (id)');
+
+ $dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS msgid (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ mid VARCHAR(244) NOT NULL,
+ UNIQUE (mid)
+)
+
+}
+
+sub commit_lazy {
+ my ($self) = @_;
+ delete $self->{txn} or return;
+ $self->{dbh}->commit;
+}
+
+sub begin_lazy {
+ my ($self) = @_;
+ return if $self->{txn};
+ my $dbh = $self->connect or return;
+ $dbh->begin_work;
+ # $dbh->{Profile} = 2;
+ $self->{txn} = 1;
+}
+
+sub rollback_lazy {
+ my ($self) = @_;
+ delete $self->{txn} or return;
+ $self->{dbh}->rollback;
+}
+
+sub disconnect {
+ my ($self) = @_;
+ die "in transaction" if $self->{txn};
+ $self->{dbh} = undef;
+}
+
+sub create {
+ my ($self) = @_;
+ unless (-r $self->{filename}) {
+ require File::Path;
+ require File::Basename;
+ File::Path::mkpath(File::Basename::dirname($self->{filename}));
+ }
+ # create the DB:
+ PublicInbox::Over::connect($self);
+ $self->disconnect;
+}
+
+1;
# Copyright (C) 2018 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-package PublicInbox::SearchIdxSkeleton;
+package PublicInbox::OverIdxFork;
use strict;
use warnings;
-use base qw(PublicInbox::SearchIdx);
+use base qw(PublicInbox::OverIdx PublicInbox::Lock);
use Storable qw(freeze thaw);
+use IO::Handle;
-sub new {
- my ($class, $v2writable) = @_;
- my $self = $class->SUPER::new($v2writable->{-inbox}, 1, 'skel');
- # create the DB:
- $self->_xdb_acquire;
- $self->_xdb_release;
+sub create {
+ my ($self, $v2writable) = @_;
+ $self->SUPER::create();
$self->spawn_worker($v2writable) if $v2writable->{parallel};
- $self
}
sub spawn_worker {
$v2writable = undef;
close $w;
close $barrier_wait;
- eval { skeleton_worker_loop($self, $r, $barrier_note) };
- die "skeleton worker died: $@\n" if $@;
+
+ # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here
+ # speeds V2Writable batch imports across 8 cores by nearly 20%
+ fcntl($r, 1031, 1048576) if $^O eq 'linux';
+
+ eval { over_worker_loop($self, $r, $barrier_note) };
+ die "over worker died: $@\n" if $@;
exit;
}
$self->{w} = $w;
$self->{pid} = $pid;
+ $self->{lock_path} = "$self->{filename}.pipe.lock";
close $r;
close $barrier_note;
$self->{barrier_wait} = $barrier_wait;
-
$w->autoflush(1);
-
- # lock on only exists in parent, not in worker
- $self->{lock_path} = $self->xdir . '/pi-v2-skeleton.lock';
}
-sub skeleton_worker_loop {
+sub over_worker_loop {
my ($self, $r, $barrier_note) = @_;
$barrier_note->autoflush(1);
- $0 = 'pi-v2-skeleton';
- $self->begin_txn_lazy;
+ $0 = 'pi-v2-overview';
+ $self->begin_lazy;
my $barrier = undef;
while (my $line = $r->getline) {
if ($line eq "commit\n") {
- $self->commit_txn_lazy;
+ $self->commit_lazy;
} elsif ($line eq "close\n") {
- $self->_xdb_release;
+ $self->disconnect;
} elsif ($line =~ /\Abarrier_init (\d+)\n\z/) {
my $n = $1 - 1;
die "barrier in-progress\n" if defined $barrier;
delete $barrier->{$1} or die "unknown barrier: $part\n";
if ((scalar keys %$barrier) == 0) {
$barrier = undef;
- $self->commit_txn_lazy;
+ $self->commit_lazy;
print $barrier_note "barrier_done\n" or die
"print failed to barrier note: $!";
}
} elsif ($line =~ /\AD ([a-f0-9]{40,}) (.*)\n\z/s) {
my ($oid, $mid) = ($1, $2);
- $self->begin_txn_lazy;
- $self->remove_by_oid($oid, $mid);
+ $self->remove_oid($oid, $mid);
} else {
my $len = int($line);
my $n = read($r, my $msg, $len) or die "read: $!\n";
$n == $len or die "short read: $n != $len\n";
$msg = thaw($msg); # should raise on error
defined $msg or die "failed to thaw buffer\n";
- $self->begin_txn_lazy;
- eval { index_skeleton_real($self, $msg) };
+ eval { add_over($self, $msg) };
warn "failed to index message <$msg->[-1]>: $@\n" if $@;
}
}
- $self->worker_done;
+ die "$$ $0 dbh not released\n" if $self->{dbh};
+ die "$$ $0 still in transaction\n" if $self->{txn};
}
# called by a partition worker
-sub index_skeleton {
+# values: [ DS, NUM, BYTES, LINES, TS, MIDS, XPATH, doc_data ]
+sub add_over {
my ($self, $values) = @_;
if (my $w = $self->{w}) {
my $err;
die "print failed: $err\n" if $err;
} else {
- $self->begin_txn_lazy;
- index_skeleton_real($self, $values);
+ $self->SUPER::add_over($values);
}
}
-sub remote_remove {
+sub remove_oid {
my ($self, $oid, $mid) = @_;
- my $err;
- $self->lock_acquire;
- eval { $self->SUPER::remote_remove($oid, $mid) };
- $err = $@;
- $self->lock_release;
- die $err if $err;
-}
-
-sub index_skeleton_real ($$) {
- my ($self, $values) = @_;
- my ($ts, $num, $mids, $xpath, $doc_data) = @$values;
- my $smsg = PublicInbox::SearchMsg->new(undef);
- $smsg->load_from_data($doc_data);
- my $doc = $smsg->{doc};
- $doc->set_data($doc_data);
- PublicInbox::SearchIdx::add_values($doc, $ts, $smsg->ds, $num);
- my @refs = ($smsg->references =~ /<([^>]+)>/g);
- $self->delete_article($num) if defined $num; # for reindexing
- $self->link_and_save($doc, $mids, \@refs, $num, $xpath);
+ if (my $w = $self->{w}) {
+ my $err;
+ $self->lock_acquire;
+ print $w "D $oid $mid\n" or $err = $!;
+ $self->lock_release;
+ die $err if $err;
+ } else {
+ $self->SUPER::remove_oid($oid, $mid); # OverIdx
+ }
}
# write to the subprocess
my $w = $self->{w} or return;
my $err;
$self->lock_acquire;
- print $w "barrier_init $nparts\n" or $err = "failed to write: $!\n";
+ print $w "barrier_init $nparts\n" or $err = $!;
$self->lock_release;
die $err if $err;
}
$l eq "barrier_done\n" or die "bad response from barrier_wait: $l\n";
}
+sub remote_commit {
+ my ($self) = @_;
+ if (my $w = $self->{w}) {
+ my $err;
+ $self->lock_acquire;
+ print $w "commit\n" or $err = $!;
+ $self->lock_release;
+ die $err if $err;
+ } else {
+ $self->commit_lazy;
+ }
+}
+
+# prevent connections when using forked subprocesses
+sub connect {
+ my ($self) = @_;
+ return if $self->{w};
+ $self->SUPER::connect;
+}
+
+sub remote_close {
+ my ($self) = @_;
+ if (my $w = delete $self->{w}) {
+ my $pid = delete $self->{pid} or die "no process to wait on\n";
+ print $w "close\n" or die "failed to write to pid:$pid: $!\n";
+ close $w or die "failed to close pipe for pid:$pid: $!\n";
+ waitpid($pid, 0) == $pid or die "remote process did not finish";
+ $? == 0 or die ref($self)." pid:$pid exited with: $?";
+ } else {
+ die "transaction in progress $self\n" if $self->{txn};
+ $self->disconnect;
+ }
+}
+
+sub commit_fsync {
+ my ($self) = @_;
+ return if $self->{w}; # don't bother; main parent can also call this
+ $self->SUPER::commit_fsync;
+}
+
1;
# values for searching
use constant TS => 0; # Received: header in Unix time
use constant YYYYMMDD => 1; # for searching in the WWW UI
-use constant NUM => 2; # NNTP article number
use Search::Xapian qw/:standard/;
use PublicInbox::SearchMsg;
use PublicInbox::MIME;
use PublicInbox::MID qw/id_compress/;
+use PublicInbox::Over;
# This is English-only, everything else is non-standard and may be confused as
# a prefix common in patch emails
# 13 - fix threading for empty References/In-Reply-To
# (commit 83425ef12e4b65cdcecd11ddcb38175d4a91d5a0)
# 14 - fix ghost root vivification
- SCHEMA_VERSION => 14,
+ SCHEMA_VERSION => 15,
# n.b. FLAG_PURE_NOT is expensive not suitable for a public website
# as it could become a denial-of-service vector
QP_FLAGS => FLAG_PHRASE|FLAG_BOOLEAN|FLAG_LOVEHATE|FLAG_WILDCARD,
};
-# setup prefixes
-my %bool_pfx_internal = (
- type => 'T', # "mail" or "ghost"
- thread => 'G', # newsGroup (or similar entity - e.g. a web forum name)
-);
-
my %bool_pfx_external = (
mid => 'Q', # Message-ID (full/exact), this is mostly uniQue
);
);
chomp @HELP;
-my $mail_query = Search::Xapian::Query->new('T' . 'mail');
-
sub xdir {
my ($self) = @_;
if ($self->{version} == 1) {
altid => $altid,
version => $version,
}, $class;
+ my $dir;
if ($version >= 2) {
- my $dir = "$self->{mainrepo}/xap" . SCHEMA_VERSION;
+ $dir = "$self->{mainrepo}/xap" . SCHEMA_VERSION;
my $xdb;
my $parts = 0;
foreach my $part (<$dir/*>) {
}
}
$self->{xdb} = $xdb;
- $self->{skel} = Search::Xapian::Database->new("$dir/skel");
} else {
- $self->{xdb} = Search::Xapian::Database->new($self->xdir);
+ $dir = $self->xdir;
+ $self->{xdb} = Search::Xapian::Database->new($dir);
}
+ $self->{over_ro} = PublicInbox::Over->new("$dir/over.sqlite3");
$self;
}
sub reopen {
my ($self) = @_;
$self->{xdb}->reopen;
- if (my $skel = $self->{skel}) {
- $skel->reopen;
- }
$self; # make chaining easier
}
# read-only
sub query {
my ($self, $query_string, $opts) = @_;
- my $query;
-
$opts ||= {};
- unless ($query_string eq '') {
- $query = $self->qp->parse_query($query_string, QP_FLAGS);
+ if ($query_string eq '' && !$opts->{mset}) {
+ $self->{over_ro}->recent($opts);
+ } else {
+ my $query = $self->qp->parse_query($query_string, QP_FLAGS);
$opts->{relevance} = 1 unless exists $opts->{relevance};
+ _do_enquire($self, $query, $opts);
}
-
- _do_enquire($self, $query, $opts);
}
sub get_thread {
my ($self, $mid, $opts) = @_;
- my $smsg = first_smsg_by_mid($self, $mid) or
- return { total => 0, msgs => [] };
- my $qtid = Search::Xapian::Query->new('G' . $smsg->thread_id);
- my $path = $smsg->path;
- if (defined $path && $path ne '') {
- my $path = id_compress($smsg->path);
- my $qsub = Search::Xapian::Query->new('XPATH' . $path);
- $qtid = Search::Xapian::Query->new(OP_OR, $qtid, $qsub);
- }
- $opts ||= {};
- $opts->{limit} ||= 1000;
-
- # always sort threads by timestamp, this makes life easier
- # for the threading algorithm (in SearchThread.pm)
- $opts->{asc} = 1;
- $opts->{enquire} = enquire_skel($self);
- _do_enquire($self, $qtid, $opts);
+ $self->{over_ro}->get_thread($mid, $opts);
}
sub retry_reopen {
sub _enquire_once {
my ($self, $query, $opts) = @_;
- my $enquire = $opts->{enquire} || enquire($self);
- if (defined $query) {
- $query = Search::Xapian::Query->new(OP_AND,$query,$mail_query);
- } else {
- $query = $mail_query;
- }
+ my $enquire = enquire($self);
+ $query = Search::Xapian::Query->new(OP_AND,$query);
$enquire->set_query($query);
$opts ||= {};
my $desc = !$opts->{asc};
if ($opts->{relevance}) {
$enquire->set_sort_by_relevance_then_value(TS, $desc);
- } elsif ($opts->{num}) {
- $enquire->set_sort_by_value(NUM, 0);
} else {
$enquire->set_sort_by_value_then_relevance(TS, $desc);
}
$self->{query_parser} = $qp;
}
-sub num_range_processor {
- $_[0]->{nrp} ||= Search::Xapian::NumberValueRangeProcessor->new(NUM);
-}
-
# only used for NNTP server
sub query_xover {
my ($self, $beg, $end, $offset) = @_;
- my $qp = Search::Xapian::QueryParser->new;
- $qp->set_database($self->{skel} || $self->{xdb});
- $qp->add_valuerangeprocessor($self->num_range_processor);
- my $query = $qp->parse_query("$beg..$end", QP_FLAGS);
-
- my $opts = {
- enquire => enquire_skel($self),
- num => 1,
- limit => 200,
- offset => $offset,
- };
- _do_enquire($self, $query, $opts);
+ $self->{over_ro}->query_xover($beg, $end, $offset);
}
sub query_ts {
- my ($self, $ts, $opts) = @_;
- my $qp = $self->{qp_ts} ||= eval {
- my $q = Search::Xapian::QueryParser->new;
- $q->set_database($self->{skel} || $self->{xdb});
- $q->add_valuerangeprocessor(
- Search::Xapian::NumberValueRangeProcessor->new(TS));
- $q
- };
- my $query = $qp->parse_query($ts, QP_FLAGS);
- $opts->{enquire} = enquire_skel($self);
- _do_enquire($self, $query, $opts);
+ my ($self, $ts, $offset) = @_;
+ $self->{over_ro}->query_ts($ts, $offset);
}
sub first_smsg_by_mid {
sub lookup_article {
my ($self, $num) = @_;
my $term = 'XNUM'.$num;
- my $db = $self->{skel} || $self->{xdb};
+ my $db = $self->{xdb};
retry_reopen($self, sub {
my $head = $db->postlist_begin($term);
my $tail = $db->postlist_end($term);
return unless defined $doc_id;
$head->inc;
if ($head->nequal($tail)) {
- my $loc= $self->{mainrepo} .
- ($self->{skel} ? 'skel' : 'xdb');
- warn "article #$num is not unique in $loc\n";
+ warn "article #$num is not unique\n";
}
# raises on error:
my $doc = $db->get_document($doc_id);
my ($self, $mid, $cb) = @_;
# XXX retry_reopen isn't necessary for V2Writable, but the PSGI
# interface will need it...
- my $db = $self->{skel} || $self->{xdb};
+ my $db = $self->{xdb};
my $term = 'Q' . $mid;
my $head = $db->postlist_begin($term);
my $tail = $db->postlist_end($term);
$self->{enquire} ||= Search::Xapian::Enquire->new($self->{xdb});
}
-sub enquire_skel {
- my ($self) = @_;
- if (my $skel = $self->{skel}) {
- $self->{enquire_skel} ||= Search::Xapian::Enquire->new($skel);
- } else {
- enquire($self);
- }
-}
-
sub help {
my ($self) = @_;
$self->qp; # parse altids
use PublicInbox::MsgIter;
use Carp qw(croak);
use POSIX qw(strftime);
+use PublicInbox::OverIdx;
require PublicInbox::Git;
+use Compress::Zlib qw(compress);
use constant {
- BATCH_BYTES => 1_000_000,
+ BATCH_BYTES => 10_000_000,
DEBUG => !!$ENV{DEBUG},
};
$ibx->umask_prepare;
if ($version == 1) {
$self->{lock_path} = "$mainrepo/ssoma.lock";
+ my $dir = $self->xdir;
+ $self->{over} = PublicInbox::OverIdx->new("$dir/over.sqlite3");
} elsif ($version == 2) {
defined $part or die "partition is required for v2\n";
- # partition is a number or "all"
+ # partition is a number
$self->{partition} = $part;
$self->{lock_path} = undef;
- $self->{msgmap_path} = "$mainrepo/msgmap.sqlite3";
} else {
die "unsupported inbox version=$version\n";
}
$doc->add_value($col, $num);
}
-sub add_values {
- my ($doc, $ts, $ds, $num) = @_;
- add_val($doc, PublicInbox::Search::TS, $ts);
- my $yyyymmdd = strftime('%Y%m%d', gmtime($ds));
- add_val($doc, PublicInbox::Search::YYYYMMDD, $yyyymmdd);
- defined($num) and add_val($doc, PublicInbox::Search::NUM, $num);
-}
-
sub index_users ($$) {
my ($tg, $smsg) = @_;
my ($self, $mime, $bytes, $num, $oid, $mid0) = @_;
my $doc_id;
my $mids = mids($mime->header_obj);
- my $skel = $self->{skeleton};
-
+ $mid0 = $mids->[0] unless defined $mid0; # v1 compatibility
+ unless (defined $num) { # v1
+ my $mm = $self->_msgmap_init;
+ $num = $mm->mid_insert($mid0) || $mm->num_for($mid0);
+ }
eval {
my $smsg = PublicInbox::SearchMsg->new($mime);
my $doc = $smsg->{doc};
$xpath = id_compress($xpath);
}
- my $lines = $mime->body_raw =~ tr!\n!\n!;
$smsg->{lines} = $mime->body_raw =~ tr!\n!\n!;
defined $bytes or $bytes = length($mime->as_string);
$smsg->{bytes} = $bytes;
- add_values($doc, $smsg->ts, $smsg->ds, $num);
+ add_val($doc, PublicInbox::Search::TS(), $smsg->ts);
+ my $yyyymmdd = strftime('%Y%m%d', gmtime($smsg->ds));
+ add_val($doc, PublicInbox::Search::YYYYMMDD, $yyyymmdd);
my $tg = $self->term_generator;
# populates smsg->references for smsg->to_doc_data
my $refs = parse_references($smsg);
- $mid0 = $mids->[0] unless defined $mid0; # v1 compatibility
my $data = $smsg->to_doc_data($oid, $mid0);
foreach my $mid (@$mids) {
$tg->index_text($mid, 1, 'XM');
}
$self->delete_article($num) if defined $num; # for reindexing
- if ($skel) {
- my @vals = ($smsg->ts, $num, $mids, $xpath, $data);
- $skel->index_skeleton(\@vals);
- $doc->add_boolean_term('Q' . $_) foreach @$mids;
- $doc->add_boolean_term('XNUM' . $num) if defined $num;
- $doc_id = $self->{xdb}->add_document($doc);
- } else {
- $doc_id = link_and_save($self, $doc, $mids, $refs,
- $num, $xpath);
- }
+
+ utf8::encode($data);
+ $data = compress($data);
+ my @vals = ($smsg->ts, $num, $mids, $refs, $xpath, $data);
+ $self->{over}->add_over(\@vals);
+ $doc->add_boolean_term('Q' . $_) foreach @$mids;
+ $doc->add_boolean_term('XNUM' . $num) if defined $num;
+ $doc_id = $self->{xdb}->add_document($doc);
};
if ($@) {
# there is only ONE element in @delete unless we
# have bugs in our v2writable deduplication check
my @delete;
+ my @over_del;
for (; $head != $tail; $head->inc) {
my $docid = $head->get_docid;
my $doc = $db->get_document($docid);
my $smsg = PublicInbox::SearchMsg->wrap($doc, $mid);
$smsg->load_expand;
- push(@delete, $docid) if $smsg->{blob} eq $oid;
+ if ($smsg->{blob} eq $oid) {
+ push(@delete, $docid);
+ push(@over_del, $smsg->num);
+ }
}
$db->delete_document($_) foreach @delete;
+ $self->{over}->remove_oid($oid, $mid);
scalar(@delete);
}
$self->{term_generator} = $tg;
}
-# increments last_thread_id counter
-# returns a 64-bit integer represented as a decimal string
-sub next_thread_id {
- my ($self) = @_;
- my $db = $self->{xdb};
- my $last_thread_id = int($db->get_metadata('last_thread_id') || 0);
-
- $db->set_metadata('last_thread_id', ++$last_thread_id);
-
- $last_thread_id;
-}
-
sub parse_references ($) {
my ($smsg) = @_;
my $mime = $smsg->{mime};
\@keep;
}
-sub link_doc {
- my ($self, $doc, $refs, $old_tid) = @_;
- my $tid;
-
- if (@$refs) {
- # first ref *should* be the thread root,
- # but we can never trust clients to do the right thing
- my $ref = shift @$refs;
- $tid = resolve_mid_to_tid($self, $ref);
- merge_threads($self, $tid, $old_tid) if defined $old_tid;
-
- # the rest of the refs should point to this tid:
- foreach $ref (@$refs) {
- my $ptid = resolve_mid_to_tid($self, $ref);
- merge_threads($self, $tid, $ptid);
- }
- } else {
- $tid = defined $old_tid ? $old_tid : $self->next_thread_id;
- }
- $doc->add_boolean_term('G' . $tid);
- $tid;
-}
-
-sub link_and_save {
- my ($self, $doc, $mids, $refs, $num, $xpath) = @_;
- my $db = $self->{xdb};
- my $old_tid;
- my $doc_id;
- $doc->add_boolean_term('XNUM' . $num) if defined $num;
- $doc->add_boolean_term('XPATH' . $xpath) if defined $xpath;
- $doc->add_boolean_term('Q' . $_) foreach @$mids;
-
- $self->{skel} and die "Should not have read-only skel here\n";;
- foreach my $mid (@$mids) {
- my $vivified = 0;
- $self->each_smsg_by_mid($mid, sub {
- my ($cur) = @_;
- my $type = $cur->type;
- my $cur_tid = $cur->thread_id;
- $old_tid = $cur_tid unless defined $old_tid;
- if ($type eq 'mail') {
- # do not break existing mail messages,
- # just merge the threads
- merge_threads($self, $old_tid, $cur_tid);
- return 1;
- }
- if ($type ne 'ghost') {
- die "<$mid> has a bad type: $type\n";
- }
- my $tid = link_doc($self, $doc, $refs, $old_tid);
- $old_tid = $tid unless defined $old_tid;
- $doc_id = $cur->{doc_id};
- $self->{xdb}->replace_document($doc_id, $doc);
- ++$vivified;
- 1;
- });
- $vivified > 1 and warn
- "BUG: vivified multiple ($vivified) ghosts for $mid\n";
- }
- # not really important, but we return any vivified ghost docid, here:
- return $doc_id if defined $doc_id;
- link_doc($self, $doc, $refs, $old_tid);
- $self->{xdb}->add_document($doc);
-}
-
sub index_git_blob_id {
my ($doc, $pfx, $objid) = @_;
sub _msgmap_init {
my ($self) = @_;
+ die "BUG: _msgmap_init is only for v1\n" if $self->{version} != 1;
$self->{mm} ||= eval {
require PublicInbox::Msgmap;
- my $msgmap_path = $self->{msgmap_path};
- if (defined $msgmap_path) { # v2
- PublicInbox::Msgmap->new_file($msgmap_path, 1);
- } else {
- PublicInbox::Msgmap->new($self->{mainrepo}, 1);
- }
+ PublicInbox::Msgmap->new($self->{mainrepo}, 1);
};
}
my $reindex = $opts->{reindex};
my ($mkey, $last_commit, $lx, $xlog);
$self->{git}->batch_prepare;
- my $xdb = _xdb_acquire($self);
- $xdb->begin_transaction;
+ my $xdb = $self->begin_txn_lazy;
do {
$xlog = undef;
$mkey = 'last_commit';
$lx = '';
$mkey = undef if $last_commit ne '';
}
+ $self->{over}->rollback_lazy;
+ $self->{over}->disconnect;
+ delete $self->{txn};
$xdb->cancel_transaction;
$xdb = _xdb_release($self);
my $range = $lx eq '' ? $tip : "$lx..$tip";
$xlog = _git_log($self, $range);
- $xdb = _xdb_acquire($self);
- $xdb->begin_transaction;
+ $xdb = $self->begin_txn_lazy;
} while ($xdb->get_metadata('last_commit') ne $last_commit);
my $mm = _msgmap_init($self);
}
if (!$mm_only) {
$xdb->set_metadata($mkey, $commit) if $mkey && $commit;
- $xdb->commit_transaction;
- $xdb = _xdb_release($self);
+ $self->commit_txn_lazy;
}
# let another process do some work... <
if ($more) {
if (!$mm_only) {
- $xdb = _xdb_acquire($self);
- $xdb->begin_transaction;
+ $xdb = $self->begin_txn_lazy;
}
$dbh->begin_work if $dbh;
}
}
}
-# this will create a ghost as necessary
-sub resolve_mid_to_tid {
- my ($self, $mid) = @_;
- my $tid;
- $self->each_smsg_by_mid($mid, sub {
- my ($smsg) = @_;
- my $cur_tid = $smsg->thread_id;
- if (defined $tid) {
- merge_threads($self, $tid, $cur_tid);
- } else {
- $tid = $smsg->thread_id;
- }
- 1;
- });
- return $tid if defined $tid;
-
- $self->create_ghost($mid)->thread_id;
-}
-
-sub create_ghost {
- my ($self, $mid) = @_;
-
- my $tid = $self->next_thread_id;
- my $doc = Search::Xapian::Document->new;
- $doc->add_boolean_term('Q' . $mid);
- $doc->add_boolean_term('G' . $tid);
- $doc->add_boolean_term('T' . 'ghost');
-
- my $smsg = PublicInbox::SearchMsg->wrap($doc, $mid);
- $self->{xdb}->add_document($doc);
-
- $smsg;
-}
-
-sub merge_threads {
- my ($self, $winner_tid, $loser_tid) = @_;
- return if $winner_tid == $loser_tid;
- my $db = $self->{xdb};
- batch_do($self, 'G' . $loser_tid, sub {
- my ($ids) = @_;
- foreach my $docid (@$ids) {
- my $doc = $db->get_document($docid);
- $doc->remove_term('G' . $loser_tid);
- $doc->add_boolean_term('G' . $winner_tid);
- $db->replace_document($docid, $doc);
- }
- });
-}
-
sub DESTROY {
# order matters for unlocking
$_[0]->{xdb} = undef;
$_[0]->{lockfh} = undef;
}
-# remote_* subs are only used by SearchIdxPart and SearchIdxSkeleton
+# remote_* subs are only used by SearchIdxPart
sub remote_commit {
my ($self) = @_;
if (my $w = $self->{w}) {
print $w "commit\n" or die "failed to write commit: $!";
} else {
$self->commit_txn_lazy;
- if (my $skel = $self->{skeleton}) {
- $skel->commit_txn_lazy;
- }
}
}
sub remote_remove {
my ($self, $oid, $mid) = @_;
if (my $w = $self->{w}) {
- # triggers remove_by_oid in partition or skeleton
+ # triggers remove_by_oid in a partition
print $w "D $oid $mid\n" or die "failed to write remove $!";
} else {
$self->begin_txn_lazy;
my ($self) = @_;
return if $self->{txn};
my $xdb = $self->{xdb} || $self->_xdb_acquire;
+ $self->{over}->begin_lazy;
$xdb->begin_transaction;
$self->{txn} = 1;
+ $xdb;
}
sub commit_txn_lazy {
my ($self) = @_;
delete $self->{txn} or return;
$self->{xdb}->commit_transaction;
+ $self->{over}->commit_lazy;
}
sub worker_done {
use base qw(PublicInbox::SearchIdx);
sub new {
- my ($class, $v2writable, $part, $skel) = @_;
+ my ($class, $v2writable, $part) = @_;
my $self = $class->SUPER::new($v2writable->{-inbox}, 1, $part);
- $self->{skeleton} = $skel;
- # create the DB:
+ # create the DB before forking:
$self->_xdb_acquire;
$self->_xdb_release;
+ $self->{over} = $v2writable->{over};
$self->spawn_worker($v2writable, $part) if $v2writable->{parallel};
$self;
}
if ($pid == 0) {
$v2writable->atfork_child;
$v2writable = undef;
- close $w;
+ close $w or die "failed to close: $!";
# F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here
# speeds V2Writable batch imports across 8 cores by nearly 20%
}
$self->{pid} = $pid;
$self->{w} = $w;
- close $r;
+ close $r or die "failed to close: $!";
}
sub partition_worker_loop ($$$) {
while (my $line = $r->getline) {
if ($line eq "commit\n") {
$self->commit_txn_lazy;
- $self->{skeleton}->remote_commit;
} elsif ($line eq "close\n") {
$self->_xdb_release;
} elsif ($line eq "barrier\n") {
$self->commit_txn_lazy;
- print { $self->{skeleton}->{w} } "barrier $part\n" or
- die "write failed to skeleton: $!\n";
+ print { $self->{over}->{w} } "barrier $part\n" or
+ die "write failed to overview: $!\n";
} elsif ($line =~ /\AD ([a-f0-9]{40,}) (.+)\n\z/s) {
my ($oid, $mid) = ($1, $2);
$self->begin_txn_lazy;
$w->flush or die "failed to flush: $!";
} else {
$self->commit_txn_lazy;
- $self->{skeleton}->remote_commit;
}
}
sub new {
my ($class, $mime) = @_;
my $doc = Search::Xapian::Document->new;
- $doc->add_boolean_term('T' . 'mail');
-
- bless { type => 'mail', doc => $doc, mime => $mime }, $class;
+ bless { doc => $doc, mime => $mime }, $class;
}
sub wrap {
);
}
+
sub load_from_data ($$) {
my ($self) = $_[0]; # data = $_[1]
(
sub _extract_mid { mid_clean(mid_mime($_[0]->{mime})) }
-sub thread_id {
- my ($self) = @_;
- my $tid = $self->{thread};
- return $tid if defined $tid;
- $self->{thread} = _get_term_val($self, 'G', qr/\AG/); # *G*roup
-}
+sub tid { $_[0]->{tid} }
# XXX: consider removing this, we can phrase match subject
-sub path {
- my ($self) = @_;
- my $path = $self->{path};
- return $path if defined $path;
- $self->{path} = _get_term_val($self, 'XPATH', qr/\AXPATH/); # path
-}
-
-sub type {
- my ($self) = @_;
- my $type = $self->{type};
- return $type if defined $type;
- $self->{type} = _get_term_val($self, 'T', qr/\AT/);
-}
+sub path { $_[0]->{path} }
1;
use warnings;
use base qw(PublicInbox::Lock);
use PublicInbox::SearchIdxPart;
-use PublicInbox::SearchIdxSkeleton;
use PublicInbox::MIME;
use PublicInbox::Git;
use PublicInbox::Import;
use PublicInbox::MID qw(mids);
use PublicInbox::ContentId qw(content_id content_digest);
use PublicInbox::Inbox;
+use PublicInbox::OverIdxFork;
+use PublicInbox::Msgmap;
+use IO::Handle;
# an estimate of the post-packed size to the raw uncompressed size
my $PACKING_FACTOR = 0.4;
partitions => $nparts,
parallel => 1,
transact_bytes => 0,
+ over => PublicInbox::OverIdxFork->new("$xpfx/over.sqlite3"),
lock_path => "$dir/inbox.lock",
# limit each repo to 1GB or so
rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
my $mids = mids($mime->header_obj);
if (@$mids) {
my $mid = $mids->[0];
- my $num = $self->{skel}->{mm}->mid_insert($mid);
+ my $num = $self->{mm}->mid_insert($mid);
if (defined $num) { # common case
$$mid0 = $mid;
return $num;
# try the rest of the mids
for(my $i = $#$mids; $i >= 1; $i--) {
my $m = $mids->[$i];
- $num = $self->{skel}->{mm}->mid_insert($m);
+ $num = $self->{mm}->mid_insert($m);
if (defined $num) {
warn "alternative <$m> for <$mid> found\n";
$$mid0 = $m;
my $hdr = $mime->header_obj;
my $dig = content_digest($mime);
$$mid0 = PublicInbox::Import::digest2mid($dig);
- my $num = $self->{skel}->{mm}->mid_insert($$mid0);
+ my $num = $self->{mm}->mid_insert($$mid0);
unless (defined $num) {
# it's hard to spoof the last Received: header
my @recvd = $hdr->header_raw('Received');
$dig->add("Received: $_") foreach (@recvd);
$$mid0 = PublicInbox::Import::digest2mid($dig);
- $num = $self->{skel}->{mm}->mid_insert($$mid0);
+ $num = $self->{mm}->mid_insert($$mid0);
# fall back to a random Message-ID and give up determinism:
until (defined($num)) {
$dig->add(rand);
$$mid0 = PublicInbox::Import::digest2mid($dig);
warn "using random Message-ID <$$mid0> as fallback\n";
- $num = $self->{skel}->{mm}->mid_insert($$mid0);
+ $num = $self->{mm}->mid_insert($$mid0);
}
}
PublicInbox::Import::append_mid($hdr, $$mid0);
# frequently activated.
delete $ibx->{$_} foreach (qw(git mm search));
+ my $over = $self->{over};
$ibx->umask_prepare;
$ibx->with_umask(sub {
$self->lock_acquire;
-
- # first time initialization, first we create the skeleton pipe:
- my $skel = PublicInbox::SearchIdxSkeleton->new($self);
- $self->{skel} = $skel;
+ $over->create($self);
# need to create all parts before initializing msgmap FD
my $max = $self->{partitions} - 1;
# idx_parts must be visible to all forked processes
my $idx = $self->{idx_parts} = [];
for my $i (0..$max) {
- push @$idx,
- PublicInbox::SearchIdxPart->new($self, $i, $skel);
+ push @$idx, PublicInbox::SearchIdxPart->new($self, $i);
}
- # Now that all subprocesses are up, we can open the FD for SQLite:
- $skel->_msgmap_init->{dbh}->begin_work;
+ # Now that all subprocesses are up, we can open the FDs
+ # for SQLite:
+ my $mm = $self->{mm} = PublicInbox::Msgmap->new_file(
+ "$self->{-inbox}->{mainrepo}/msgmap.sqlite3", 1);
+ $mm->{dbh}->begin_work;
});
}
my $ibx = $self->{-inbox};
my $srch = $ibx->search;
my $cid = content_id($mime);
- my $skel = $self->{skel};
my $parts = $self->{idx_parts};
- my $mm = $skel->{mm};
+ my $mm = $self->{mm};
my $removed;
my $mids = mids($mime->header_obj);
$orig = undef;
$removed->num; # memoize this for callers
- foreach my $idx (@$parts, $skel) {
+ foreach my $idx (@$parts) {
$idx->remote_remove($oid, $mid);
}
+ $self->{over}->remove_oid($oid, $mid);
}
1; # continue
});
if (my $im = $self->{im}) {
$im->barrier;
}
- my $skel = $self->{skel};
my $parts = $self->{idx_parts};
- if ($parts && $skel) {
- my $dbh = $skel->{mm}->{dbh};
- $dbh->commit; # SQLite data is second in importance
+ if ($parts) {
+ my $dbh = $self->{mm}->{dbh};
+ $dbh->commit; # SQLite msgmap data is second in importance
+
+ my $over = $self->{over};
- # Now deal with Xapian
- $skel->barrier_init(scalar(@$parts));
- # each partition needs to issue a barrier command to skel:
+ # Now deal with Xapian and overview DB
+ $over->barrier_init(scalar(@$parts));
+
+ # each partition needs to issue a barrier command to over
$_->remote_barrier foreach @$parts;
- $skel->barrier_wait; # wait for each Xapian partition
+ $over->barrier_wait; # wait for each Xapian partition
$dbh->begin_work;
}
sub searchidx_checkpoint {
my ($self, $more) = @_;
- # order matters, we can only close {skel} after all partitions
- # are done because the partitions also write to {skel}
+ # order matters, we can only close {over} after all partitions
+ # are done because the partitions also write to {over}
if (my $parts = $self->{idx_parts}) {
foreach my $idx (@$parts) {
- $idx->remote_commit; # propagates commit to skel
+ $idx->remote_commit; # propagates commit to over
$idx->remote_close unless $more;
}
delete $self->{idx_parts} unless $more;
}
- if (my $skel = $self->{skel}) {
- my $dbh = $skel->{mm}->{dbh};
+ if (my $mm = $self->{mm}) {
+ my $dbh = $mm->{dbh};
$dbh->commit;
if ($more) {
$dbh->begin_work;
} else {
- $skel->remote_close;
- delete $self->{skel};
+ delete $self->{mm};
}
}
+ my $over = $self->{over};
+ $over->remote_commit;
+ if (!$more) {
+ $over->remote_close;
+ }
$self->{transact_bytes} = 0;
}
if (my $im = $self->{im}) {
$im->atfork_child;
}
+ die "unexpected mm" if $self->{mm};
}
sub mark_deleted {
if (!defined($mid0) && $regen && !$del) {
$num = $$regen--;
die "BUG: ran out of article numbers\n" if $num <= 0;
- my $mm = $self->{skel}->{mm};
+ my $mm = $self->{mm};
foreach my $mid (reverse @$mids) {
if ($mm->mid_set($num, $mid) == 1) {
$mid0 = $mid;
my $head = $ibx->{ref_head} || 'refs/heads/master';
$self->idx_init; # acquire lock
my $x40 = qr/[a-f0-9]{40}/;
- my $mm_tmp = $self->{skel}->{mm}->tmp_clone;
+ my $mm_tmp = $self->{mm}->tmp_clone;
if (!$regen) {
my (undef, $max) = $mm_tmp->minmax;
unless (defined $max) {
use Cwd 'abs_path';
use File::Temp qw(tempdir);
use File::Path qw(remove_tree);
-use PublicInbox::Spawn qw(spawn);
my $usage = "Usage: public-inbox-compact REPO_DIR\n";
my $dir = shift or die $usage;
my $config = PublicInbox::Config->new;
sub commit_changes ($$$) {
my ($im, $old, $new) = @_;
my @st = stat($old) or die "failed to stat($old): $!\n";
+ link("$old/over.sqlite3", "$new/over.sqlite3") or die
+ "failed to link {$old => $new}/over.sqlite3: $!\n";
rename($old, "$new/old") or die "rename $old => $new/old: $!\n";
chmod($st[2] & 07777, $new) or die "chmod $old: $!\n";
rename($new, $old) or die "rename $new => $old: $!\n";
$ibx->with_umask(sub {
$v2w->lock_acquire;
my @parts;
- my $skel;
while (defined(my $dn = readdir($dh))) {
if ($dn =~ /\A\d+\z/) {
push @parts, "$old/$dn";
- } elsif ($dn eq 'skel') {
- $skel = "$old/$dn";
} elsif ($dn eq '.' || $dn eq '..') {
} else {
warn "W: skipping unknown Xapian DB: $old/$dn\n"
}
}
close $dh;
- my %pids;
-
- if (@parts) {
- my $pid = spawn(['xapian-compact', @parts, "$new/0" ]);
- defined $pid or die "compact failed: $?\n";
- $pids{$pid} = 'xapian-compact (parts)';
- } else {
- warn "No parts found in $old\n";
- }
- if (defined $skel) {
- my $pid = spawn(['xapian-compact', $skel, "$new/skel"]);
- defined $pid or die "compact failed: $?\n";
- $pids{$pid} = 'xapian-compact (skel)';
- } else {
- warn "$old/skel missing\n";
- }
- scalar keys %pids or
- die "No xapian-compact processes running\n";
- while (scalar keys %pids) {
- my $pid = waitpid(-1, 0);
- my $desc = delete $pids{$pid};
- die "$desc failed: $?\n" if $?;
- }
+ die "No Xapian parts found in $old\n" unless @parts;
+ my $cmd = ['xapian-compact', @parts, "$new/0" ];
+ PublicInbox::Import::run_die($cmd);
commit_changes($v2w, $old, $new);
});
} elsif ($v == 1) {
--- /dev/null
+# Copyright (C) 2018 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use Test::More;
+use File::Temp qw/tempdir/;
+foreach my $mod (qw(DBD::SQLite)) {
+ eval "require $mod";
+ plan skip_all => "$mod missing for over.t" if $@;
+}
+
+use_ok 'PublicInbox::OverIdx';
+my $tmpdir = tempdir('pi-over-XXXXXX', TMPDIR => 1, CLEANUP => 1);
+my $over = PublicInbox::OverIdx->new("$tmpdir/over.sqlite3");
+$over->connect;
+my $x = $over->next_tid;
+is(int($x), $x, 'integer tid');
+my $y = $over->next_tid;
+is($y, $x+1, 'tid increases');
+
+$x = $over->sid('hello-world');
+is(int($x), $x, 'integer sid');
+$y = $over->sid('hello-WORLD');
+is($y, $x+1, 'sid increases');
+is($over->sid('hello-world'), $x, 'idempotent');
+$over->disconnect;
+
+$over = PublicInbox::OverIdx->new("$tmpdir/over.sqlite3");
+$over->connect;
+is($over->sid('hello-world'), $x, 'idempotent across reopen');
+$over->each_by_mid('never', sub { fail('should not be called') });
+
+$x = $over->create_ghost('never');
+is(int($x), $x, 'integer tid for ghost');
+$y = $over->create_ghost('NEVAR');
+is($y, $x + 1, 'integer tid for ghost increases');
+
+done_testing();
my $num = 0;
# nb. using internal API, fragile!
-my $xdb = $rw->_xdb_acquire;
-$xdb->begin_transaction;
+$rw->begin_txn_lazy;
foreach (reverse split(/\n\n/, $data)) {
$_ .= "\n";
ok($doc_id, 'message added: '. $mid);
}
-$xdb->commit_transaction;
-$rw = undef;
+$rw->commit_txn_lazy;
my $cfgpfx = "publicinbox.test";
my $config = PublicInbox::Config->new({
my $num = 0;
# nb. using internal API, fragile!
-my $xdb = $rw->_xdb_acquire;
-$xdb->begin_transaction;
+my $xdb = $rw->begin_txn_lazy;
my @mids;
foreach (reverse split(/\n\n/, $data)) {
my $prev;
foreach my $mid (@mids) {
- my $res = $rw->get_thread($mid);
+ my $res = $rw->{over}->get_thread($mid);
is(3, $res->{total}, "got all messages from $mid");
}
+$rw->commit_txn_lazy;
+
done_testing();
1;
$rw = undef;
my $ro = PublicInbox::Search->new($git_dir);
my $rw_commit = sub {
- $rw->{xdb}->commit_transaction if $rw && $rw->{xdb};
+ $rw->commit_txn_lazy if $rw;
$rw = PublicInbox::SearchIdx->new($git_dir, 1);
- $rw->_xdb_acquire->begin_transaction;
+ $rw->begin_txn_lazy;
};
{
ok($found, "message found");
is($root_id, $found->{doc_id}, 'doc_id set correctly');
is($found->mid, 'root@s', 'mid set correctly');
- ok(int($found->thread_id) > 0, 'thread_id is an integer');
my ($res, @res);
my @exp = sort qw(root@s last@s);
my $ghost_id = $rw->add_message($was_ghost);
is($ghost_id, int($ghost_id), "ghost_id is an integer: $ghost_id");
- ok($ghost_id < $reply_id, "ghost vivified from earlier message");
+ my $msgs = $rw->{over}->get_thread('ghost-message@s')->{msgs};
+ is(scalar(@$msgs), 2, 'got both messages in ghost thread');
+ foreach (qw(sid tid)) {
+ is($msgs->[0]->{$_}, $msgs->[1]->{$_}, "{$_} match");
+ }
+ isnt($msgs->[0]->{num}, $msgs->[1]->{num}, "num do not match");
+ ok($_->{num} > 0, 'positive art num') foreach @$msgs;
}
# search thread on ghost
is($txt->{msgs}->[0]->mid, $res->{msgs}->[0]->mid,
'search inside text attachments works');
}
+$rw->commit_txn_lazy;
done_testing();