-# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# Detached/external index cross inbox search indexing support
use Sys::Hostname qw(hostname);
use POSIX qw(strftime);
use PublicInbox::Search;
-use PublicInbox::SearchIdx qw(crlf_adjust prepare_stack is_ancestor
- is_bad_blob);
+use PublicInbox::SearchIdx qw(prepare_stack is_ancestor is_bad_blob);
use PublicInbox::OverIdx;
use PublicInbox::MiscIdx;
use PublicInbox::MID qw(mids);
my ($req) = @_;
my $self = $req->{self};
my $new_smsg = $req->{new_smsg};
-
- # {raw_bytes} may be unset, so just use {bytes}
my $n = $self->{transact_bytes} += $new_smsg->{bytes};
# set flag for PublicInbox::V2Writable::index_todo:
if (my $new_smsg = $req->{new_smsg}) { # 'm' on cross-posted message
my $xnum = $req->{xnum};
$self->{oidx}->add_xref3($docid, $xnum, $oid, $eidx_key);
- $idx->shard_add_eidx_info($docid, $eidx_key, $eml);
+ $idx->ipc_do('add_eidx_info', $docid, $eidx_key, $eml);
check_batch_limit($req);
} else { # 'd'
my $rm_eidx_info;
\$rm_eidx_info);
if ($nr == 0) {
$self->{oidx}->eidxq_del($docid);
- $idx->shard_remove($docid);
+ $idx->ipc_do('xdb_remove', $docid);
} elsif ($rm_eidx_info) {
- $idx->shard_remove_eidx_info($docid, $eidx_key, $eml);
+ $idx->ipc_do('remove_eidx_info',
+ $docid, $eidx_key, $eml);
$self->{oidx}->eidxq_add($docid); # yes, add
}
}
my $oid = $new_smsg->{blob};
my $ibx = delete $req->{ibx} or die 'BUG: {ibx} unset';
$self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key);
- $idx->index_raw(undef, $eml, $new_smsg, $ibx->eidx_key);
+ $idx->index_eml($eml, $new_smsg, $ibx->eidx_key);
check_batch_limit($req);
}
my $new_smsg = $req->{new_smsg} = bless {
blob => $oid,
}, 'PublicInbox::Smsg';
- $new_smsg->{bytes} = $size + crlf_adjust($$bref);
+ $new_smsg->set_bytes($$bref, $size);
defined($req->{xnum} = cur_ibx_xnum($req, $bref)) or return;
++${$req->{nr}};
do_step($req);
}
} else {
warn "I: remove #$docid $eidx_key @oid\n";
- $self->idx_shard($docid)->shard_remove($docid);
+ $self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
}
}
my $top_smsg = pop @$stable;
$top_smsg == $smsg or die 'BUG: top_smsg != smsg';
my $ibx = _ibx_for($self, $sync, $smsg);
- $idx->index_raw(undef, $eml, $smsg, $ibx->eidx_key);
+ $idx->index_eml($eml, $smsg, $ibx->eidx_key);
for my $x (reverse @$stable) {
$ibx = _ibx_for($self, $sync, $x);
my $hdr = delete $x->{hdr} // die 'BUG: no {hdr}';
- $idx->shard_add_eidx_info($docid, $ibx->eidx_key, $hdr);
+ $idx->ipc_do('add_eidx_info', $docid, $ibx->eidx_key, $hdr);
}
return if $nr == 1; # likely, all good
my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid);
if ($remain == 0) {
warn "W: #$docid gone or corrupted\n";
- $self->idx_shard($docid)->shard_remove($docid);
+ $self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
} elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
$self->git->cat_async($next_oid, \&_reindex_oid, $req);
} else {
warn "BUG: #$docid gone (UNEXPECTED)\n";
- $self->idx_shard($docid)->shard_remove($docid);
+ $self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
}
return;
}
my $ci = $self->{current_info};
local $self->{current_info} = "$ci #$docid $oid";
my $re_smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
- $re_smsg->{bytes} = $size + crlf_adjust($$bref);
+ $re_smsg->set_bytes($$bref, $size);
my $eml = PublicInbox::Eml->new($bref);
$re_smsg->populate($eml, { autime => $orig_smsg->{ds},
cotime => $orig_smsg->{ts} });
BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
$self->{oidx}->delete_by_num($docid);
- $self->idx_shard($docid)->shard_remove($docid);
+ $self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
return;
}
my $self = $req->{self} // die 'BUG: {self} unset';
local $self->{current_info} = "$self->{current_info} $oid";
my $new_smsg = bless { blob => $oid, }, 'PublicInbox::Smsg';
- $new_smsg->{bytes} = $size + crlf_adjust($$bref);
+ $new_smsg->set_bytes($$bref, $size);
my $eml = $req->{eml} = PublicInbox::Eml->new($bref);
$req->{new_smsg} = $new_smsg;
$req->{chash} = content_hash($eml);
if (scalar(@$xr3) == 0) { # all gone
$self->{oidx}->delete_by_num($docid);
$self->{oidx}->eidxq_del($docid);
- $idx->shard_remove($docid);
+ $idx->ipc_do('xdb_remove', $docid);
} else { # enqueue for reindex of remaining messages
- $idx->shard_remove_eidx_info($docid,
- $ibx->eidx_key);
+ $idx->ipc_do('remove_eidx_info',
+ $docid, $ibx->eidx_key);
$self->{oidx}->eidxq_add($docid); # yes, add
}
}
return if $self->{idx_shards};
$self->git->cleanup;
-
+ my $mode = 0644;
my $ALL = $self->git->{git_dir}; # ALL.git
- PublicInbox::Import::init_bare($ALL) unless -d $ALL;
+ my $old = -d $ALL;
+ if ($opt->{-private}) { # LeiStore
+ $mode = 0600;
+ if (!$old) {
+ umask 077; # don't bother restoring
+ PublicInbox::Import::init_bare($ALL);
+ $self->git->qx(qw(config core.sharedRepository 0600));
+ }
+ } else {
+ PublicInbox::Import::init_bare($ALL) unless $old;
+ }
my $info_dir = "$ALL/objects/info";
my $alt = "$info_dir/alternates";
- my $mode = 0644;
my (@old, @new, %seen); # seen: st_dev + st_ino
if (-e $alt) {
open(my $fh, '<', $alt) or die "open $alt: $!";
$mode = (stat($fh))[2] & 07777;
while (my $line = <$fh>) {
chomp(my $d = $line);
+
+ # expand relative path (/local/ stuff)
+ substr($d, 0, 3) eq '../' and
+ $d = "$ALL/objects/$d";
if (my @st = stat($d)) {
next if $seen{"$st[0]\0$st[1]"}++;
} else {
push @old, $line;
}
}
+
+ # for LeiStore, and possibly some mirror-only state
+ if (opendir(my $dh, my $local = "$self->{topdir}/local")) {
+ # highest numbered epoch first
+ for my $n (sort { $b <=> $a } map { substr($_, 0, -4) + 0 }
+ grep(/\A[0-9]+\.git\z/, readdir($dh))) {
+ my $d = "$local/$n.git/objects"; # absolute path
+ if (my @st = stat($d)) {
+ next if $seen{"$st[0]\0$st[1]"}++;
+ # favor relative paths for rename-friendliness
+ push @new, "../../local/$n.git/objects\n";
+ } else {
+ warn "W: stat($d) failed: $!\n";
+ }
+ }
+ }
for my $ibx (@{$self->{ibx_list}}) {
my $line = $ibx->git->{git_dir} . "/objects\n";
chomp(my $d = $line);
$self->with_umask(\&_idx_init, $self, $opt);
$self->{oidx}->begin_lazy;
$self->{oidx}->eidx_prep;
+ $self->git->batch_prepare;
$self->{midx}->begin_txn;
}
$pr->("performing initial scan ...\n") if $pr;
my $sync = eidx_sync($self, $opt); # initial sync
return if $sync->{quit};
- my $oldset = PublicInbox::Sigfd::block_signals();
+ my $oldset = PublicInbox::DS::block_signals();
local $self->{current_info} = '';
my $cb = $SIG{__WARN__} || \&CORE::warn;
local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) };
if (!$sigfd) {
# wake up every second to accept signals if we don't
# have signalfd or IO::KQueue:
- PublicInbox::Sigfd::sig_setmask($oldset);
+ PublicInbox::DS::sig_setmask($oldset);
PublicInbox::DS->SetLoopTimeout(1000);
}
PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} });