]> Sergey Matveev's repositories - public-inbox.git/commitdiff
index: support --rethread switch to fix old indices
authorEric Wong <e@yhbt.net>
Fri, 24 Jul 2020 05:55:47 +0000 (05:55 +0000)
committerEric Wong <e@yhbt.net>
Sat, 25 Jul 2020 20:48:18 +0000 (20:48 +0000)
Older versions of public-inbox < 1.3.0 had subtly
different semantics around threading in some corner
cases.  This switch (when combined with --reindex)
allows us to fix them by regenerating associations.

Documentation/public-inbox-index.pod
lib/PublicInbox/OverIdx.pm
lib/PublicInbox/SearchIdx.pm
lib/PublicInbox/V2Writable.pm
script/public-inbox-index
t/v1reindex.t
t/v2reindex.t

index ff2e5486748889667667de1ef0f86d0d6f390115..08f2fbf453228db0ac7bb917d7a9f5642e0a8c6a 100644 (file)
@@ -68,12 +68,25 @@ Xapian database.  Using this with C<--compact> or running
 L<public-inbox-compact(1)> afterwards is recommended to
 release free space.
 
-public-inbox protects writes to various indices with L<flock(2)>,
-so it is safe to reindex while L<public-inbox-watch(1)>,
-L<public-inbox-mda(1)> or L<public-inbox-learn(1)> run.
+public-inbox protects writes to various indices with
+L<flock(2)>, so it is safe to reindex (and rethread) while
+L<public-inbox-watch(1)>, L<public-inbox-mda(1)> or
+L<public-inbox-learn(1)> run.
 
-This does not touch the NNTP article number database or
-affect threading.
+This does not touch the NNTP article number database.
+It does not affect threading unless C<--rethread> is
+used.
+
+=item --rethread
+
+Regenerate internal THREADID and message thread associations
+when reindexing.
+
+This fixes some bugs in older versions of public-inbox.  While
+it is possible to use this without C<--reindex>, it makes little
+sense to do so.
+
+Available in public-inbox 1.6.0 (PENDING).
 
 =item --prune
 
index 5601e602c0181aebfd89d01d541d51fa2ac05b64..c57be7243a59356a3c18691f318207b0e062ca91 100644 (file)
@@ -17,6 +17,7 @@ use PublicInbox::MID qw/id_compress mids_for_index references/;
 use PublicInbox::Smsg qw(subject_normalized);
 use Compress::Zlib qw(compress);
 use PublicInbox::Search;
