]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/OverIdx.pm
search: simplify unindexing
[public-inbox.git] / lib / PublicInbox / OverIdx.pm
1 # Copyright (C) 2018-2020 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # for XOVER, OVER in NNTP, and feeds/homepage/threads in PSGI
5 # Unlike Msgmap, this is an _UNSTABLE_ cache which can be
6 # tweaked/updated over time and rebuilt.
7 #
8 # Ghost messages (messages which are only referenced in References/In-Reply-To)
9 # are denoted by a negative NNTP article number.
10 package PublicInbox::OverIdx;
11 use strict;
12 use warnings;
13 use base qw(PublicInbox::Over);
14 use IO::Handle;
15 use DBI;
16 use PublicInbox::MID qw/id_compress mids_for_index references/;
17 use PublicInbox::Smsg qw(subject_normalized);
18 use Compress::Zlib qw(compress);
19 use PublicInbox::Search;
20
21 sub dbh_new {
22         my ($self) = @_;
23         my $dbh = $self->SUPER::dbh_new(1);
24
25         # TRUNCATE reduces I/O compared to the default (DELETE)
26         # We do not use WAL since we're optimized for read-only ops,
27         # (and read-only requires SQLite 3.22.0 (2018-01-22)).
28         $dbh->do('PRAGMA journal_mode = TRUNCATE');
29
30         # 80000 pages (80MiB on SQLite <3.12.0, 320MiB on 3.12.0+)
31         # was found to be good in 2018 during the large LKML import
32         # at the time.  This ought to be configurable based on HW
33         # and inbox size; I suspect it's overkill for many inboxes.
34         $dbh->do('PRAGMA cache_size = 80000');
35
36         create_tables($dbh);
37         $dbh;
38 }
39
40 sub get_counter ($$) {
41         my ($dbh, $key) = @_;
42         my $sth = $dbh->prepare_cached(<<'', undef, 1);
43 SELECT val FROM counter WHERE key = ? LIMIT 1
44
45         $sth->execute($key);
46         $sth->fetchrow_array;
47 }
48
49 sub adj_counter ($$$) {
50         my ($self, $key, $op) = @_;
51         my $dbh = $self->{dbh};
52         my $sth = $dbh->prepare_cached(<<"");
53 UPDATE counter SET val = val $op 1 WHERE key = ?
54
55         $sth->execute($key);
56
57         get_counter($dbh, $key);
58 }
59
60 sub next_tid { adj_counter($_[0], 'thread', '+') }
61 sub next_ghost_num { adj_counter($_[0], 'ghost', '-') }
62
63 sub id_for ($$$$$) {
64         my ($self, $tbl, $id_col, $val_col, $val) = @_;
65         my $dbh = $self->{dbh};
66         my $in = $dbh->prepare_cached(<<"")->execute($val);
67 INSERT OR IGNORE INTO $tbl ($val_col) VALUES (?)
68
69         if ($in == 0) {
70                 my $sth = $dbh->prepare_cached(<<"", undef, 1);
71 SELECT $id_col FROM $tbl WHERE $val_col = ? LIMIT 1
72
73                 $sth->execute($val);
74                 $sth->fetchrow_array;
75         } else {
76                 $dbh->last_insert_id(undef, undef, $tbl, $id_col);
77         }
78 }
79
80 sub sid {
81         my ($self, $path) = @_;
82         return unless defined $path && $path ne '';
83         id_for($self, 'subject', 'sid', 'path' => $path);
84 }
85
86 sub mid2id {
87         my ($self, $mid) = @_;
88         id_for($self, 'msgid', 'id', 'mid' => $mid);
89 }
90
91 sub delete_by_num {
92         my ($self, $num, $tid_ref) = @_;
93         my $dbh = $self->{dbh};
94         if ($tid_ref) {
95                 my $sth = $dbh->prepare_cached(<<'', undef, 1);
96 SELECT tid FROM over WHERE num = ? LIMIT 1
97
98                 $sth->execute($num);
99                 $$tid_ref = $sth->fetchrow_array; # may be undef
100         }
101         foreach (qw(over id2num)) {
102                 $dbh->prepare_cached(<<"")->execute($num);
103 DELETE FROM $_ WHERE num = ?
104
105         }
106 }
107
108 # this includes ghosts
109 sub each_by_mid {
110         my ($self, $mid, $cols, $cb, @arg) = @_;
111         my $dbh = $self->{dbh};
112
113 =over
114         I originally wanted to stuff everything into a single query:
115
116         SELECT over.* FROM over
117         LEFT JOIN id2num ON over.num = id2num.num
118         LEFT JOIN msgid ON msgid.id = id2num.id
119         WHERE msgid.mid = ? AND over.num >= ?
120         ORDER BY over.num ASC
121         LIMIT 1000
122
123         But it's faster broken out (and we're always in a
124         transaction for subroutines in this file)
125 =cut
126
127         my $sth = $dbh->prepare_cached(<<'', undef, 1);
128 SELECT id FROM msgid WHERE mid = ? LIMIT 1
129
130         $sth->execute($mid);
131         my $id = $sth->fetchrow_array;
132         defined $id or return;
133
134         push(@$cols, 'num');
135         $cols = join(',', map { $_ } @$cols);
136         my $lim = 10;
137         my $prev = get_counter($dbh, 'ghost');
138         while (1) {
139                 $sth = $dbh->prepare_cached(<<"", undef, 1);
140 SELECT num FROM id2num WHERE id = ? AND num >= ?
141 ORDER BY num ASC
142 LIMIT $lim
143
144                 $sth->execute($id, $prev);
145                 my $nums = $sth->fetchall_arrayref;
146                 my $nr = scalar(@$nums) or return;
147                 $prev = $nums->[-1]->[0];
148
149                 $sth = $dbh->prepare_cached(<<"", undef, 1);
150 SELECT $cols FROM over WHERE over.num = ? LIMIT 1
151
152                 foreach (@$nums) {
153                         $sth->execute($_->[0]);
154                         my $smsg = $sth->fetchrow_hashref;
155                         $smsg = PublicInbox::Over::load_from_row($smsg);
156                         $cb->($self, $smsg, @arg) or return;
157                 }
158                 return if $nr != $lim;
159         }
160 }
161
162 sub _resolve_mid_to_tid {
163         my ($self, $smsg, $tid) = @_;
164         my $cur_tid = $smsg->{tid};
165         if (defined $$tid) {
166                 merge_threads($self, $$tid, $cur_tid);
167         } else {
168                 $$tid = $cur_tid;
169         }
170         1;
171 }
172
173 # this will create a ghost as necessary
174 sub resolve_mid_to_tid {
175         my ($self, $mid) = @_;
176         my $tid;
177         each_by_mid($self, $mid, ['tid'], \&_resolve_mid_to_tid, \$tid);
178         defined $tid ? $tid : create_ghost($self, $mid);
179 }
180
181 sub create_ghost {
182         my ($self, $mid) = @_;
183         my $id = mid2id($self, $mid);
184         my $num = next_ghost_num($self);
185         $num < 0 or die "ghost num is non-negative: $num\n";
186         my $tid = next_tid($self);
187         my $dbh = $self->{dbh};
188         $dbh->prepare_cached(<<'')->execute($num, $tid);
189 INSERT INTO over (num, tid) VALUES (?,?)
190
191         $dbh->prepare_cached(<<'')->execute($id, $num);
192 INSERT INTO id2num (id, num) VALUES (?,?)
193
194         $tid;
195 }
196
197 sub merge_threads {
198         my ($self, $winner_tid, $loser_tid) = @_;
199         return if $winner_tid == $loser_tid;
200         my $dbh = $self->{dbh};
201         $dbh->prepare_cached(<<'')->execute($winner_tid, $loser_tid);
202 UPDATE over SET tid = ? WHERE tid = ?
203
204 }
205
206 sub link_refs {
207         my ($self, $refs, $old_tid) = @_;
208         my $tid;
209
210         if (@$refs) {
211                 # first ref *should* be the thread root,
212                 # but we can never trust clients to do the right thing
213                 my $ref = $refs->[0];
214                 $tid = resolve_mid_to_tid($self, $ref);
215                 merge_threads($self, $tid, $old_tid) if defined $old_tid;
216
217                 # the rest of the refs should point to this tid:
218                 foreach my $i (1..$#$refs) {
219                         $ref = $refs->[$i];
220                         my $ptid = resolve_mid_to_tid($self, $ref);
221                         merge_threads($self, $tid, $ptid);
222                 }
223         } else {
224                 $tid = defined $old_tid ? $old_tid : next_tid($self);
225         }
226         $tid;
227 }
228
229 sub parse_references ($$$) {
230         my ($smsg, $hdr, $mids) = @_;
231         my $refs = references($hdr);
232         push(@$refs, @$mids) if scalar(@$mids) > 1;
233         return $refs if scalar(@$refs) == 0;
234
235         # prevent circular references here:
236         my %seen = ( $smsg->{mid} => 1 );
237         my @keep;
238         foreach my $ref (@$refs) {
239                 if (length($ref) > PublicInbox::MID::MAX_MID_SIZE) {
240                         warn "References: <$ref> too long, ignoring\n";
241                         next;
242                 }
243                 push(@keep, $ref) unless $seen{$ref}++;
244         }
245         $smsg->{references} = '<'.join('> <', @keep).'>' if @keep;
246         \@keep;
247 }
248
249 # normalize subjects so they are suitable as pathnames for URLs
250 # XXX: consider for removal
251 sub subject_path ($) {
252         my ($subj) = @_;
253         $subj = subject_normalized($subj);
254         $subj =~ s![^a-zA-Z0-9_\.~/\-]+!_!g;
255         lc($subj);
256 }
257
258 sub add_overview {
259         my ($self, $mime, $smsg) = @_;
260         $smsg->{lines} = $mime->body_raw =~ tr!\n!\n!;
261         my $hdr = $mime->header_obj;
262         my $mids = mids_for_index($hdr);
263         my $refs = parse_references($smsg, $hdr, $mids);
264         my $subj = $smsg->{subject};
265         my $xpath;
266         if ($subj ne '') {
267                 $xpath = subject_path($subj);
268                 $xpath = id_compress($xpath);
269         }
270         my $dd = $smsg->to_doc_data;
271         utf8::encode($dd);
272         $dd = compress($dd);
273         add_over($self, [ @$smsg{qw(ts ds num)}, $mids, $refs, $xpath, $dd ]);
274 }
275
276 sub _add_over {
277         my ($self, $smsg, $mid, $refs, $old_tid, $v) = @_;
278         my $cur_tid = $smsg->{tid};
279         my $n = $smsg->{num};
280         die "num must not be zero for $mid" if !$n;
281         $$old_tid = $cur_tid unless defined $$old_tid;
282         if ($n > 0) { # regular mail
283                 merge_threads($self, $$old_tid, $cur_tid);
284         } elsif ($n < 0) { # ghost
285                 link_refs($self, $refs, $$old_tid);
286                 delete_by_num($self, $n);
287                 $$v++;
288         }
289         1;
290 }
291
292 sub add_over {
293         my ($self, $values) = @_;
294         my ($ts, $ds, $num, $mids, $refs, $xpath, $ddd) = @$values;
295         my $old_tid;
296         my $vivified = 0;
297
298         begin_lazy($self);
299         delete_by_num($self, $num, \$old_tid);
300         foreach my $mid (@$mids) {
301                 my $v = 0;
302                 each_by_mid($self, $mid, ['tid'], \&_add_over,
303                                 $mid, $refs, \$old_tid, \$v);
304                 $v > 1 and warn "BUG: vivified multiple ($v) ghosts for $mid\n";
305                 $vivified += $v;
306         }
307         my $tid = $vivified ? $old_tid : link_refs($self, $refs, $old_tid);
308         my $sid = sid($self, $xpath);
309         my $dbh = $self->{dbh};
310         my $sth = $dbh->prepare_cached(<<'');
311 INSERT INTO over (num, tid, sid, ts, ds, ddd)
312 VALUES (?,?,?,?,?,?)
313
314         my $n = 0;
315         my @v = ($num, $tid, $sid, $ts, $ds);
316         foreach (@v) { $sth->bind_param(++$n, $_) }
317         $sth->bind_param(++$n, $ddd);
318         $sth->execute;
319         $sth = $dbh->prepare_cached(<<'');
320 INSERT INTO id2num (id, num) VALUES (?,?)
321
322         foreach my $mid (@$mids) {
323                 my $id = mid2id($self, $mid);
324                 $sth->execute($id, $num);
325         }
326 }
327
328 sub _remove_oid {
329         my ($self, $smsg, $oid, $removed) = @_;
330         if (!defined($oid) || $smsg->{blob} eq $oid) {
331                 delete_by_num($self, $smsg->{num});
332                 push @$removed, $smsg->{num};
333         }
334         1;
335 }
336
337 # returns number of removed messages in scalar context,
338 # array of removed article numbers in array context.
339 # $oid may be undef to match only on $mid
340 sub remove_oid {
341         my ($self, $oid, $mid) = @_;
342         my $removed = [];
343         begin_lazy($self);
344         each_by_mid($self, $mid, ['ddd'], \&_remove_oid, $oid, $removed);
345         @$removed;
346 }
347
348 sub _num_mid0_for_oid {
349         my ($self, $smsg, $oid, $res) = @_;
350         my $blob = $smsg->{blob};
351         return 1 if (!defined($blob) || $blob ne $oid); # continue;
352         @$res = ($smsg->{num}, $smsg->{mid});
353         0; # done
354 }
355
356 sub num_mid0_for_oid {
357         my ($self, $oid, $mid) = @_;
358         my $res = [];
359         begin_lazy($self);
360         each_by_mid($self, $mid, ['ddd'], \&_num_mid0_for_oid, $oid, $res);
361         @$res, # ($num, $mid0);
362 }
363
364 sub create_tables {
365         my ($dbh) = @_;
366
367         $dbh->do(<<'');
368 CREATE TABLE IF NOT EXISTS over (
369         num INTEGER NOT NULL,
370         tid INTEGER NOT NULL,
371         sid INTEGER,
372         ts INTEGER,
373         ds INTEGER,
374         ddd VARBINARY, /* doc-data-deflated */
375         UNIQUE (num)
376 )
377
378         $dbh->do('CREATE INDEX IF NOT EXISTS idx_tid ON over (tid)');
379         $dbh->do('CREATE INDEX IF NOT EXISTS idx_sid ON over (sid)');
380         $dbh->do('CREATE INDEX IF NOT EXISTS idx_ts ON over (ts)');
381         $dbh->do('CREATE INDEX IF NOT EXISTS idx_ds ON over (ds)');
382
383         $dbh->do(<<'');
384 CREATE TABLE IF NOT EXISTS counter (
385         key VARCHAR(8) PRIMARY KEY NOT NULL,
386         val INTEGER DEFAULT 0,
387         UNIQUE (key)
388 )
389
390         $dbh->do("INSERT OR IGNORE INTO counter (key) VALUES ('thread')");
391         $dbh->do("INSERT OR IGNORE INTO counter (key) VALUES ('ghost')");
392
393         $dbh->do(<<'');
394 CREATE TABLE IF NOT EXISTS subject (
395         sid INTEGER PRIMARY KEY AUTOINCREMENT,
396         path VARCHAR(40) NOT NULL,
397         UNIQUE (path)
398 )
399
400         $dbh->do(<<'');
401 CREATE TABLE IF NOT EXISTS id2num (
402         id INTEGER NOT NULL,
403         num INTEGER NOT NULL,
404         UNIQUE (id, num)
405 )
406
407         # performance critical:
408         $dbh->do('CREATE INDEX IF NOT EXISTS idx_inum ON id2num (num)');
409         $dbh->do('CREATE INDEX IF NOT EXISTS idx_id ON id2num (id)');
410
411         $dbh->do(<<'');
412 CREATE TABLE IF NOT EXISTS msgid (
413         id INTEGER PRIMARY KEY AUTOINCREMENT,
414         mid VARCHAR(244) NOT NULL,
415         UNIQUE (mid)
416 )
417
418 }
419
420 sub commit_lazy {
421         my ($self) = @_;
422         delete $self->{txn} or return;
423         $self->{dbh}->commit;
424 }
425
426 sub begin_lazy {
427         my ($self) = @_;
428         return if $self->{txn};
429         my $dbh = $self->connect or return;
430         $dbh->begin_work;
431         # $dbh->{Profile} = 2;
432         $self->{txn} = 1;
433 }
434
435 sub rollback_lazy {
436         my ($self) = @_;
437         delete $self->{txn} or return;
438         $self->{dbh}->rollback;
439 }
440
441 sub disconnect {
442         my ($self) = @_;
443         die "in transaction" if $self->{txn};
444         $self->SUPER::disconnect;
445 }
446
447 sub create {
448         my ($self) = @_;
449         unless (-r $self->{filename}) {
450                 require File::Path;
451                 require File::Basename;
452                 File::Path::mkpath(File::Basename::dirname($self->{filename}));
453         }
454         # create the DB:
455         PublicInbox::Over::connect($self);
456         $self->disconnect;
457 }
458
459 1;