topdir => $dir,
creat => $opt->{creat},
ibx_map => {}, # (newsgroup//inboxdir) => $ibx
- ibx_cfg => [], # by config section order
+ ibx_active => [], # by config section order
+ ibx_known => [], # by config section order
indexlevel => $l,
transact_bytes => 0,
total_bytes => 0,
}
sub attach_inbox {
- my ($self, $ibx) = @_;
+ my ($self, $ibx, $types) = @_;
$self->{ibx_map}->{$ibx->eidx_key} //= do {
- delete $self->{-ibx_ary}; # invalidate cache
- push @{$self->{ibx_cfg}}, $ibx;
+ delete $self->{-ibx_ary_known}; # invalidate cache
+ delete $self->{-ibx_ary_active}; # invalidate cache
+ $types //= [ qw(active known) ];
+ for my $t (@$types) {
+ push @{$self->{"ibx_$t"}}, $ibx;
+ }
$ibx;
}
}
sub _ibx_attach { # each_inbox callback
- my ($ibx, $self) = @_;
- attach_inbox($self, $ibx);
+ my ($ibx, $self, $types) = @_;
+ attach_inbox($self, $ibx, $types);
}
sub attach_config {
- my ($self, $cfg) = @_;
+ my ($self, $cfg, $ibxs) = @_;
$self->{cfg} = $cfg;
- $cfg->each_inbox(\&_ibx_attach, $self);
+ my $types;
+ if ($ibxs) {
+ for my $ibx (@$ibxs) {
+ $self->{ibx_map}->{$ibx->eidx_key} //= do {
+ push @{$self->{ibx_active}}, $ibx;
+ push @{$self->{ibx_known}}, $ibx;
+ }
+ }
+ # invalidate cache
+ delete $self->{-ibx_ary_known};
+ delete $self->{-ibx_ary_active};
+ $types = [ 'known' ];
+ }
+ $types //= [ qw(known active) ];
+ $cfg->each_inbox(\&_ibx_attach, $self, $types);
}
sub check_batch_limit ($) {
${$req->{need_checkpoint}} = 1 if $n >= $self->{batch_bytes};
}
+sub apply_boost ($$) {
+ my ($req, $smsg) = @_;
+ my $id2pos = $req->{id2pos}; # index in ibx_sorted
+ my $xr3 = $req->{self}->{oidx}->get_xref3($smsg->{num}, 1);
+ @$xr3 = sort {
+ $id2pos->{$a->[0]} <=> $id2pos->{$b->[0]}
+ ||
+ $a->[1] <=> $b->[1] # break ties with {xnum}
+ } @$xr3;
+ my $top_blob = unpack('H*', $xr3->[0]->[2]);
+ my $new_smsg = $req->{new_smsg};
+ return if $top_blob ne $new_smsg->{blob}; # loser
+
+ # replace the old smsg with the more boosted one
+ $new_smsg->{num} = $smsg->{num};
+ $new_smsg->populate($req->{eml}, $req);
+ $req->{self}->{oidx}->add_overview($req->{eml}, $new_smsg);
+}
+
sub do_xpost ($$) {
my ($req, $smsg) = @_;
my $self = $req->{self};
my $xnum = $req->{xnum};
$self->{oidx}->add_xref3($docid, $xnum, $oid, $eidx_key);
$idx->ipc_do('add_eidx_info', $docid, $eidx_key, $eml);
+ apply_boost($req, $smsg) if $req->{boost_in_use};
} else { # 'd'
my $rm_eidx_info;
my $nr = $self->{oidx}->remove_xref3($docid, $oid, $eidx_key,
my ($self, $sync, $smsg) = @_;
my $ibx_id = delete($smsg->{ibx_id}) // die '{ibx_id} unset';
my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
- $self->{-ibx_ary}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped"
+ $self->{-ibx_ary_known}->[$pos] //
+ die "BUG: ibx for $smsg->{blob} not mapped"
}
sub _fd_constrained ($) {
chomp($soft = `sh -c 'ulimit -n'`);
}
if (defined($soft)) {
- my $want = scalar(@{$self->{-ibx_ary}}) + 64; # estimate
+ # $want is an estimate
+ my $want = scalar(@{$self->{ibx_active}}) + 64;
my $ret = $want > $soft;
if ($ret) {
warn <<EOF;
undef;
}
-sub ibx_sorted ($) {
- my ($self) = @_;
- $self->{-ibx_ary} //= do {
+sub ibx_sorted ($$) {
+ my ($self, $type) = @_;
+ $self->{"-ibx_ary_$type"} //= do {
# highest boost first, stable for config-ordering tiebreaker
use sort 'stable';
[ sort {
($b->{boost} // 0) <=> ($a->{boost} // 0)
- } @{$self->{ibx_cfg}} ];
+ } @{$self->{'ibx_'.$type} // die "BUG: $type unknown"} ];
}
}
+sub prep_id2pos ($) {
+ my ($self) = @_;
+ my %id2pos;
+ my $pos = 0;
+ $id2pos{$_->{-ibx_id}} = $pos++ for (@{ibx_sorted($self, 'known')});
+ \%id2pos;
+}
+
sub eidxq_process ($$) { # for reindexing
my ($self, $sync) = @_;
my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
$pr->("Xapian indexing $min..$max (total=$tot)\n");
}
- $sync->{id2pos} //= do {
- my %id2pos;
- my $pos = 0;
- $id2pos{$_->{-ibx_id}} = $pos++ for (@{ibx_sorted($self)});
- \%id2pos;
- };
+ $sync->{id2pos} //= prep_id2pos($self);
my ($del, $iter);
restart:
$del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
warn "E: aborting --reindex\n";
return;
}
- for my $ibx (@{ibx_sorted($self)}) {
+ for my $ibx (@{ibx_sorted($self, 'active')}) {
_reindex_inbox($self, $sync, $ibx);
last if $sync->{quit};
}
local $SIG{QUIT} = $quit;
local $SIG{INT} = $quit;
local $SIG{TERM} = $quit;
- for my $ibx (@{ibx_sorted($self)}) {
+ for my $ibx (@{ibx_sorted($self, 'known')}) {
$ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
}
+
+ if (scalar(grep { defined($_->{boost}) } @{$self->{ibx_known}})) {
+ $sync->{id2pos} //= prep_id2pos($self);
+ $sync->{boost_in_use} = 1;
+ }
+
if (my $msgids = delete($opt->{dedupe})) {
local $sync->{checkpoint_unlocks} = 1;
eidx_dedupe($self, $sync, $msgids);
# don't use $_ here, it'll get clobbered by reindex_checkpoint
if ($opt->{scan} // 1) {
- for my $ibx (@{ibx_sorted($self)}) {
+ for my $ibx (@{ibx_sorted($self, 'active')}) {
last if $sync->{quit};
sync_inbox($self, $sync, $ibx);
}
}
undef $dh;
}
- for my $ibx (@{ibx_sorted($self)}) {
+ for my $ibx (@{ibx_sorted($self, 'active')}) {
# create symlinks for multi-pack-index
$git_midx += symlink_packs($ibx, $pd);
# add new lines to our alternates file
my $pr = $self->{-watch_sync}->{-opt}->{-progress};
$pr->('reloading ...') if $pr;
delete $self->{-resync_queue};
- delete $self->{-ibx_ary};
- $self->{ibx_cfg} = [];
+ delete $self->{-ibx_ary_known};
+ delete $self->{-ibx_ary_active};
+ $self->{ibx_known} = [];
+ $self->{ibx_active} = [];
%{$self->{ibx_map}} = ();
delete $self->{-watch_sync}->{id2pos};
my $cfg = PublicInbox::Config->new;
sub eidx_resync_start ($) { # -extindex --watch SIGUSR1 handler
my ($self) = @_;
- $self->{-resync_queue} //= [ @{ibx_sorted($self)} ];
+ $self->{-resync_queue} //= [ @{ibx_sorted($self, 'active')} ];
PublicInbox::DS::requeue($self); # trigger our ->event_step
}
require PublicInbox::Sigfd;
my $idler = PublicInbox::InboxIdle->new($self->{cfg});
if (!$self->{cfg}) {
- $idler->watch_inbox($_) for (@{ibx_sorted($self)});
+ $idler->watch_inbox($_) for (@{ibx_sorted($self, 'active')});
+ }
+ for my $ibx (@{ibx_sorted($self, 'active')}) {
+ $ibx->subscribe_unlock(__PACKAGE__, $self)
}
- $_->subscribe_unlock(__PACKAGE__, $self) for (@{ibx_sorted($self)});
my $pr = $opt->{-progress};
$pr->("performing initial scan ...\n") if $pr;
my $sync = eidx_sync($self, $opt); # initial sync