+use Carp qw(croak);
 
 sub dbh_new {
        my ($self) = @_;
@@ -37,6 +38,13 @@ sub dbh_new {
        $dbh;
 }
 
+sub new {
+       my ($class, $f) = @_;
+       my $self = $class->SUPER::new($f);
+       $self->{min_tid} = 0;
+       $self;
+}
+
 sub get_counter ($$) {
        my ($dbh, $key) = @_;
        my $sth = $dbh->prepare_cached(<<'', undef, 1);
@@ -164,8 +172,12 @@ sub _resolve_mid_to_tid {
        my $cur_tid = $smsg->{tid};
        if (defined $$tid) {
                merge_threads($self, $$tid, $cur_tid);
-       } else {
+       } elsif ($cur_tid > $self->{min_tid}) {
                $$tid = $cur_tid;
+       } else { # rethreading, queue up dead ghosts
+               $$tid = next_tid($self);
+               my $num = $smsg->{num};
+               push(@{$self->{-ghosts_to_delete}}, $num) if $num < 0;
        }
        1;
 }
@@ -175,7 +187,10 @@ sub resolve_mid_to_tid {
        my ($self, $mid) = @_;
        my $tid;
        each_by_mid($self, $mid, ['tid'], \&_resolve_mid_to_tid, \$tid);
-       defined $tid ? $tid : create_ghost($self, $mid);
+       if (my $del = delete $self->{-ghosts_to_delete}) {
+               delete_by_num($self, $_) for @$del;
+       }
+       $tid // create_ghost($self, $mid);
 }
 
 sub create_ghost {
@@ -221,7 +236,7 @@ sub link_refs {
                        merge_threads($self, $tid, $ptid);
                }
        } else {
-               $tid = defined $old_tid ? $old_tid : next_tid($self);
+               $tid = $old_tid // next_tid($self);
        }
        $tid;
 }
@@ -278,10 +293,17 @@ sub _add_over {
        my $cur_tid = $smsg->{tid};
        my $n = $smsg->{num};
        die "num must not be zero for $mid" if !$n;
-       $$old_tid = $cur_tid unless defined $$old_tid;
+       my $cur_valid = $cur_tid > $self->{min_tid};
+
        if ($n > 0) { # regular mail
-               merge_threads($self, $$old_tid, $cur_tid);
+               if ($cur_valid) {
+                       $$old_tid //= $cur_tid;
+                       merge_threads($self, $$old_tid, $cur_tid);
+               } else {
+                       $$old_tid //= next_tid($self);
+               }
        } elsif ($n < 0) { # ghost
+               $$old_tid //= $cur_valid ? $cur_tid : next_tid($self);
                link_refs($self, $refs, $$old_tid);
                delete_by_num($self, $n);
                $$v++;
@@ -297,6 +319,7 @@ sub add_over {
 
        begin_lazy($self);
        delete_by_num($self, $num, \$old_tid);
+       $old_tid = undef if ($old_tid // 0) <= $self->{min_tid};
        foreach my $mid (@$mids) {
                my $v = 0;
                each_by_mid($self, $mid, ['tid'], \&_add_over,
@@ -456,4 +479,47 @@ sub create {
        $self->disconnect;
 }
 
+sub rethread_prepare {
+       my ($self, $opt) = @_;
+       return unless $opt->{rethread};
+       begin_lazy($self);
+       my $min = $self->{min_tid} = get_counter($self->{dbh}, 'thread') // 0;
+       my $pr = $opt->{-progress};
+       $pr->("rethread min THREADID ".($min + 1)."\n") if $pr && $min;
+}
+
+sub rethread_done {
+       my ($self, $opt) = @_;
+       return unless $opt->{rethread} && $self->{txn};
+       defined(my $min = $self->{min_tid}) or croak('BUG: no min_tid');
+       my $dbh = $self->{dbh} or croak('BUG: no dbh');
+       my $rows = $dbh->selectall_arrayref(<<'', { Slice => {} }, $min);
+SELECT num,tid FROM over WHERE num < 0 AND tid < ?
+
+       my $show_id = $dbh->prepare('SELECT id FROM id2num WHERE num = ?');
+       my $show_mid = $dbh->prepare('SELECT mid FROM msgid WHERE id = ?');
+       my $pr = $opt->{-progress};
+       my $total = 0;
+       for my $r (@$rows) {
+               my $exp = 0;
+               $show_id->execute($r->{num});
+               while (defined(my $id = $show_id->fetchrow_array)) {
+                       ++$exp;
+                       $show_mid->execute($id);
+                       my $mid = $show_mid->fetchrow_array;
+                       if (!defined($mid)) {
+                               warn <<EOF;
+E: ghost NUM=$r->{num} ID=$id THREADID=$r->{tid} has no Message-ID
+EOF
+                               next;
+                       }
+                       $pr->(<<EOM) if $pr;
+I: ghost $r->{num} <$mid> THREADID=$r->{tid} culled
+EOM
+               }
+               delete_by_num($self, $r->{num});
+       }
+       $pr->("I: rethread culled $total ghosts\n") if $pr && $total;
+}
+
 1;
index 8316250906994e6649d1a96cf80edb0197e18aa8..e641ffd43776839e278559b89f0d40b22a73d5be 100644 (file)
@@ -723,6 +723,7 @@ sub _index_sync {
        my $pr = $opts->{-progress};
 
        my $xdb = $self->begin_txn_lazy;
+       $self->{over}->rethread_prepare($opts);
        my $mm = _msgmap_init($self);
        do {
                $xlog = undef; # stop previous git-log via SIGPIPE
@@ -761,12 +762,14 @@ sub _index_sync {
                                $xdb->set_metadata('last_commit', $newest);
                        }
                }
+
+               $self->{over}->rethread_done($opts) if $newest; # all done
                $self->commit_txn_lazy;
                $git->cleanup;
                $xdb = _xdb_release($self, $nr);
-               # let another process do some work... <
+               # let another process do some work...
                $pr->("indexed $nr/$self->{ntodo}\n") if $pr && $nr;
-               if (!$newest) {
+               if (!$newest) { # more to come
                        $xdb = $self->begin_txn_lazy;
                        $dbh->begin_work if $dbh;
                }
index 0582dd5e32e8e87484c64d01447253af221e4a0c..16556ddc209d7ab4d3cdcfc2d3d5fd91c4aa052e 100644 (file)
@@ -1308,6 +1308,7 @@ sub index_sync {
        my $latest = git_dir_latest($self, \$epoch_max);
        return unless defined $latest;
        $self->idx_init($opt); # acquire lock
+       $self->{over}->rethread_prepare($opt);
        my $sync = {
                D => {}, # "$mid\0$chash" => $oid
                unindex_range => {}, # EPOCH => oid_old..oid_new
@@ -1370,12 +1371,13 @@ sub index_sync {
                my $pr = $sync->{-opt}->{-progress};
                $pr->('all.git '.sprintf($sync->{-regen_fmt}, $nr)) if $pr;
        }
+       $self->{over}->rethread_done($opt);
 
        # reindex does not pick up new changes, so we rerun w/o it:
        if ($opt->{reindex}) {
                my %again = %$opt;
                $sync = undef;
-               delete @again{qw(reindex -skip_lock)};
+               delete @again{qw(rethread reindex -skip_lock)};
                index_sync($self, \%again);
        }
 }
index 6217fb86c4ee3cef46083e78a5d4d3440e4bad63..2e1934b0833110d58ca0b2ccaba47cd6249419ff 100755 (executable)
@@ -15,7 +15,7 @@ use PublicInbox::Xapcmd;
 
 my $compact_opt;
 my $opt = { quiet => -1, compact => 0, maxsize => undef };
-GetOptions($opt, qw(verbose|v+ reindex compact|c+ jobs|j=i prune
+GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i prune
                indexlevel|L=s maxsize|max-size=s batchsize|batch-size=s))
        or die "bad command-line args\n$usage";
 die "--jobs must be >= 0\n" if defined $opt->{jobs} && $opt->{jobs} < 0;
index 9f23ef01e5646a7cfd9398dcd0ceb389c2b999d5..8cb751881ca4e93d91f26811d4f1cac74aecf4cb 100644 (file)
@@ -11,6 +11,7 @@ require_git(2.6);
 require_mods(qw(DBD::SQLite Search::Xapian));
 use_ok 'PublicInbox::SearchIdx';
 use_ok 'PublicInbox::Import';
+use_ok 'PublicInbox::OverIdx';
 my ($inboxdir, $for_destroy) = tmpdir();
 my $ibx_config = {
        inboxdir => $inboxdir,
@@ -427,5 +428,38 @@ ok(!-d $xap, 'Xapian directories removed again');
                  ], 'msgmap as expected' );
 }
 
+{
+       my @warn;
+       local $SIG{__WARN__} = sub { push @warn, @_ };
+       my $ibx = PublicInbox::Inbox->new({ %$ibx_config });
+       my $f = $ibx->over->{dbh}->sqlite_db_filename;
+       my $over = PublicInbox::OverIdx->new($f);
+       my $dbh = $over->connect;
+       my $non_ghost_tids = sub {
+               $dbh->selectall_arrayref(<<'');
+SELECT tid FROM over WHERE num > 0 ORDER BY tid ASC
+
+       };
+       my $before = $non_ghost_tids->();
+
+       # mess up threading:
+       my $tid = PublicInbox::OverIdx::get_counter($dbh, 'thread');
+       my $nr = $dbh->do('UPDATE over SET tid = ?', undef, $tid);
+
+       my $rw = PublicInbox::SearchIdx->new($ibx, 1);
+       my @pr;
+       my $pr = sub { push @pr, @_ };
+       $rw->index_sync({reindex => 1, rethread => 1, -progress => $pr });
+       my @n = $dbh->selectrow_array(<<EOS, undef, $tid);
+SELECT COUNT(*) FROM over WHERE tid <= ?
+EOS
+       is_deeply(\@n, [ 0 ], 'rethread dropped old threadids');
+       my $after = $non_ghost_tids->();
+       ok($after->[0]->[0] > $before->[-1]->[0],
+               'all tids greater than before');
+       is(scalar @$after, scalar @$before, 'thread count unchanged');
+       is_deeply([], \@warn, 'no warnings');
+       # diag "@pr"; # XXX do we care?
+}
 
 done_testing();
index 77deffb4ba9d780bc837a69d01028bea96f02f21..ea2b24e59bf894335bf3e98cf7c1309e57c9d85c 100644 (file)
@@ -10,6 +10,7 @@ use PublicInbox::TestCommon;
 require_git(2.6);
 require_mods(qw(DBD::SQLite Search::Xapian));
 use_ok 'PublicInbox::V2Writable';
+use_ok 'PublicInbox::OverIdx';
 my ($inboxdir, $for_destroy) = tmpdir();
 my $ibx_config = {
        inboxdir => $inboxdir,
@@ -423,6 +424,46 @@ ok(!-d $xap, 'Xapian directories removed again');
                  ], 'msgmap as expected' );
 }
 
+my $check_rethread = sub {
+       my ($desc) = @_;
+       my @warn;
+       local $SIG{__WARN__} = sub { push @warn, @_ };
+       my %config = %$ibx_config;
+       my $ibx = PublicInbox::Inbox->new(\%config);
+       my $f = $ibx->over->{dbh}->sqlite_db_filename;
+       my $over = PublicInbox::OverIdx->new($f);
+       my $dbh = $over->connect;
+       my $non_ghost_tids = sub {
+               $dbh->selectall_arrayref(<<'');
+SELECT tid FROM over WHERE num > 0 ORDER BY tid ASC
+
+       };
+       my $before = $non_ghost_tids->();
+
+       # mess up threading:
+       my $tid = PublicInbox::OverIdx::get_counter($dbh, 'thread');
+       my $nr = $dbh->do('UPDATE over SET tid = ?', undef, $tid);
+       diag "messing up all threads with tid=$tid";
+
+       my $v2w = PublicInbox::V2Writable->new($ibx);
+       my @pr;
+       my $pr = sub { push @pr, @_ };
+       $v2w->index_sync({reindex => 1, rethread => 1, -progress => $pr});
+       # diag "@pr"; # nobody cares
+       is_deeply(\@warn, [], 'no warnings on reindex + rethread');
+
+       my @n = $dbh->selectrow_array(<<EOS, undef, $tid);
+SELECT COUNT(*) FROM over WHERE tid <= ?
+EOS
+       is_deeply(\@n, [ 0 ], 'rethread dropped old threadids');
+       my $after = $non_ghost_tids->();
+       ok($after->[0]->[0] > $before->[-1]->[0],
+               'all tids greater than before');
+       is(scalar @$after, scalar @$before, 'thread count unchanged');
+};
+
+$check_rethread->('no-monster');
+
 # A real example from linux-renesas-soc on lore where a 3-headed monster
 # of a message has 3 sets of common headers.  Another normal message
 # previously existed with a single Message-ID that conflicts with one
@@ -497,4 +538,8 @@ EOF
        is_deeply([values %uniq], [3], 'search on different subjects');
 }
 
+# XXX: not deterministic when dealing with ambiguous messages, oh well
+$check_rethread->('3-headed-monster once');
+$check_rethread->('3-headed-monster twice');
+
 done_testing();