]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/OverIdx.pm
replace Xapian skeleton with SQLite overview DB
[public-inbox.git] / lib / PublicInbox / OverIdx.pm
1 # Copyright (C) 2018 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # for XOVER, OVER in NNTP, and feeds/homepage/threads in PSGI
5 # Unlike Msgmap, this is an _UNSTABLE_ database which can be
6 # tweaked/updated over time and rebuilt.
7 package PublicInbox::OverIdx;
8 use strict;
9 use warnings;
10 use base qw(PublicInbox::Over);
11 use IO::Handle;
12 use DBI qw(:sql_types); # SQL_BLOB
13
14 sub dbh_new {
15         my ($self) = @_;
16         my $dbh = $self->SUPER::dbh_new;
17         $dbh->do('PRAGMA synchronous = OFF'); # commit_fsync instead
18         $dbh->do('PRAGMA journal_mode = TRUNCATE');
19         $dbh->do('PRAGMA cache_size = 80000');
20         create_tables($dbh);
21         $dbh;
22 }
23
24 sub commit_fsync {
25         my $fn = $_[0]->{filename};
26         if (open my $fh, '+<', $fn) {
27                 $fh->sync;
28                 close $fh;
29         }
30 }
31
32 sub get_counter ($$) {
33         my ($dbh, $key) = @_;
34         my $sth = $dbh->prepare_cached(<<'', undef, 1);
35 SELECT val FROM counter WHERE key = ? LIMIT 1
36
37         $sth->execute($key);
38         $sth->fetchrow_array;
39 }
40
41 sub adj_counter ($$$) {
42         my ($self, $key, $op) = @_;
43         my $dbh = $self->{dbh};
44         my $sth = $dbh->prepare_cached(<<"");
45 UPDATE counter SET val = val $op 1 WHERE key = ?
46
47         $sth->execute($key);
48
49         get_counter($dbh, $key);
50 }
51
52 sub next_tid { adj_counter($_[0], 'thread', '+') }
53 sub next_ghost_num { adj_counter($_[0], 'ghost', '-') }
54
55 sub id_for ($$$$$) {
56         my ($self, $tbl, $id_col, $val_col, $val) = @_;
57         my $dbh = $self->{dbh};
58         my $in = $dbh->prepare_cached(<<"")->execute($val);
59 INSERT OR IGNORE INTO $tbl ($val_col) VALUES (?)
60
61         if ($in == 0) {
62                 my $sth = $dbh->prepare_cached(<<"", undef, 1);
63 SELECT $id_col FROM $tbl WHERE $val_col = ? LIMIT 1
64
65                 $sth->execute($val);
66                 $sth->fetchrow_array;
67         } else {
68                 $dbh->last_insert_id(undef, undef, $tbl, $id_col);
69         }
70 }
71
72 sub sid {
73         my ($self, $path) = @_;
74         return unless defined $path && $path ne '';
75         id_for($self, 'subject', 'sid', 'path' => $path);
76 }
77
78 sub mid2id {
79         my ($self, $mid) = @_;
80         id_for($self, 'msgid', 'id', 'mid' => $mid);
81 }
82
83 sub delete_by_num {
84         my ($self, $num) = @_;
85         my $dbh = $self->{dbh};
86         foreach (qw(over id2num)) {
87                 $dbh->prepare_cached(<<"")->execute($num);
88 DELETE FROM $_ WHERE num = ?
89
90         }
91 }
92
93 # this includes ghosts
94 sub each_by_mid {
95         my ($self, $mid, $cols, $cb) = @_;
96         my $dbh = $self->{dbh};
97
98 =over
99         I originally wanted to stuff everything into a single query:
100
101         SELECT over.* FROM over
102         LEFT JOIN id2num ON over.num = id2num.num
103         LEFT JOIN msgid ON msgid.id = id2num.id
104         WHERE msgid.mid = ? AND over.num >= ?
105         ORDER BY over.num ASC
106         LIMIT 1000
107
108         But it's faster broken out (and we're always in a
109         transaction for subroutines in this file)
110 =cut
111
112         my $sth = $dbh->prepare_cached(<<'', undef, 1);
113 SELECT id FROM msgid WHERE mid = ? LIMIT 1
114
115         $sth->execute($mid);
116         my $id = $sth->fetchrow_array;
117         defined $id or return;
118
119         push(@$cols, 'num');
120         $cols = join(',', map { $_ } @$cols);
121         my $lim = 10;
122         my $prev = get_counter($dbh, 'ghost');
123         while (1) {
124                 $sth = $dbh->prepare_cached(<<"", undef, 1);
125 SELECT num FROM id2num WHERE id = ? AND num >= ?
126 ORDER BY num ASC
127 LIMIT $lim
128
129                 $sth->execute($id, $prev);
130                 my $nums = $sth->fetchall_arrayref;
131                 my $nr = scalar(@$nums) or return;
132                 $prev = $nums->[-1]->[0];
133
134                 $sth = $dbh->prepare_cached(<<"", undef, 1);
135 SELECT $cols FROM over WHERE over.num = ? LIMIT 1
136
137                 foreach (@$nums) {
138                         $sth->execute($_->[0]);
139                         my $smsg = $sth->fetchrow_hashref;
140                         $cb->(PublicInbox::Over::load_from_row($smsg)) or
141                                 return;
142                 }
143                 return if $nr != $lim;
144         }
145 }
146
147 # this will create a ghost as necessary
148 sub resolve_mid_to_tid {
149         my ($self, $mid) = @_;
150         my $tid;
151         each_by_mid($self, $mid, ['tid'], sub {
152                 my ($smsg) = @_;
153                 my $cur_tid = $smsg->{tid};
154                 if (defined $tid) {
155                         merge_threads($self, $tid, $cur_tid);
156                 } else {
157                         $tid = $cur_tid;
158                 }
159                 1;
160         });
161         defined $tid ? $tid : create_ghost($self, $mid);
162 }
163
164 sub create_ghost {
165         my ($self, $mid) = @_;
166         my $id = $self->mid2id($mid);
167         my $num = $self->next_ghost_num;
168         $num < 0 or die "ghost num is non-negative: $num\n";
169         my $tid = $self->next_tid;
170         my $dbh = $self->{dbh};
171         $dbh->prepare_cached(<<'')->execute($num, $tid);
172 INSERT INTO over (num, tid) VALUES (?,?)
173
174         $dbh->prepare_cached(<<'')->execute($id, $num);
175 INSERT INTO id2num (id, num) VALUES (?,?)
176
177         $tid;
178 }
179
180 sub merge_threads {
181         my ($self, $winner_tid, $loser_tid) = @_;
182         return if $winner_tid == $loser_tid;
183         my $dbh = $self->{dbh};
184         $dbh->prepare_cached(<<'')->execute($winner_tid, $loser_tid);
185 UPDATE over SET tid = ? WHERE tid = ?
186
187 }
188
189 sub link_refs {
190         my ($self, $refs, $old_tid) = @_;
191         my $tid;
192
193         if (@$refs) {
194                 # first ref *should* be the thread root,
195                 # but we can never trust clients to do the right thing
196                 my $ref = $refs->[0];
197                 $tid = resolve_mid_to_tid($self, $ref);
198                 merge_threads($self, $tid, $old_tid) if defined $old_tid;
199
200                 # the rest of the refs should point to this tid:
201                 foreach my $i (1..$#$refs) {
202                         $ref = $refs->[$i];
203                         my $ptid = resolve_mid_to_tid($self, $ref);
204                         merge_threads($self, $tid, $ptid);
205                 }
206         } else {
207                 $tid = defined $old_tid ? $old_tid : $self->next_tid;
208         }
209         $tid;
210 }
211
212 sub add_over {
213         my ($self, $values) = @_;
214         my ($ts, $num, $mids, $refs, $xpath, $ddd) = @$values;
215         my $old_tid;
216         my $vivified = 0;
217
218         $self->begin_lazy;
219         $self->delete_by_num($num);
220         foreach my $mid (@$mids) {
221                 my $v = 0;
222                 each_by_mid($self, $mid, ['tid'], sub {
223                         my ($cur) = @_;
224                         my $cur_tid = $cur->{tid};
225                         my $n = $cur->{num};
226                         die "num must not be zero for $mid" if !$n;
227                         $old_tid = $cur_tid unless defined $old_tid;
228                         if ($n > 0) { # regular mail
229                                 merge_threads($self, $old_tid, $cur_tid);
230                         } elsif ($n < 0) { # ghost
231                                 link_refs($self, $refs, $old_tid);
232                                 $self->delete_by_num($n);
233                                 $v++;
234                         }
235                         1;
236                 });
237                 $v > 1 and warn "BUG: vivified multiple ($v) ghosts for $mid\n";
238                 $vivified += $v;
239         }
240         my $tid = $vivified ? $old_tid : link_refs($self, $refs, $old_tid);
241         my $sid = $self->sid($xpath);
242         my $dbh = $self->{dbh};
243         my $sth = $dbh->prepare_cached(<<'');
244 INSERT INTO over (num, tid, sid, ts, ddd)
245 VALUES (?,?,?,?,?)
246
247         my $n = 0;
248         my @v = ($num, $tid, $sid, $ts);
249         foreach (@v) { $sth->bind_param(++$n, $_) }
250         $sth->bind_param(++$n, $ddd, SQL_BLOB);
251         $sth->execute;
252         $sth = $dbh->prepare_cached(<<'');
253 INSERT INTO id2num (id, num) VALUES (?,?)
254
255         foreach my $mid (@$mids) {
256                 my $id = $self->mid2id($mid);
257                 $sth->execute($id, $num);
258         }
259 }
260
261 sub delete_articles {
262         my ($self, $nums) = @_;
263         my $dbh = $self->connect;
264         $self->delete_by_num($_) foreach @$nums;
265 }
266
267 sub remove_oid {
268         my ($self, $oid, $mid) = @_;
269         $self->begin_lazy;
270         each_by_mid($self, $mid, ['ddd'], sub {
271                 my ($smsg) = @_;
272                 $self->delete_by_num($smsg->{num}) if $smsg->{blob} eq $oid;
273                 1;
274         });
275 }
276
277 sub create_tables {
278         my ($dbh) = @_;
279
280         $dbh->do(<<'');
281 CREATE TABLE IF NOT EXISTS over (
282         num INTEGER NOT NULL,
283         tid INTEGER NOT NULL,
284         sid INTEGER,
285         ts INTEGER,
286         ddd VARBINARY, /* doc-data-deflated */
287         UNIQUE (num)
288 )
289
290         $dbh->do('CREATE INDEX IF NOT EXISTS idx_tid ON over (tid)');
291         $dbh->do('CREATE INDEX IF NOT EXISTS idx_sid ON over (sid)');
292         $dbh->do('CREATE INDEX IF NOT EXISTS idx_ts ON over (ts)');
293
294         $dbh->do(<<'');
295 CREATE TABLE IF NOT EXISTS counter (
296         key VARCHAR(8) PRIMARY KEY NOT NULL,
297         val INTEGER DEFAULT 0,
298         UNIQUE (key)
299 )
300
301         $dbh->do("INSERT OR IGNORE INTO counter (key) VALUES ('thread')");
302         $dbh->do("INSERT OR IGNORE INTO counter (key) VALUES ('ghost')");
303
304         $dbh->do(<<'');
305 CREATE TABLE IF NOT EXISTS subject (
306         sid INTEGER PRIMARY KEY AUTOINCREMENT,
307         path VARCHAR(40) NOT NULL,
308         UNIQUE (path)
309 )
310
311         $dbh->do(<<'');
312 CREATE TABLE IF NOT EXISTS id2num (
313         id INTEGER NOT NULL,
314         num INTEGER NOT NULL,
315         UNIQUE (id, num)
316 )
317
318         # performance critical:
319         $dbh->do('CREATE INDEX IF NOT EXISTS idx_inum ON id2num (num)');
320         $dbh->do('CREATE INDEX IF NOT EXISTS idx_id ON id2num (id)');
321
322         $dbh->do(<<'');
323 CREATE TABLE IF NOT EXISTS msgid (
324         id INTEGER PRIMARY KEY AUTOINCREMENT,
325         mid VARCHAR(244) NOT NULL,
326         UNIQUE (mid)
327 )
328
329 }
330
331 sub commit_lazy {
332         my ($self) = @_;
333         delete $self->{txn} or return;
334         $self->{dbh}->commit;
335 }
336
337 sub begin_lazy {
338         my ($self) = @_;
339         return if $self->{txn};
340         my $dbh = $self->connect or return;
341         $dbh->begin_work;
342         # $dbh->{Profile} = 2;
343         $self->{txn} = 1;
344 }
345
346 sub rollback_lazy {
347         my ($self) = @_;
348         delete $self->{txn} or return;
349         $self->{dbh}->rollback;
350 }
351
352 sub disconnect {
353         my ($self) = @_;
354         die "in transaction" if $self->{txn};
355         $self->{dbh} = undef;
356 }
357
358 sub create {
359         my ($self) = @_;
360         unless (-r $self->{filename}) {
361                 require File::Path;
362                 require File::Basename;
363                 File::Path::mkpath(File::Basename::dirname($self->{filename}));
364         }
365         # create the DB:
366         PublicInbox::Over::connect($self);
367         $self->disconnect;
368 }
369
370 1;