topdir => $dir,
creat => $opt->{creat},
ibx_map => {}, # (newsgroup//inboxdir) => $ibx
- ibx_list => [],
+ ibx_cfg => [], # by config section order
indexlevel => $l,
transact_bytes => 0,
total_bytes => 0,
sub attach_inbox {
my ($self, $ibx) = @_;
$self->{ibx_map}->{$ibx->eidx_key} //= do {
- push @{$self->{ibx_list}}, $ibx;
+ delete $self->{-ibx_ary}; # invalidate cache
+ push @{$self->{ibx_cfg}}, $ibx;
$ibx;
}
}
my ($self, $sync, $smsg) = @_;
my $ibx_id = delete($smsg->{ibx_id}) // die '{ibx_id} unset';
my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
- $self->{ibx_list}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped"
+ $self->{-ibx_ary}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped"
}
sub _fd_constrained ($) {
chomp($soft = `sh -c 'ulimit -n'`);
}
if (defined($soft)) {
- my $want = scalar(@{$self->{ibx_list}}) + 64; # estimate
+ my $want = scalar(@{$self->{-ibx_ary}}) + 64; # estimate
my $ret = $want > $soft;
if ($ret) {
warn <<EOF;
return;
}
- # we sort {xr3r} in the reverse order of {ibx_list} so we can
+ # we sort {xr3r} in the reverse order of ibx_sorted so we can
# hit the common case in _reindex_finalize without rereading
# from git (or holding multiple messages in memory).
- my $id2pos = $sync->{id2pos}; # index in {ibx_list}
+ my $id2pos = $sync->{id2pos}; # index in ibx_sorted
@$xr3 = sort {
$id2pos->{$b->[0]} <=> $id2pos->{$a->[0]}
||
undef;
}
+sub ibx_sorted ($) {
+ my ($self) = @_;
+ $self->{-ibx_ary} //= do {
+ # highest boost first, stable for config-ordering tiebreaker
+ use sort 'stable';
+ [ sort {
+ ($b->{boost} // 0) <=> ($a->{boost} // 0)
+ } @{$self->{ibx_cfg}} ];
+ }
+}
+
sub eidxq_process ($$) { # for reindexing
my ($self, $sync) = @_;
$sync->{id2pos} //= do {
my %id2pos;
my $pos = 0;
- $id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
+ $id2pos{$_->{-ibx_id}} = $pos++ for (@{ibx_sorted($self)});
\%id2pos;
};
my ($del, $iter);
warn "E: aborting --reindex\n";
return;
}
- for my $ibx (@{$self->{ibx_list}}) {
+ for my $ibx (@{ibx_sorted($self)}) {
_reindex_inbox($self, $sync, $ibx);
last if $sync->{quit};
}
}
}
-sub eidx_dedupe ($$) {
- my ($self, $sync) = @_;
+sub eidx_dedupe ($$$) {
+ my ($self, $sync, $msgids) = @_;
$sync->{dedupe_cull} = 0;
my $candidates = 0;
my $nr_mid = 0;
return unless eidxq_lock_acquire($self);
- my $iter;
+ my ($iter, $cur_mid);
my $min_id = 0;
+ my $idx = 0;
local $sync->{-regen_fmt} = "dedupe %u/".$self->{oidx}->max."\n";
+
+ # note: we could write this query more intelligently,
+ # but that causes lock contention with read-only processes
dedupe_restart:
- $iter = $self->{oidx}->dbh->prepare(<<EOS);
-SELECT DISTINCT(mid),id FROM msgid WHERE id IN
-(SELECT id FROM id2num WHERE id > ? GROUP BY num HAVING COUNT(num) > 1)
-ORDER BY id
+ $cur_mid = $msgids->[$idx];
+ if ($cur_mid eq '') { # all Message-IDs
+ $iter = $self->{oidx}->dbh->prepare(<<EOS);
+SELECT mid,id FROM msgid WHERE id > ? ORDER BY id ASC
EOS
- $iter->execute($min_id);
- local $SIG{__WARN__} = sub {
- return if PublicInbox::Eml::warn_ignore(@_);
- warn @_;
- };
+ $iter->execute($min_id);
+ } else {
+ $iter = $self->{oidx}->dbh->prepare(<<EOS);
+SELECT mid,id FROM msgid WHERE mid = ? AND id > ? ORDER BY id ASC
+EOS
+ $iter->execute($cur_mid, $min_id);
+ }
while (my ($mid, $id) = $iter->fetchrow_array) {
last if $sync->{quit};
$self->{current_info} = "dedupe $mid";
${$sync->{nr}} = $min_id = $id;
- my ($n, $prv, @smsg);
- while (my $x = $self->{oidx}->next_by_mid($mid, \$n, \$prv)) {
+ my ($prv, @smsg);
+ while (my $x = $self->{oidx}->next_by_mid($mid, \$id, \$prv)) {
push @smsg, $x;
}
next if scalar(@smsg) < 2;
# need to wait on every single one
$self->git->async_wait_all;
- # is checkpoint needed? $iter is a very expensive query to restart
- if (0 && checkpoint_due($sync)) {
+ if (checkpoint_due($sync)) {
undef $iter;
reindex_checkpoint($self, $sync);
goto dedupe_restart;
}
}
+ goto dedupe_restart if defined($msgids->[++$idx]);
+
my $n = delete $sync->{dedupe_cull};
if (my $pr = $sync->{-opt}->{-progress}) {
$pr->("culled $n/$candidates candidates ($nr_mid msgids)\n");
my $warn_cb = $SIG{__WARN__} || \&CORE::warn;
local $self->{current_info} = '';
local $SIG{__WARN__} = sub {
+ return if PublicInbox::Eml::warn_ignore(@_);
$warn_cb->($self->{current_info}, ': ', @_);
};
$self->idx_init($opt); # acquire lock via V2Writable::_idx_init
local $SIG{QUIT} = $quit;
local $SIG{INT} = $quit;
local $SIG{TERM} = $quit;
- for my $ibx (@{$self->{ibx_list}}) {
+ for my $ibx (@{ibx_sorted($self)}) {
$ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
}
- if (delete($opt->{dedupe})) {
+ if (my $msgids = delete($opt->{dedupe})) {
local $sync->{checkpoint_unlocks} = 1;
- eidx_dedupe($self, $sync);
+ eidx_dedupe($self, $sync, $msgids);
}
if (delete($opt->{reindex})) {
local $sync->{checkpoint_unlocks} = 1;
# don't use $_ here, it'll get clobbered by reindex_checkpoint
if ($opt->{scan} // 1) {
- for my $ibx (@{$self->{ibx_list}}) {
+ for my $ibx (@{ibx_sorted($self)}) {
last if $sync->{quit};
sync_inbox($self, $sync, $ibx);
}
}
undef $dh;
}
- for my $ibx (@{$self->{ibx_list}}) {
+ for my $ibx (@{ibx_sorted($self)}) {
# create symlinks for multi-pack-index
$git_midx += symlink_packs($ibx, $pd);
# add new lines to our alternates file
my $pr = $self->{-watch_sync}->{-opt}->{-progress};
$pr->('reloading ...') if $pr;
delete $self->{-resync_queue};
- @{$self->{ibx_list}} = ();
+ delete $self->{-ibx_ary};
+ $self->{ibx_cfg} = [];
%{$self->{ibx_map}} = ();
delete $self->{-watch_sync}->{id2pos};
my $cfg = PublicInbox::Config->new;
sub eidx_resync_start ($) { # -extindex --watch SIGUSR1 handler
my ($self) = @_;
- $self->{-resync_queue} //= [ @{$self->{ibx_list}} ];
+ $self->{-resync_queue} //= [ @{ibx_sorted($self)} ];
PublicInbox::DS::requeue($self); # trigger our ->event_step
}
require PublicInbox::Sigfd;
my $idler = PublicInbox::InboxIdle->new($self->{cfg});
if (!$self->{cfg}) {
- $idler->watch_inbox($_) for @{$self->{ibx_list}};
+ $idler->watch_inbox($_) for (@{ibx_sorted($self)});
}
- $_->subscribe_unlock(__PACKAGE__, $self) for @{$self->{ibx_list}};
+ $_->subscribe_unlock(__PACKAGE__, $self) for (@{ibx_sorted($self)});
my $pr = $opt->{-progress};
$pr->("performing initial scan ...\n") if $pr;
my $sync = eidx_sync($self, $opt); # initial sync
my $oldset = PublicInbox::DS::block_signals();
local $self->{current_info} = '';
my $cb = $SIG{__WARN__} || \&CORE::warn;
- local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) };
+ local $SIG{__WARN__} = sub {
+ return if PublicInbox::Eml::warn_ignore(@_);
+ $cb->($self->{current_info}, ': ', @_);
+ };
my $sig = {
HUP => sub { eidx_reload($self, $idler) },
USR1 => sub { eidx_resync_start($self) },