L<public-inbox-compact(1)> afterwards is recommended to
release free space.
-public-inbox protects writes to various indices with L<flock(2)>,
-so it is safe to reindex while L<public-inbox-watch(1)>,
-L<public-inbox-mda(1)> or L<public-inbox-learn(1)> run.
+public-inbox protects writes to various indices with
+L<flock(2)>, so it is safe to reindex (and rethread) while
+L<public-inbox-watch(1)>, L<public-inbox-mda(1)> or
+L<public-inbox-learn(1)> run.
-This does not touch the NNTP article number database or
-affect threading.
+This does not touch the NNTP article number database.
+It does not affect threading unless C<--rethread> is
+used.
+
+=item --rethread
+
+Regenerate internal THREADID and message thread associations
+when reindexing.
+
+This fixes some bugs in older versions of public-inbox. While
+it is possible to use this without C<--reindex>, it makes little
+sense to do so.
+
+Available in public-inbox 1.6.0 (PENDING).
=item --prune
use PublicInbox::Smsg qw(subject_normalized);
use Compress::Zlib qw(compress);
use PublicInbox::Search;
+use Carp qw(croak);
sub dbh_new {
my ($self) = @_;
$dbh;
}
+sub new {
+ my ($class, $f) = @_;
+ my $self = $class->SUPER::new($f);
+ $self->{min_tid} = 0;
+ $self;
+}
+
sub get_counter ($$) {
my ($dbh, $key) = @_;
my $sth = $dbh->prepare_cached(<<'', undef, 1);
my $cur_tid = $smsg->{tid};
if (defined $$tid) {
merge_threads($self, $$tid, $cur_tid);
- } else {
+ } elsif ($cur_tid > $self->{min_tid}) {
$$tid = $cur_tid;
+ } else { # rethreading, queue up dead ghosts
+ $$tid = next_tid($self);
+ my $num = $smsg->{num};
+ push(@{$self->{-ghosts_to_delete}}, $num) if $num < 0;
}
1;
}
my ($self, $mid) = @_;
my $tid;
each_by_mid($self, $mid, ['tid'], \&_resolve_mid_to_tid, \$tid);
- defined $tid ? $tid : create_ghost($self, $mid);
+ if (my $del = delete $self->{-ghosts_to_delete}) {
+ delete_by_num($self, $_) for @$del;
+ }
+ $tid // create_ghost($self, $mid);
}
sub create_ghost {
merge_threads($self, $tid, $ptid);
}
} else {
- $tid = defined $old_tid ? $old_tid : next_tid($self);
+ $tid = $old_tid // next_tid($self);
}
$tid;
}
my $cur_tid = $smsg->{tid};
my $n = $smsg->{num};
die "num must not be zero for $mid" if !$n;
- $$old_tid = $cur_tid unless defined $$old_tid;
+ my $cur_valid = $cur_tid > $self->{min_tid};
+
if ($n > 0) { # regular mail
- merge_threads($self, $$old_tid, $cur_tid);
+ if ($cur_valid) {
+ $$old_tid //= $cur_tid;
+ merge_threads($self, $$old_tid, $cur_tid);
+ } else {
+ $$old_tid //= next_tid($self);
+ }
} elsif ($n < 0) { # ghost
+ $$old_tid //= $cur_valid ? $cur_tid : next_tid($self);
link_refs($self, $refs, $$old_tid);
delete_by_num($self, $n);
$$v++;
begin_lazy($self);
delete_by_num($self, $num, \$old_tid);
+ $old_tid = undef if ($old_tid // 0) <= $self->{min_tid};
foreach my $mid (@$mids) {
my $v = 0;
each_by_mid($self, $mid, ['tid'], \&_add_over,
$self->disconnect;
}
+sub rethread_prepare {
+ my ($self, $opt) = @_;
+ return unless $opt->{rethread};
+ begin_lazy($self);
+ my $min = $self->{min_tid} = get_counter($self->{dbh}, 'thread') // 0;
+ my $pr = $opt->{-progress};
+ $pr->("rethread min THREADID ".($min + 1)."\n") if $pr && $min;
+}
+
+sub rethread_done {
+ my ($self, $opt) = @_;
+ return unless $opt->{rethread} && $self->{txn};
+ defined(my $min = $self->{min_tid}) or croak('BUG: no min_tid');
+ my $dbh = $self->{dbh} or croak('BUG: no dbh');
+ my $rows = $dbh->selectall_arrayref(<<'', { Slice => {} }, $min);
+SELECT num,tid FROM over WHERE num < 0 AND tid < ?
+
+ my $show_id = $dbh->prepare('SELECT id FROM id2num WHERE num = ?');
+ my $show_mid = $dbh->prepare('SELECT mid FROM msgid WHERE id = ?');
+ my $pr = $opt->{-progress};
+ my $total = 0;
+ for my $r (@$rows) {
+ my $exp = 0;
+ $show_id->execute($r->{num});
+ while (defined(my $id = $show_id->fetchrow_array)) {
+ ++$exp;
+ $show_mid->execute($id);
+ my $mid = $show_mid->fetchrow_array;
+ if (!defined($mid)) {
+ warn <<EOF;
+E: ghost NUM=$r->{num} ID=$id THREADID=$r->{tid} has no Message-ID
+EOF
+ next;
+ }
+ $pr->(<<EOM) if $pr;
+I: ghost $r->{num} <$mid> THREADID=$r->{tid} culled
+EOM
+ }
+ delete_by_num($self, $r->{num});
+ }
+ $pr->("I: rethread culled $total ghosts\n") if $pr && $total;
+}
+
1;
my $pr = $opts->{-progress};
my $xdb = $self->begin_txn_lazy;
+ $self->{over}->rethread_prepare($opts);
my $mm = _msgmap_init($self);
do {
$xlog = undef; # stop previous git-log via SIGPIPE
$xdb->set_metadata('last_commit', $newest);
}
}
+
+ $self->{over}->rethread_done($opts) if $newest; # all done
$self->commit_txn_lazy;
$git->cleanup;
$xdb = _xdb_release($self, $nr);
- # let another process do some work... <
+ # let another process do some work...
$pr->("indexed $nr/$self->{ntodo}\n") if $pr && $nr;
- if (!$newest) {
+ if (!$newest) { # more to come
$xdb = $self->begin_txn_lazy;
$dbh->begin_work if $dbh;
}
my $latest = git_dir_latest($self, \$epoch_max);
return unless defined $latest;
$self->idx_init($opt); # acquire lock
+ $self->{over}->rethread_prepare($opt);
my $sync = {
D => {}, # "$mid\0$chash" => $oid
unindex_range => {}, # EPOCH => oid_old..oid_new
my $pr = $sync->{-opt}->{-progress};
$pr->('all.git '.sprintf($sync->{-regen_fmt}, $nr)) if $pr;
}
+ $self->{over}->rethread_done($opt);
# reindex does not pick up new changes, so we rerun w/o it:
if ($opt->{reindex}) {
my %again = %$opt;
$sync = undef;
- delete @again{qw(reindex -skip_lock)};
+ delete @again{qw(rethread reindex -skip_lock)};
index_sync($self, \%again);
}
}
my $compact_opt;
my $opt = { quiet => -1, compact => 0, maxsize => undef };
-GetOptions($opt, qw(verbose|v+ reindex compact|c+ jobs|j=i prune
+GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i prune
indexlevel|L=s maxsize|max-size=s batchsize|batch-size=s))
or die "bad command-line args\n$usage";
die "--jobs must be >= 0\n" if defined $opt->{jobs} && $opt->{jobs} < 0;
require_mods(qw(DBD::SQLite Search::Xapian));
use_ok 'PublicInbox::SearchIdx';
use_ok 'PublicInbox::Import';
+use_ok 'PublicInbox::OverIdx';
my ($inboxdir, $for_destroy) = tmpdir();
my $ibx_config = {
inboxdir => $inboxdir,
], 'msgmap as expected' );
}
+{
+ my @warn;
+ local $SIG{__WARN__} = sub { push @warn, @_ };
+ my $ibx = PublicInbox::Inbox->new({ %$ibx_config });
+ my $f = $ibx->over->{dbh}->sqlite_db_filename;
+ my $over = PublicInbox::OverIdx->new($f);
+ my $dbh = $over->connect;
+ my $non_ghost_tids = sub {
+ $dbh->selectall_arrayref(<<'');
+SELECT tid FROM over WHERE num > 0 ORDER BY tid ASC
+
+ };
+ my $before = $non_ghost_tids->();
+
+ # mess up threading:
+ my $tid = PublicInbox::OverIdx::get_counter($dbh, 'thread');
+ my $nr = $dbh->do('UPDATE over SET tid = ?', undef, $tid);
+
+ my $rw = PublicInbox::SearchIdx->new($ibx, 1);
+ my @pr;
+ my $pr = sub { push @pr, @_ };
+ $rw->index_sync({reindex => 1, rethread => 1, -progress => $pr });
+ my @n = $dbh->selectrow_array(<<EOS, undef, $tid);
+SELECT COUNT(*) FROM over WHERE tid <= ?
+EOS
+ is_deeply(\@n, [ 0 ], 'rethread dropped old threadids');
+ my $after = $non_ghost_tids->();
+ ok($after->[0]->[0] > $before->[-1]->[0],
+ 'all tids greater than before');
+ is(scalar @$after, scalar @$before, 'thread count unchanged');
+ is_deeply([], \@warn, 'no warnings');
+ # diag "@pr"; # XXX do we care?
+}
done_testing();
require_git(2.6);
require_mods(qw(DBD::SQLite Search::Xapian));
use_ok 'PublicInbox::V2Writable';
+use_ok 'PublicInbox::OverIdx';
my ($inboxdir, $for_destroy) = tmpdir();
my $ibx_config = {
inboxdir => $inboxdir,
], 'msgmap as expected' );
}
+my $check_rethread = sub {
+ my ($desc) = @_;
+ my @warn;
+ local $SIG{__WARN__} = sub { push @warn, @_ };
+ my %config = %$ibx_config;
+ my $ibx = PublicInbox::Inbox->new(\%config);
+ my $f = $ibx->over->{dbh}->sqlite_db_filename;
+ my $over = PublicInbox::OverIdx->new($f);
+ my $dbh = $over->connect;
+ my $non_ghost_tids = sub {
+ $dbh->selectall_arrayref(<<'');
+SELECT tid FROM over WHERE num > 0 ORDER BY tid ASC
+
+ };
+ my $before = $non_ghost_tids->();
+
+ # mess up threading:
+ my $tid = PublicInbox::OverIdx::get_counter($dbh, 'thread');
+ my $nr = $dbh->do('UPDATE over SET tid = ?', undef, $tid);
+ diag "messing up all threads with tid=$tid";
+
+ my $v2w = PublicInbox::V2Writable->new($ibx);
+ my @pr;
+ my $pr = sub { push @pr, @_ };
+ $v2w->index_sync({reindex => 1, rethread => 1, -progress => $pr});
+ # diag "@pr"; # nobody cares
+ is_deeply(\@warn, [], 'no warnings on reindex + rethread');
+
+ my @n = $dbh->selectrow_array(<<EOS, undef, $tid);
+SELECT COUNT(*) FROM over WHERE tid <= ?
+EOS
+ is_deeply(\@n, [ 0 ], 'rethread dropped old threadids');
+ my $after = $non_ghost_tids->();
+ ok($after->[0]->[0] > $before->[-1]->[0],
+ 'all tids greater than before');
+ is(scalar @$after, scalar @$before, 'thread count unchanged');
+};
+
+$check_rethread->('no-monster');
+
# A real example from linux-renesas-soc on lore where a 3-headed monster
# of a message has 3 sets of common headers. Another normal message
# previously existed with a single Message-ID that conflicts with one
is_deeply([values %uniq], [3], 'search on different subjects');
}
+# XXX: not deterministic when dealing with ambiguous messages, oh well
+$check_rethread->('3-headed-monster once');
+$check_rethread->('3-headed-monster twice');
+
done_testing();