X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FOverIdx.pm;h=6f0477f0a7150587e4b64a0bf0470736b8caba75;hb=7f4a11a6499aade26b418b7ae13cd12859f75bdd;hp=29c6e0b933aae2e757f6307ec3758c53fc1dec17;hpb=406b9eb2906518fb8c3999f2a826eeae86fa3d30;p=public-inbox.git diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm index 29c6e0b9..6f0477f0 100644 --- a/lib/PublicInbox/OverIdx.pm +++ b/lib/PublicInbox/OverIdx.pm @@ -9,23 +9,18 @@ # are denoted by a negative NNTP article number. package PublicInbox::OverIdx; use strict; -use warnings; -use base qw(PublicInbox::Over); +use v5.10.1; +use parent qw(PublicInbox::Over); use IO::Handle; -use DBI; +use DBI qw(:sql_types); # SQL_BLOB use PublicInbox::MID qw/id_compress mids_for_index references/; use PublicInbox::Smsg qw(subject_normalized); use Compress::Zlib qw(compress); -use PublicInbox::Search; +use Carp qw(croak); sub dbh_new { my ($self) = @_; - my $dbh = $self->SUPER::dbh_new(1); - - # TRUNCATE reduces I/O compared to the default (DELETE) - # We do not use WAL since we're optimized for read-only ops, - # (and read-only requires SQLite 3.22.0 (2018-01-22)). - $dbh->do('PRAGMA journal_mode = TRUNCATE'); + my $dbh = $self->SUPER::dbh_new($self->{-no_fsync} ? 2 : 1); # 80000 pages (80MiB on SQLite <3.12.0, 320MiB on 3.12.0+) # was found to be good in 2018 during the large LKML import @@ -37,6 +32,13 @@ sub dbh_new { $dbh; } +sub new { + my ($class, $f) = @_; + my $self = $class->SUPER::new($f); + $self->{min_tid} = 0; + $self; +} + sub get_counter ($$) { my ($dbh, $key) = @_; my $sth = $dbh->prepare_cached(<<'', undef, 1); @@ -164,8 +166,12 @@ sub _resolve_mid_to_tid { my $cur_tid = $smsg->{tid}; if (defined $$tid) { merge_threads($self, $$tid, $cur_tid); - } else { + } elsif ($cur_tid > $self->{min_tid}) { $$tid = $cur_tid; + } else { # rethreading, queue up dead ghosts + $$tid = next_tid($self); + my $num = $smsg->{num}; + push(@{$self->{-ghosts_to_delete}}, $num) if $num < 0; } 1; } @@ -175,23 +181,23 @@ sub resolve_mid_to_tid { my ($self, $mid) = @_; my $tid; each_by_mid($self, $mid, ['tid'], \&_resolve_mid_to_tid, \$tid); - defined $tid ? $tid : create_ghost($self, $mid); -} - -sub create_ghost { - my ($self, $mid) = @_; - my $id = mid2id($self, $mid); - my $num = next_ghost_num($self); - $num < 0 or die "ghost num is non-negative: $num\n"; - my $tid = next_tid($self); - my $dbh = $self->{dbh}; - $dbh->prepare_cached(<<'')->execute($num, $tid); + if (my $del = delete $self->{-ghosts_to_delete}) { + delete_by_num($self, $_) for @$del; + } + $tid // do { # create a new ghost + my $id = mid2id($self, $mid); + my $num = next_ghost_num($self); + $num < 0 or die "ghost num is non-negative: $num\n"; + $tid = next_tid($self); + my $dbh = $self->{dbh}; + $dbh->prepare_cached(<<'')->execute($num, $tid); INSERT INTO over (num, tid) VALUES (?,?) - $dbh->prepare_cached(<<'')->execute($id, $num); + $dbh->prepare_cached(<<'')->execute($id, $num); INSERT INTO id2num (id, num) VALUES (?,?) - $tid; + $tid; + }; } sub merge_threads { @@ -221,7 +227,7 @@ sub link_refs { merge_threads($self, $tid, $ptid); } } else { - $tid = defined $old_tid ? $old_tid : next_tid($self); + $tid = $old_tid // next_tid($self); } $tid; } @@ -256,11 +262,10 @@ sub subject_path ($) { } sub add_overview { - my ($self, $mime, $smsg) = @_; - $smsg->{lines} = $mime->body_raw =~ tr!\n!\n!; - my $hdr = $mime->header_obj; - my $mids = mids_for_index($hdr); - my $refs = parse_references($smsg, $hdr, $mids); + my ($self, $eml, $smsg) = @_; + $smsg->{lines} = $eml->body_raw =~ tr!\n!\n!; + my $mids = mids_for_index($eml); + my $refs = parse_references($smsg, $eml, $mids); my $subj = $smsg->{subject}; my $xpath; if ($subj ne '') { @@ -270,7 +275,7 @@ sub add_overview { my $dd = $smsg->to_doc_data; utf8::encode($dd); $dd = compress($dd); - add_over($self, [ @$smsg{qw(ts ds num)}, $mids, $refs, $xpath, $dd ]); + add_over($self, $smsg, $mids, $refs, $xpath, $dd); } sub _add_over { @@ -278,10 +283,17 @@ sub _add_over { my $cur_tid = $smsg->{tid}; my $n = $smsg->{num}; die "num must not be zero for $mid" if !$n; - $$old_tid = $cur_tid unless defined $$old_tid; + my $cur_valid = $cur_tid > $self->{min_tid}; + if ($n > 0) { # regular mail - merge_threads($self, $$old_tid, $cur_tid); + if ($cur_valid) { + $$old_tid //= $cur_tid; + merge_threads($self, $$old_tid, $cur_tid); + } else { + $$old_tid //= next_tid($self); + } } elsif ($n < 0) { # ghost + $$old_tid //= $cur_valid ? $cur_tid : next_tid($self); link_refs($self, $refs, $$old_tid); delete_by_num($self, $n); $$v++; @@ -290,13 +302,14 @@ sub _add_over { } sub add_over { - my ($self, $values) = @_; - my ($ts, $ds, $num, $mids, $refs, $xpath, $ddd) = @$values; + my ($self, $smsg, $mids, $refs, $xpath, $ddd) = @_; my $old_tid; my $vivified = 0; + my $num = $smsg->{num}; begin_lazy($self); delete_by_num($self, $num, \$old_tid); + $old_tid = undef if ($old_tid // 0) <= $self->{min_tid}; foreach my $mid (@$mids) { my $v = 0; each_by_mid($self, $mid, ['tid'], \&_add_over, @@ -304,17 +317,17 @@ sub add_over { $v > 1 and warn "BUG: vivified multiple ($v) ghosts for $mid\n"; $vivified += $v; } - my $tid = $vivified ? $old_tid : link_refs($self, $refs, $old_tid); - my $sid = sid($self, $xpath); + $smsg->{tid} = $vivified ? $old_tid : link_refs($self, $refs, $old_tid); + $smsg->{sid} = sid($self, $xpath); my $dbh = $self->{dbh}; my $sth = $dbh->prepare_cached(<<''); INSERT INTO over (num, tid, sid, ts, ds, ddd) VALUES (?,?,?,?,?,?) - my $n = 0; - my @v = ($num, $tid, $sid, $ts, $ds); - foreach (@v) { $sth->bind_param(++$n, $_) } - $sth->bind_param(++$n, $ddd); + my $nc = 1; + $sth->bind_param($nc, $num); + $sth->bind_param(++$nc, $smsg->{$_}) for (qw(tid sid ts ds)); + $sth->bind_param(++$nc, $ddd, SQL_BLOB); $sth->execute; $sth = $dbh->prepare_cached(<<''); INSERT INTO id2num (id, num) VALUES (?,?) @@ -326,22 +339,23 @@ INSERT INTO id2num (id, num) VALUES (?,?) } sub _remove_oid { - my ($self, $smsg, $oid, $nr) = @_; + my ($self, $smsg, $oid, $removed) = @_; if (!defined($oid) || $smsg->{blob} eq $oid) { delete_by_num($self, $smsg->{num}); - $$nr++; + push @$removed, $smsg->{num}; } 1; } -# returns number of removed messages +# returns number of removed messages in scalar context, +# array of removed article numbers in array context. # $oid may be undef to match only on $mid sub remove_oid { my ($self, $oid, $mid) = @_; - my $nr = 0; + my $removed = []; begin_lazy($self); - each_by_mid($self, $mid, ['ddd'], \&_remove_oid, $oid, \$nr); - $nr; + each_by_mid($self, $mid, ['ddd'], \&_remove_oid, $oid, $removed); + @$removed; } sub _num_mid0_for_oid { @@ -425,7 +439,7 @@ sub commit_lazy { sub begin_lazy { my ($self) = @_; return if $self->{txn}; - my $dbh = $self->connect or return; + my $dbh = $self->dbh or return; $dbh->begin_work; # $dbh->{Profile} = 2; $self->{txn} = 1; @@ -437,10 +451,10 @@ sub rollback_lazy { $self->{dbh}->rollback; } -sub disconnect { +sub dbh_close { my ($self) = @_; die "in transaction" if $self->{txn}; - $self->SUPER::disconnect; + $self->SUPER::dbh_close; } sub create { @@ -451,8 +465,51 @@ sub create { File::Path::mkpath(File::Basename::dirname($self->{filename})); } # create the DB: - PublicInbox::Over::connect($self); - $self->disconnect; + PublicInbox::Over::dbh($self); + $self->dbh_close; +} + +sub rethread_prepare { + my ($self, $opt) = @_; + return unless $opt->{rethread}; + begin_lazy($self); + my $min = $self->{min_tid} = get_counter($self->{dbh}, 'thread') // 0; + my $pr = $opt->{-progress}; + $pr->("rethread min THREADID ".($min + 1)."\n") if $pr && $min; +} + +sub rethread_done { + my ($self, $opt) = @_; + return unless $opt->{rethread} && $self->{txn}; + defined(my $min = $self->{min_tid}) or croak('BUG: no min_tid'); + my $dbh = $self->{dbh} or croak('BUG: no dbh'); + my $rows = $dbh->selectall_arrayref(<<'', { Slice => {} }, $min); +SELECT num,tid FROM over WHERE num < 0 AND tid < ? + + my $show_id = $dbh->prepare('SELECT id FROM id2num WHERE num = ?'); + my $show_mid = $dbh->prepare('SELECT mid FROM msgid WHERE id = ?'); + my $pr = $opt->{-progress}; + my $total = 0; + for my $r (@$rows) { + my $exp = 0; + $show_id->execute($r->{num}); + while (defined(my $id = $show_id->fetchrow_array)) { + ++$exp; + $show_mid->execute($id); + my $mid = $show_mid->fetchrow_array; + if (!defined($mid)) { + warn <{num} ID=$id THREADID=$r->{tid} has no Message-ID +EOF + next; + } + $pr->(<{num} <$mid> THREADID=$r->{tid} culled +EOM + } + delete_by_num($self, $r->{num}); + } + $pr->("I: rethread culled $total ghosts\n") if $pr && $total; } 1;