X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FOverIdx.pm;h=9f4a56fbd4861b9978a3a3146317a42d597a95f1;hb=5f6a0d2342323541e44ff2f1e7329053d0263800;hp=0e43aabc044f61d0f16153b70f554cbc11159fc9;hpb=35ff6bb106909b1c1232666a9792156dfa398ea8;p=public-inbox.git diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm index 0e43aabc..9f4a56fb 100644 --- a/lib/PublicInbox/OverIdx.pm +++ b/lib/PublicInbox/OverIdx.pm @@ -1,32 +1,43 @@ -# Copyright (C) 2018 all contributors +# Copyright (C) 2018-2020 all contributors # License: AGPL-3.0+ # for XOVER, OVER in NNTP, and feeds/homepage/threads in PSGI -# Unlike Msgmap, this is an _UNSTABLE_ database which can be +# Unlike Msgmap, this is an _UNSTABLE_ cache which can be # tweaked/updated over time and rebuilt. +# +# Ghost messages (messages which are only referenced in References/In-Reply-To) +# are denoted by a negative NNTP article number. package PublicInbox::OverIdx; use strict; use warnings; use base qw(PublicInbox::Over); use IO::Handle; use DBI qw(:sql_types); # SQL_BLOB +use PublicInbox::MID qw/id_compress mids_for_index references/; +use PublicInbox::Smsg qw(subject_normalized); +use Compress::Zlib qw(compress); +use PublicInbox::Search; +use Carp qw(croak); sub dbh_new { my ($self) = @_; - my $dbh = $self->SUPER::dbh_new; - $dbh->do('PRAGMA synchronous = OFF'); # commit_fsync instead - $dbh->do('PRAGMA journal_mode = TRUNCATE'); + my $dbh = $self->SUPER::dbh_new($self->{-no_fsync} ? 2 : 1); + + # 80000 pages (80MiB on SQLite <3.12.0, 320MiB on 3.12.0+) + # was found to be good in 2018 during the large LKML import + # at the time. This ought to be configurable based on HW + # and inbox size; I suspect it's overkill for many inboxes. $dbh->do('PRAGMA cache_size = 80000'); + create_tables($dbh); $dbh; } -sub commit_fsync { - my $fn = $_[0]->{filename}; - if (open my $fh, '+<', $fn) { - $fh->sync; - close $fh; - } +sub new { + my ($class, $f) = @_; + my $self = $class->SUPER::new($f); + $self->{min_tid} = 0; + $self; } sub get_counter ($$) { @@ -81,8 +92,15 @@ sub mid2id { } sub delete_by_num { - my ($self, $num) = @_; + my ($self, $num, $tid_ref) = @_; my $dbh = $self->{dbh}; + if ($tid_ref) { + my $sth = $dbh->prepare_cached(<<'', undef, 1); +SELECT tid FROM over WHERE num = ? LIMIT 1 + + $sth->execute($num); + $$tid_ref = $sth->fetchrow_array; # may be undef + } foreach (qw(over id2num)) { $dbh->prepare_cached(<<"")->execute($num); DELETE FROM $_ WHERE num = ? @@ -92,7 +110,7 @@ DELETE FROM $_ WHERE num = ? # this includes ghosts sub each_by_mid { - my ($self, $mid, $cols, $cb) = @_; + my ($self, $mid, $cols, $cb, @arg) = @_; my $dbh = $self->{dbh}; =over @@ -137,36 +155,45 @@ SELECT $cols FROM over WHERE over.num = ? LIMIT 1 foreach (@$nums) { $sth->execute($_->[0]); my $smsg = $sth->fetchrow_hashref; - $cb->(PublicInbox::Over::load_from_row($smsg)) or - return; + $smsg = PublicInbox::Over::load_from_row($smsg); + $cb->($self, $smsg, @arg) or return; } return if $nr != $lim; } } +sub _resolve_mid_to_tid { + my ($self, $smsg, $tid) = @_; + my $cur_tid = $smsg->{tid}; + if (defined $$tid) { + merge_threads($self, $$tid, $cur_tid); + } elsif ($cur_tid > $self->{min_tid}) { + $$tid = $cur_tid; + } else { # rethreading, queue up dead ghosts + $$tid = next_tid($self); + my $num = $smsg->{num}; + push(@{$self->{-ghosts_to_delete}}, $num) if $num < 0; + } + 1; +} + # this will create a ghost as necessary sub resolve_mid_to_tid { my ($self, $mid) = @_; my $tid; - each_by_mid($self, $mid, ['tid'], sub { - my ($smsg) = @_; - my $cur_tid = $smsg->{tid}; - if (defined $tid) { - merge_threads($self, $tid, $cur_tid); - } else { - $tid = $cur_tid; - } - 1; - }); - defined $tid ? $tid : create_ghost($self, $mid); + each_by_mid($self, $mid, ['tid'], \&_resolve_mid_to_tid, \$tid); + if (my $del = delete $self->{-ghosts_to_delete}) { + delete_by_num($self, $_) for @$del; + } + $tid // create_ghost($self, $mid); } sub create_ghost { my ($self, $mid) = @_; - my $id = $self->mid2id($mid); - my $num = $self->next_ghost_num; + my $id = mid2id($self, $mid); + my $num = next_ghost_num($self); $num < 0 or die "ghost num is non-negative: $num\n"; - my $tid = $self->next_tid; + my $tid = next_tid($self); my $dbh = $self->{dbh}; $dbh->prepare_cached(<<'')->execute($num, $tid); INSERT INTO over (num, tid) VALUES (?,?) @@ -204,74 +231,151 @@ sub link_refs { merge_threads($self, $tid, $ptid); } } else { - $tid = defined $old_tid ? $old_tid : $self->next_tid; + $tid = $old_tid // next_tid($self); } $tid; } +sub parse_references ($$$) { + my ($smsg, $hdr, $mids) = @_; + my $refs = references($hdr); + push(@$refs, @$mids) if scalar(@$mids) > 1; + return $refs if scalar(@$refs) == 0; + + # prevent circular references here: + my %seen = ( $smsg->{mid} => 1 ); + my @keep; + foreach my $ref (@$refs) { + if (length($ref) > PublicInbox::MID::MAX_MID_SIZE) { + warn "References: <$ref> too long, ignoring\n"; + next; + } + push(@keep, $ref) unless $seen{$ref}++; + } + $smsg->{references} = '<'.join('> <', @keep).'>' if @keep; + \@keep; +} + +# normalize subjects so they are suitable as pathnames for URLs +# XXX: consider for removal +sub subject_path ($) { + my ($subj) = @_; + $subj = subject_normalized($subj); + $subj =~ s![^a-zA-Z0-9_\.~/\-]+!_!g; + lc($subj); +} + +sub add_overview { + my ($self, $eml, $smsg) = @_; + $smsg->{lines} = $eml->body_raw =~ tr!\n!\n!; + my $mids = mids_for_index($eml); + my $refs = parse_references($smsg, $eml, $mids); + my $subj = $smsg->{subject}; + my $xpath; + if ($subj ne '') { + $xpath = subject_path($subj); + $xpath = id_compress($xpath); + } + my $dd = $smsg->to_doc_data; + utf8::encode($dd); + $dd = compress($dd); + add_over($self, $smsg, $mids, $refs, $xpath, $dd); +} + +sub _add_over { + my ($self, $smsg, $mid, $refs, $old_tid, $v) = @_; + my $cur_tid = $smsg->{tid}; + my $n = $smsg->{num}; + die "num must not be zero for $mid" if !$n; + my $cur_valid = $cur_tid > $self->{min_tid}; + + if ($n > 0) { # regular mail + if ($cur_valid) { + $$old_tid //= $cur_tid; + merge_threads($self, $$old_tid, $cur_tid); + } else { + $$old_tid //= next_tid($self); + } + } elsif ($n < 0) { # ghost + $$old_tid //= $cur_valid ? $cur_tid : next_tid($self); + link_refs($self, $refs, $$old_tid); + delete_by_num($self, $n); + $$v++; + } + 1; +} + sub add_over { - my ($self, $values) = @_; - my ($ts, $num, $mids, $refs, $xpath, $ddd) = @$values; + my ($self, $smsg, $mids, $refs, $xpath, $ddd) = @_; my $old_tid; my $vivified = 0; + my $num = $smsg->{num}; - $self->begin_lazy; - $self->delete_by_num($num); + begin_lazy($self); + delete_by_num($self, $num, \$old_tid); + $old_tid = undef if ($old_tid // 0) <= $self->{min_tid}; foreach my $mid (@$mids) { my $v = 0; - each_by_mid($self, $mid, ['tid'], sub { - my ($cur) = @_; - my $cur_tid = $cur->{tid}; - my $n = $cur->{num}; - die "num must not be zero for $mid" if !$n; - $old_tid = $cur_tid unless defined $old_tid; - if ($n > 0) { # regular mail - merge_threads($self, $old_tid, $cur_tid); - } elsif ($n < 0) { # ghost - link_refs($self, $refs, $old_tid); - $self->delete_by_num($n); - $v++; - } - 1; - }); + each_by_mid($self, $mid, ['tid'], \&_add_over, + $mid, $refs, \$old_tid, \$v); $v > 1 and warn "BUG: vivified multiple ($v) ghosts for $mid\n"; $vivified += $v; } - my $tid = $vivified ? $old_tid : link_refs($self, $refs, $old_tid); - my $sid = $self->sid($xpath); + $smsg->{tid} = $vivified ? $old_tid : link_refs($self, $refs, $old_tid); + $smsg->{sid} = sid($self, $xpath); my $dbh = $self->{dbh}; my $sth = $dbh->prepare_cached(<<''); -INSERT INTO over (num, tid, sid, ts, ddd) -VALUES (?,?,?,?,?) +INSERT INTO over (num, tid, sid, ts, ds, ddd) +VALUES (?,?,?,?,?,?) - my $n = 0; - my @v = ($num, $tid, $sid, $ts); - foreach (@v) { $sth->bind_param(++$n, $_) } - $sth->bind_param(++$n, $ddd, SQL_BLOB); + my $nc = 1; + $sth->bind_param($nc, $num); + $sth->bind_param(++$nc, $smsg->{$_}) for (qw(tid sid ts ds)); + $sth->bind_param(++$nc, $ddd, SQL_BLOB); $sth->execute; $sth = $dbh->prepare_cached(<<''); INSERT INTO id2num (id, num) VALUES (?,?) foreach my $mid (@$mids) { - my $id = $self->mid2id($mid); + my $id = mid2id($self, $mid); $sth->execute($id, $num); } } -sub delete_articles { - my ($self, $nums) = @_; - my $dbh = $self->connect; - $self->delete_by_num($_) foreach @$nums; +sub _remove_oid { + my ($self, $smsg, $oid, $removed) = @_; + if (!defined($oid) || $smsg->{blob} eq $oid) { + delete_by_num($self, $smsg->{num}); + push @$removed, $smsg->{num}; + } + 1; } +# returns number of removed messages in scalar context, +# array of removed article numbers in array context. +# $oid may be undef to match only on $mid sub remove_oid { my ($self, $oid, $mid) = @_; - $self->begin_lazy; - each_by_mid($self, $mid, ['ddd'], sub { - my ($smsg) = @_; - $self->delete_by_num($smsg->{num}) if $smsg->{blob} eq $oid; - 1; - }); + my $removed = []; + begin_lazy($self); + each_by_mid($self, $mid, ['ddd'], \&_remove_oid, $oid, $removed); + @$removed; +} + +sub _num_mid0_for_oid { + my ($self, $smsg, $oid, $res) = @_; + my $blob = $smsg->{blob}; + return 1 if (!defined($blob) || $blob ne $oid); # continue; + @$res = ($smsg->{num}, $smsg->{mid}); + 0; # done +} + +sub num_mid0_for_oid { + my ($self, $oid, $mid) = @_; + my $res = []; + begin_lazy($self); + each_by_mid($self, $mid, ['ddd'], \&_num_mid0_for_oid, $oid, $res); + @$res, # ($num, $mid0); } sub create_tables { @@ -283,6 +387,7 @@ CREATE TABLE IF NOT EXISTS over ( tid INTEGER NOT NULL, sid INTEGER, ts INTEGER, + ds INTEGER, ddd VARBINARY, /* doc-data-deflated */ UNIQUE (num) ) @@ -290,6 +395,7 @@ CREATE TABLE IF NOT EXISTS over ( $dbh->do('CREATE INDEX IF NOT EXISTS idx_tid ON over (tid)'); $dbh->do('CREATE INDEX IF NOT EXISTS idx_sid ON over (sid)'); $dbh->do('CREATE INDEX IF NOT EXISTS idx_ts ON over (ts)'); + $dbh->do('CREATE INDEX IF NOT EXISTS idx_ds ON over (ds)'); $dbh->do(<<''); CREATE TABLE IF NOT EXISTS counter ( @@ -352,7 +458,7 @@ sub rollback_lazy { sub disconnect { my ($self) = @_; die "in transaction" if $self->{txn}; - $self->{dbh} = undef; + $self->SUPER::disconnect; } sub create { @@ -367,4 +473,47 @@ sub create { $self->disconnect; } +sub rethread_prepare { + my ($self, $opt) = @_; + return unless $opt->{rethread}; + begin_lazy($self); + my $min = $self->{min_tid} = get_counter($self->{dbh}, 'thread') // 0; + my $pr = $opt->{-progress}; + $pr->("rethread min THREADID ".($min + 1)."\n") if $pr && $min; +} + +sub rethread_done { + my ($self, $opt) = @_; + return unless $opt->{rethread} && $self->{txn}; + defined(my $min = $self->{min_tid}) or croak('BUG: no min_tid'); + my $dbh = $self->{dbh} or croak('BUG: no dbh'); + my $rows = $dbh->selectall_arrayref(<<'', { Slice => {} }, $min); +SELECT num,tid FROM over WHERE num < 0 AND tid < ? + + my $show_id = $dbh->prepare('SELECT id FROM id2num WHERE num = ?'); + my $show_mid = $dbh->prepare('SELECT mid FROM msgid WHERE id = ?'); + my $pr = $opt->{-progress}; + my $total = 0; + for my $r (@$rows) { + my $exp = 0; + $show_id->execute($r->{num}); + while (defined(my $id = $show_id->fetchrow_array)) { + ++$exp; + $show_mid->execute($id); + my $mid = $show_mid->fetchrow_array; + if (!defined($mid)) { + warn <{num} ID=$id THREADID=$r->{tid} has no Message-ID +EOF + next; + } + $pr->(<{num} <$mid> THREADID=$r->{tid} culled +EOM + } + delete_by_num($self, $r->{num}); + } + $pr->("I: rethread culled $total ghosts\n") if $pr && $total; +} + 1;