# Copyright (C) 2018-2021 all contributors
# License: AGPL-3.0+
# for XOVER, OVER in NNTP, and feeds/homepage/threads in PSGI
# Unlike Msgmap, this is an _UNSTABLE_ database which can be
# tweaked/updated over time and rebuilt.
package PublicInbox::Over;
use strict;
use v5.10.1;
use DBI qw(:sql_types); # SQL_BLOB
use DBD::SQLite;
use PublicInbox::Smsg;
use Compress::Zlib qw(uncompress);
use constant DEFAULT_LIMIT => 1000;
sub dbh_new {
my ($self, $rw) = @_;
my $f = delete $self->{filename};
if (!-f $f) { # SQLite defaults mode to 0644, we want 0666
if ($rw) {
require PublicInbox::Spawn;
open my $fh, '+>>', $f or die "failed to open $f: $!";
PublicInbox::Spawn::nodatacow_fd(fileno($fh));
} else {
$self->{filename} = $f; # die on stat() below:
}
}
my (@st, $st, $dbh);
my $tries = 0;
do {
@st = stat($f) or die "failed to stat $f: $!";
$st = pack('dd', $st[0], $st[1]); # 0: dev, 1: inode
$dbh = DBI->connect("dbi:SQLite:dbname=$f",'','', {
AutoCommit => 1,
RaiseError => 1,
PrintError => 0,
ReadOnly => !$rw,
sqlite_use_immediate_transaction => 1,
});
$self->{st} = $st;
@st = stat($f) or die "failed to stat $f: $!";
$st = pack('dd', $st[0], $st[1]);
} while ($st ne $self->{st} && $tries++ < 3);
warn "W: $f: .st_dev, .st_ino unstable\n" if $st ne $self->{st};
if ($rw) {
# TRUNCATE reduces I/O compared to the default (DELETE).
#
# Do not use WAL by default since we expect the case
# where any users may read via read-only daemons
# (-httpd/-imapd/-nntpd); but only a single user has
# write permissions for -watch/-mda.
#
# Read-only WAL support in SQLite 3.22.0 (2018-01-22)
# doesn't do what we need: it is only intended for
# immutable read-only media (e.g. CD-ROM) and not
# usable for our use case described above.
#
# If an admin is willing to give read-only daemons R/W
# permissions; they can enable WAL manually and we will
# respect that by not clobbering it.
my $jm = $dbh->selectrow_array('PRAGMA journal_mode');
$dbh->do('PRAGMA journal_mode = TRUNCATE') if $jm ne 'wal';
$dbh->do('PRAGMA synchronous = OFF') if $rw > 1;
}
$dbh;
}
sub new {
my ($class, $f) = @_;
bless { filename => $f }, $class;
}
sub dbh_close {
my ($self) = @_;
if (my $dbh = delete $self->{dbh}) {
delete $self->{-get_art};
$self->{filename} = $dbh->sqlite_db_filename;
}
}
sub dbh ($) { $_[0]->{dbh} //= $_[0]->dbh_new } # dbh_new may be subclassed
sub load_from_row ($;$) {
my ($smsg, $cull) = @_;
bless $smsg, 'PublicInbox::Smsg';
if (defined(my $data = delete $smsg->{ddd})) {
$data = uncompress($data);
PublicInbox::Smsg::load_from_data($smsg, $data);
# saves over 600K for 1000+ message threads
PublicInbox::Smsg::psgi_cull($smsg) if $cull;
}
$smsg
}
sub do_get {
my ($self, $sql, $opts, @args) = @_;
my $lim = (($opts->{limit} || 0) + 0) || DEFAULT_LIMIT;
$sql .= "LIMIT $lim";
my $msgs = dbh($self)->selectall_arrayref($sql, { Slice => {} }, @args);
my $cull = $opts->{cull};
load_from_row($_, $cull) for @$msgs;
$msgs
}
sub query_xover {
my ($self, $beg, $end) = @_;
do_get($self, <<'', {}, $beg, $end);
SELECT num,ts,ds,ddd FROM over WHERE num >= ? AND num <= ?
ORDER BY num ASC
}
sub query_ts {
my ($self, $ts, $prev) = @_;
do_get($self, <<'', {}, $ts, $prev);
SELECT num,ddd FROM over WHERE ts >= ? AND num > ?
ORDER BY num ASC
}
sub get_all {
my $self = shift;
my $nr = scalar(@_) or return [];
my $in = '?' . (',?' x ($nr - 1));
do_get($self, <<"", { cull => 1, limit => $nr }, @_);
SELECT num,ts,ds,ddd FROM over WHERE num IN ($in)
}
sub nothing () { wantarray ? (0, []) : [] };
sub get_thread {
my ($self, $mid, $prev) = @_;
my $dbh = dbh($self);
my $opts = { cull => 1 };
my $id = $dbh->selectrow_array(<<'', undef, $mid);
SELECT id FROM msgid WHERE mid = ? LIMIT 1
defined $id or return nothing;
my $num = $dbh->selectrow_array(<<'', undef, $id);
SELECT num FROM id2num WHERE id = ? AND num > 0
ORDER BY num ASC LIMIT 1
defined $num or return nothing;
my ($tid, $sid) = $dbh->selectrow_array(<<'', undef, $num);
SELECT tid,sid FROM over WHERE num = ? LIMIT 1
defined $tid or return nothing; # $sid may be undef
my $cond_all = '(tid = ? OR sid = ?) AND num > ?';
my $sort_col = 'ds';
$num = 0;
if ($prev) { # mboxrd stream, only
$num = $prev->{num} || 0;
$sort_col = 'num';
}
my $cols = 'num,ts,ds,ddd';
unless (wantarray) {
return do_get($self, <<"", $opts, $tid, $sid, $num);
SELECT $cols FROM over WHERE $cond_all
ORDER BY $sort_col ASC
}
# HTML view always wants an array and never uses $prev,
# but the mbox stream never wants an array and always has $prev
die '$prev not supported with wantarray' if $prev;
my $nr = $dbh->selectrow_array(<<"", undef, $tid, $sid, $num);
SELECT COUNT(num) FROM over WHERE $cond_all
# giant thread, prioritize strict (tid) matches and throw
# in the loose (sid) matches at the end
my $msgs = do_get($self, <<"", $opts, $tid, $num);
SELECT $cols FROM over WHERE tid = ? AND num > ?
ORDER BY $sort_col ASC
# do we have room for loose matches? get the most recent ones, first:
my $lim = DEFAULT_LIMIT - scalar(@$msgs);
if ($lim > 0) {
$opts->{limit} = $lim;
my $loose = do_get($self, <<"", $opts, $tid, $sid, $num);
SELECT $cols FROM over WHERE tid != ? AND sid = ? AND num > ?
ORDER BY $sort_col DESC
# TODO separate strict and loose matches here once --reindex
# is fixed to preserve `tid' properly
push @$msgs, @$loose;
}
($nr, $msgs);
}
# strict `tid' matches, only, for thread-expanded mbox.gz search results
# and future CLI interface
# returns true if we have IDs, undef if not
sub expand_thread {
my ($self, $ctx) = @_;
my $dbh = dbh($self);
do {
defined(my $num = $ctx->{ids}->[0]) or return;
my ($tid) = $dbh->selectrow_array(<<'', undef, $num);
SELECT tid FROM over WHERE num = ?
if (defined($tid)) {
my $sql = <<'';
SELECT num FROM over WHERE tid = ? AND num > ?
ORDER BY num ASC LIMIT 1000
my $xids = $dbh->selectcol_arrayref($sql, undef, $tid,
$ctx->{prev} // 0);
if (scalar(@$xids)) {
$ctx->{prev} = $xids->[-1];
$ctx->{xids} = $xids;
return 1; # success
}
}
$ctx->{prev} = 0;
shift @{$ctx->{ids}};
} while (1);
}
sub recent {
my ($self, $opts, $after, $before) = @_;
my ($s, @v);
if (defined($before)) {
if (defined($after)) {
$s = '+num > 0 AND ts >= ? AND ts <= ? ORDER BY ts DESC';
@v = ($after, $before);
} else {
$s = '+num > 0 AND ts <= ? ORDER BY ts DESC';
@v = ($before);
}
} else {
if (defined($after)) {
$s = '+num > 0 AND ts >= ? ORDER BY ts ASC';
@v = ($after);
} else {
$s = '+num > 0 ORDER BY ts DESC';
}
}
do_get($self, <<"", $opts, @v);
SELECT ts,ds,ddd FROM over WHERE $s
}
sub get_art {
my ($self, $num) = @_;
# caching $sth ourselves is faster than prepare_cached
my $sth = $self->{-get_art} //= dbh($self)->prepare(<<'');
SELECT num,tid,ds,ts,ddd FROM over WHERE num = ? LIMIT 1
$sth->execute($num);
my $smsg = $sth->fetchrow_hashref;
$smsg ? load_from_row($smsg) : undef;
}
sub get_xref3 {
my ($self, $num, $raw) = @_;
my $dbh = dbh($self);
my $sth = $dbh->prepare_cached(<<'', undef, 1);
SELECT ibx_id,xnum,oidbin FROM xref3 WHERE docid = ? ORDER BY ibx_id,xnum ASC
$sth->execute($num);
my $rows = $sth->fetchall_arrayref;
return $rows if $raw;
my $eidx_key_sth = $dbh->prepare_cached(<<'', undef, 1);
SELECT eidx_key FROM inboxes WHERE ibx_id = ?
[ map {
my $r = $_;
$eidx_key_sth->execute($r->[0]);
my $eidx_key = $eidx_key_sth->fetchrow_array;
$eidx_key //= "missing://ibx_id=$r->[0]";
"$eidx_key:$r->[1]:".unpack('H*', $r->[2]);
} @$rows ];
}
sub next_by_mid {
my ($self, $mid, $id, $prev) = @_;
my $dbh = dbh($self);
unless (defined $$id) {
my $sth = $dbh->prepare_cached(<<'', undef, 1);
SELECT id FROM msgid WHERE mid = ? LIMIT 1
$sth->execute($mid);
$$id = $sth->fetchrow_array;
defined $$id or return;
}
my $sth = $dbh->prepare_cached(<<"", undef, 1);
SELECT num FROM id2num WHERE id = ? AND num > ?
ORDER BY num ASC LIMIT 1
$$prev ||= 0;
$sth->execute($$id, $$prev);
my $num = $sth->fetchrow_array or return;
$$prev = $num;
get_art($self, $num);
}
# IMAP search, this is limited by callers to UID_SLICE size (50K)
sub uid_range {
my ($self, $beg, $end, $sql) = @_;
my $dbh = dbh($self);
my $q = 'SELECT num FROM over WHERE num >= ? AND num <= ?';
# This is read-only, anyways; but caller should verify it's
# only sending \A[0-9]+\z for ds and ts column ranges
$q .= $$sql if $sql;
$q .= ' ORDER BY num ASC';
$dbh->selectcol_arrayref($q, undef, $beg, $end);
}
sub max {
my ($self) = @_;
my $sth = dbh($self)->prepare_cached(<<'', undef, 1);
SELECT MAX(num) FROM over WHERE num > 0
$sth->execute;
$sth->fetchrow_array // 0;
}
sub imap_exists {
my ($self, $uid_base, $uid_end) = @_;
my $sth = dbh($self)->prepare_cached(<<'', undef, 1);
SELECT COUNT(num) FROM over WHERE num > ? AND num <= ?
$sth->execute($uid_base, $uid_end);
$sth->fetchrow_array;
}
sub check_inodes {
my ($self) = @_;
my $dbh = $self->{dbh} or return;
my $f = $dbh->sqlite_db_filename;
if (my @st = stat($f)) { # did st_dev, st_ino change?
my $st = pack('dd', $st[0], $st[1]);
# don't actually reopen, just let {dbh} be recreated later
dbh_close($self) if $st ne ($self->{st} // $st);
} else {
warn "W: stat $f: $!\n";
}
}
sub oidbin_exists {
my ($self, $oidbin) = @_;
if (wantarray) {
my $sth = $self->dbh->prepare_cached(<<'', undef, 1);
SELECT docid FROM xref3 WHERE oidbin = ? ORDER BY docid ASC
$sth->bind_param(1, $oidbin, SQL_BLOB);
$sth->execute;
my $tmp = $sth->fetchall_arrayref;
map { $_->[0] } @$tmp;
} else {
my $sth = $self->dbh->prepare_cached(<<'', undef, 1);
SELECT COUNT(*) FROM xref3 WHERE oidbin = ?
$sth->bind_param(1, $oidbin, SQL_BLOB);
$sth->execute;
$sth->fetchrow_array;
}
}
sub blob_exists { oidbin_exists($_[0], pack('H*', $_[1])) }
# used by NNTP.pm
sub ids_after {
my ($self, $num) = @_;
my $ids = dbh($self)->selectcol_arrayref(<<'', undef, $$num);
SELECT num FROM over WHERE num > ?
ORDER BY num ASC LIMIT 1000
$$num = $ids->[-1] if @$ids;
$ids;
}
1;