]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/OverIdx.pm
5c20f1f0c2170c4dae4a3417a8c77ad73ee95010
[public-inbox.git] / lib / PublicInbox / OverIdx.pm
1 # Copyright (C) 2018 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # for XOVER, OVER in NNTP, and feeds/homepage/threads in PSGI
5 # Unlike Msgmap, this is an _UNSTABLE_ database which can be
6 # tweaked/updated over time and rebuilt.
7 package PublicInbox::OverIdx;
8 use strict;
9 use warnings;
10 use base qw(PublicInbox::Over);
11 use IO::Handle;
12 use DBI qw(:sql_types); # SQL_BLOB
13
14 sub dbh_new {
15         my ($self) = @_;
16         my $dbh = $self->SUPER::dbh_new;
17         $dbh->do('PRAGMA journal_mode = TRUNCATE');
18         $dbh->do('PRAGMA cache_size = 80000');
19         create_tables($dbh);
20         $dbh;
21 }
22
23 sub get_counter ($$) {
24         my ($dbh, $key) = @_;
25         my $sth = $dbh->prepare_cached(<<'', undef, 1);
26 SELECT val FROM counter WHERE key = ? LIMIT 1
27
28         $sth->execute($key);
29         $sth->fetchrow_array;
30 }
31
32 sub adj_counter ($$$) {
33         my ($self, $key, $op) = @_;
34         my $dbh = $self->{dbh};
35         my $sth = $dbh->prepare_cached(<<"");
36 UPDATE counter SET val = val $op 1 WHERE key = ?
37
38         $sth->execute($key);
39
40         get_counter($dbh, $key);
41 }
42
43 sub next_tid { adj_counter($_[0], 'thread', '+') }
44 sub next_ghost_num { adj_counter($_[0], 'ghost', '-') }
45
46 sub id_for ($$$$$) {
47         my ($self, $tbl, $id_col, $val_col, $val) = @_;
48         my $dbh = $self->{dbh};
49         my $in = $dbh->prepare_cached(<<"")->execute($val);
50 INSERT OR IGNORE INTO $tbl ($val_col) VALUES (?)
51
52         if ($in == 0) {
53                 my $sth = $dbh->prepare_cached(<<"", undef, 1);
54 SELECT $id_col FROM $tbl WHERE $val_col = ? LIMIT 1
55
56                 $sth->execute($val);
57                 $sth->fetchrow_array;
58         } else {
59                 $dbh->last_insert_id(undef, undef, $tbl, $id_col);
60         }
61 }
62
63 sub sid {
64         my ($self, $path) = @_;
65         return unless defined $path && $path ne '';
66         id_for($self, 'subject', 'sid', 'path' => $path);
67 }
68
69 sub mid2id {
70         my ($self, $mid) = @_;
71         id_for($self, 'msgid', 'id', 'mid' => $mid);
72 }
73
74 sub delete_by_num {
75         my ($self, $num) = @_;
76         my $dbh = $self->{dbh};
77         foreach (qw(over id2num)) {
78                 $dbh->prepare_cached(<<"")->execute($num);
79 DELETE FROM $_ WHERE num = ?
80
81         }
82 }
83
84 # this includes ghosts
85 sub each_by_mid {
86         my ($self, $mid, $cols, $cb) = @_;
87         my $dbh = $self->{dbh};
88
89 =over
90         I originally wanted to stuff everything into a single query:
91
92         SELECT over.* FROM over
93         LEFT JOIN id2num ON over.num = id2num.num
94         LEFT JOIN msgid ON msgid.id = id2num.id
95         WHERE msgid.mid = ? AND over.num >= ?
96         ORDER BY over.num ASC
97         LIMIT 1000
98
99         But it's faster broken out (and we're always in a
100         transaction for subroutines in this file)
101 =cut
102
103         my $sth = $dbh->prepare_cached(<<'', undef, 1);
104 SELECT id FROM msgid WHERE mid = ? LIMIT 1
105
106         $sth->execute($mid);
107         my $id = $sth->fetchrow_array;
108         defined $id or return;
109
110         push(@$cols, 'num');
111         $cols = join(',', map { $_ } @$cols);
112         my $lim = 10;
113         my $prev = get_counter($dbh, 'ghost');
114         while (1) {
115                 $sth = $dbh->prepare_cached(<<"", undef, 1);
116 SELECT num FROM id2num WHERE id = ? AND num >= ?
117 ORDER BY num ASC
118 LIMIT $lim
119
120                 $sth->execute($id, $prev);
121                 my $nums = $sth->fetchall_arrayref;
122                 my $nr = scalar(@$nums) or return;
123                 $prev = $nums->[-1]->[0];
124
125                 $sth = $dbh->prepare_cached(<<"", undef, 1);
126 SELECT $cols FROM over WHERE over.num = ? LIMIT 1
127
128                 foreach (@$nums) {
129                         $sth->execute($_->[0]);
130                         my $smsg = $sth->fetchrow_hashref;
131                         $cb->(PublicInbox::Over::load_from_row($smsg)) or
132                                 return;
133                 }
134                 return if $nr != $lim;
135         }
136 }
137
138 # this will create a ghost as necessary
139 sub resolve_mid_to_tid {
140         my ($self, $mid) = @_;
141         my $tid;
142         each_by_mid($self, $mid, ['tid'], sub {
143                 my ($smsg) = @_;
144                 my $cur_tid = $smsg->{tid};
145                 if (defined $tid) {
146                         merge_threads($self, $tid, $cur_tid);
147                 } else {
148                         $tid = $cur_tid;
149                 }
150                 1;
151         });
152         defined $tid ? $tid : create_ghost($self, $mid);
153 }
154
155 sub create_ghost {
156         my ($self, $mid) = @_;
157         my $id = $self->mid2id($mid);
158         my $num = $self->next_ghost_num;
159         $num < 0 or die "ghost num is non-negative: $num\n";
160         my $tid = $self->next_tid;
161         my $dbh = $self->{dbh};
162         $dbh->prepare_cached(<<'')->execute($num, $tid);
163 INSERT INTO over (num, tid) VALUES (?,?)
164
165         $dbh->prepare_cached(<<'')->execute($id, $num);
166 INSERT INTO id2num (id, num) VALUES (?,?)
167
168         $tid;
169 }
170
171 sub merge_threads {
172         my ($self, $winner_tid, $loser_tid) = @_;
173         return if $winner_tid == $loser_tid;
174         my $dbh = $self->{dbh};
175         $dbh->prepare_cached(<<'')->execute($winner_tid, $loser_tid);
176 UPDATE over SET tid = ? WHERE tid = ?
177
178 }
179
180 sub link_refs {
181         my ($self, $refs, $old_tid) = @_;
182         my $tid;
183
184         if (@$refs) {
185                 # first ref *should* be the thread root,
186                 # but we can never trust clients to do the right thing
187                 my $ref = $refs->[0];
188                 $tid = resolve_mid_to_tid($self, $ref);
189                 merge_threads($self, $tid, $old_tid) if defined $old_tid;
190
191                 # the rest of the refs should point to this tid:
192                 foreach my $i (1..$#$refs) {
193                         $ref = $refs->[$i];
194                         my $ptid = resolve_mid_to_tid($self, $ref);
195                         merge_threads($self, $tid, $ptid);
196                 }
197         } else {
198                 $tid = defined $old_tid ? $old_tid : $self->next_tid;
199         }
200         $tid;
201 }
202
203 sub add_over {
204         my ($self, $values) = @_;
205         my ($ts, $num, $mids, $refs, $xpath, $ddd) = @$values;
206         my $old_tid;
207         my $vivified = 0;
208
209         $self->begin_lazy;
210         $self->delete_by_num($num);
211         foreach my $mid (@$mids) {
212                 my $v = 0;
213                 each_by_mid($self, $mid, ['tid'], sub {
214                         my ($cur) = @_;
215                         my $cur_tid = $cur->{tid};
216                         my $n = $cur->{num};
217                         die "num must not be zero for $mid" if !$n;
218                         $old_tid = $cur_tid unless defined $old_tid;
219                         if ($n > 0) { # regular mail
220                                 merge_threads($self, $old_tid, $cur_tid);
221                         } elsif ($n < 0) { # ghost
222                                 link_refs($self, $refs, $old_tid);
223                                 $self->delete_by_num($n);
224                                 $v++;
225                         }
226                         1;
227                 });
228                 $v > 1 and warn "BUG: vivified multiple ($v) ghosts for $mid\n";
229                 $vivified += $v;
230         }
231         my $tid = $vivified ? $old_tid : link_refs($self, $refs, $old_tid);
232         my $sid = $self->sid($xpath);
233         my $dbh = $self->{dbh};
234         my $sth = $dbh->prepare_cached(<<'');
235 INSERT INTO over (num, tid, sid, ts, ddd)
236 VALUES (?,?,?,?,?)
237
238         my $n = 0;
239         my @v = ($num, $tid, $sid, $ts);
240         foreach (@v) { $sth->bind_param(++$n, $_) }
241         $sth->bind_param(++$n, $ddd, SQL_BLOB);
242         $sth->execute;
243         $sth = $dbh->prepare_cached(<<'');
244 INSERT INTO id2num (id, num) VALUES (?,?)
245
246         foreach my $mid (@$mids) {
247                 my $id = $self->mid2id($mid);
248                 $sth->execute($id, $num);
249         }
250 }
251
252 sub delete_articles {
253         my ($self, $nums) = @_;
254         my $dbh = $self->connect;
255         $self->delete_by_num($_) foreach @$nums;
256 }
257
258 sub remove_oid {
259         my ($self, $oid, $mid) = @_;
260         $self->begin_lazy;
261         each_by_mid($self, $mid, ['ddd'], sub {
262                 my ($smsg) = @_;
263                 $self->delete_by_num($smsg->{num}) if $smsg->{blob} eq $oid;
264                 1;
265         });
266 }
267
268 sub create_tables {
269         my ($dbh) = @_;
270
271         $dbh->do(<<'');
272 CREATE TABLE IF NOT EXISTS over (
273         num INTEGER NOT NULL,
274         tid INTEGER NOT NULL,
275         sid INTEGER,
276         ts INTEGER,
277         ddd VARBINARY, /* doc-data-deflated */
278         UNIQUE (num)
279 )
280
281         $dbh->do('CREATE INDEX IF NOT EXISTS idx_tid ON over (tid)');
282         $dbh->do('CREATE INDEX IF NOT EXISTS idx_sid ON over (sid)');
283         $dbh->do('CREATE INDEX IF NOT EXISTS idx_ts ON over (ts)');
284
285         $dbh->do(<<'');
286 CREATE TABLE IF NOT EXISTS counter (
287         key VARCHAR(8) PRIMARY KEY NOT NULL,
288         val INTEGER DEFAULT 0,
289         UNIQUE (key)
290 )
291
292         $dbh->do("INSERT OR IGNORE INTO counter (key) VALUES ('thread')");
293         $dbh->do("INSERT OR IGNORE INTO counter (key) VALUES ('ghost')");
294
295         $dbh->do(<<'');
296 CREATE TABLE IF NOT EXISTS subject (
297         sid INTEGER PRIMARY KEY AUTOINCREMENT,
298         path VARCHAR(40) NOT NULL,
299         UNIQUE (path)
300 )
301
302         $dbh->do(<<'');
303 CREATE TABLE IF NOT EXISTS id2num (
304         id INTEGER NOT NULL,
305         num INTEGER NOT NULL,
306         UNIQUE (id, num)
307 )
308
309         # performance critical:
310         $dbh->do('CREATE INDEX IF NOT EXISTS idx_inum ON id2num (num)');
311         $dbh->do('CREATE INDEX IF NOT EXISTS idx_id ON id2num (id)');
312
313         $dbh->do(<<'');
314 CREATE TABLE IF NOT EXISTS msgid (
315         id INTEGER PRIMARY KEY AUTOINCREMENT,
316         mid VARCHAR(244) NOT NULL,
317         UNIQUE (mid)
318 )
319
320 }
321
322 sub commit_lazy {
323         my ($self) = @_;
324         delete $self->{txn} or return;
325         $self->{dbh}->commit;
326 }
327
328 sub begin_lazy {
329         my ($self) = @_;
330         return if $self->{txn};
331         my $dbh = $self->connect or return;
332         $dbh->begin_work;
333         # $dbh->{Profile} = 2;
334         $self->{txn} = 1;
335 }
336
337 sub rollback_lazy {
338         my ($self) = @_;
339         delete $self->{txn} or return;
340         $self->{dbh}->rollback;
341 }
342
343 sub disconnect {
344         my ($self) = @_;
345         die "in transaction" if $self->{txn};
346         $self->{dbh} = undef;
347 }
348
349 sub create {
350         my ($self) = @_;
351         unless (-r $self->{filename}) {
352                 require File::Path;
353                 require File::Basename;
354                 File::Path::mkpath(File::Basename::dirname($self->{filename}));
355         }
356         # create the DB:
357         PublicInbox::Over::connect($self);
358         $self->disconnect;
359 }
360
361 1;