ci/deps.perl
ci/profiles.sh
ci/run.sh
+contrib/completion/lei-completion.bash
contrib/css/216dark.css
contrib/css/216light.css
contrib/css/README
examples/unsubscribe.milter
examples/unsubscribe.psgi
examples/varnish-4.vcl
+lei.sh
lib/PublicInbox/Address.pm
lib/PublicInbox/AddressPP.pm
lib/PublicInbox/Admin.pm
lib/PublicInbox/InboxWritable.pm
lib/PublicInbox/Isearch.pm
lib/PublicInbox/KQNotify.pm
+lib/PublicInbox/LEI.pm
+lib/PublicInbox/LeiExtinbox.pm
+lib/PublicInbox/LeiSearch.pm
+lib/PublicInbox/LeiStore.pm
+lib/PublicInbox/LeiXSearch.pm
lib/PublicInbox/Linkify.pm
lib/PublicInbox/Listener.pm
lib/PublicInbox/Lock.pm
lib/PublicInbox/NNTPD.pm
lib/PublicInbox/NNTPdeflate.pm
lib/PublicInbox/NewsWWW.pm
+lib/PublicInbox/OnDestroy.pm
lib/PublicInbox/Over.pm
lib/PublicInbox/OverIdx.pm
lib/PublicInbox/ProcessPipe.pm
sa_config/README
sa_config/root/etc/spamassassin/public-inbox.pre
sa_config/user/.spamassassin/user_prefs
+script/lei
script/public-inbox-compact
script/public-inbox-convert
script/public-inbox-edit
t/init.t
t/iso-2202-jp.eml
t/kqnotify.t
+t/lei-oneshot.t
+t/lei.t
+t/lei_store.t
+t/lei_xsearch.t
t/linkify.t
t/main-bin/spamc
t/mda-mime.eml
t/nntpd.t
t/nodatacow.t
t/nulsubject.t
+t/on_destroy.t
t/over.t
t/plack-2-txt-bodies.eml
t/plack-attached-patch.eml
t/xcpdb-reshard.t
xt/cmp-msgstr.t
xt/cmp-msgview.t
+ xt/create-many-inboxes.t
xt/eml_check_limits.t
xt/git-http-backend.t
xt/git_async_cmp.t
# Copyright (C) 2015-2020 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-# contains common daemon code for the httpd, imapd, and nntpd servers.
-# This may be used for read-only IMAP server if we decide to implement it.
+#
+# Contains common daemon code for the httpd, imapd, and nntpd servers
+# and designed for handling thousands of untrusted clients over slow
+# and/or lossy connections.
package PublicInbox::Daemon;
use strict;
use warnings;
use POSIX qw(WNOHANG :signal_h);
use Socket qw(IPPROTO_TCP SOL_SOCKET);
sub SO_ACCEPTFILTER () { 0x1000 }
- use Cwd qw/abs_path/;
STDOUT->autoflush(1);
STDERR->autoflush(1);
use PublicInbox::DS qw(now);
sub daemonize () {
if ($daemonize) {
+ require Cwd;
foreach my $i (0..$#ARGV) {
my $arg = $ARGV[$i];
next unless -e $arg;
- $ARGV[$i] = abs_path($arg);
+ $ARGV[$i] = Cwd::abs_path($arg);
}
check_absolute('stdout', $stdout);
check_absolute('stderr', $stderr);
foreach my $fd (3..$end) {
my $s = IO::Handle->new_from_fd($fd, 'r');
if (my $k = sockname($s)) {
- if ($s->blocking) {
- $s->blocking(0);
- warn <<"";
+ my $prev_was_blocking = $s->blocking(0);
+ warn <<"" if $prev_was_blocking;
Inherited socket (fd=$fd) is blocking, making it non-blocking.
Set 'NonBlocking = true' in the systemd.service unit to avoid stalled
processes when multiple service instances start.
- }
$listener_names->{$k} = $s;
push @rv, $s;
} else {
}
sub kill_workers ($) {
- my ($s) = @_;
-
- while (my ($pid, $id) = each %pids) {
- kill $s, $pid;
- }
+ my ($sig) = @_;
+ kill $sig, keys(%pids);
}
sub upgrade_aborted ($) {
use v5.10.1;
use PublicInbox::Over;
use PublicInbox::Inbox;
- use File::Spec ();
use PublicInbox::MiscSearch;
use DBI qw(:sql_types); # SQL_BLOB
use parent qw(PublicInbox::Search);
sub new {
- my (undef, $topdir) = @_;
+ my ($class, $topdir) = @_;
- $topdir = File::Spec->canonpath($topdir);
bless {
topdir => $topdir,
# xpfx => 'ei15'
xpfx => "$topdir/ei".PublicInbox::Search::SCHEMA_VERSION
- }, __PACKAGE__;
+ }, $class;
}
sub misc {
$self->{misc} //= PublicInbox::MiscSearch->new("$self->{xpfx}/misc");
}
- sub search { $_[0] } # self
-# overrides PublicInbox::Search::_xdb
-sub _xdb {
- my ($self) = @_;
- $self->xdb_sharded;
-}
--
# same as per-inbox ->over, for now...
sub over {
my ($self) = @_;
*recent = \&PublicInbox::Inbox::recent;
*max_git_epoch = *nntp_usable = *msg_by_path = \&mm; # undef
- *isrch = *search;
+ *isrch = *search = \&PublicInbox::Search::reopen;
1;
use PublicInbox::InboxWritable;
use PublicInbox::ContentHash qw(content_hash);
use PublicInbox::Eml;
- use File::Spec;
use PublicInbox::DS qw(now);
use DBI qw(:sql_types); # SQL_BLOB
sub new {
my (undef, $dir, $opt) = @_;
- $dir = File::Spec->canonpath($dir);
my $l = $opt->{indexlevel} // 'full';
$l !~ $PublicInbox::SearchIdx::INDEXLEVELS and
die "invalid indexlevel=$l\n";
}, __PACKAGE__;
$self->{shards} = $self->count_shards || nproc_shards($opt->{creat});
my $oidx = PublicInbox::OverIdx->new("$self->{xpfx}/over.sqlite3");
- $oidx->{-no_fsync} = 1 if $opt->{-no_fsync};
+ $self->{-no_fsync} = $oidx->{-no_fsync} = 1 if !$opt->{fsync};
$self->{oidx} = $oidx;
$self
}
sub attach_inbox {
my ($self, $ibx) = @_;
- my $key = $ibx->eidx_key;
- if (!$ibx->over || !$ibx->mm) {
- warn "W: skipping $key (unindexed)\n";
- return;
- }
- if (!defined($ibx->uidvalidity)) {
- warn "W: skipping $key (no UIDVALIDITY)\n";
- return;
- }
- my $ibxdir = File::Spec->canonpath($ibx->{inboxdir});
- if ($ibxdir ne $ibx->{inboxdir}) {
- warn "W: `$ibx->{inboxdir}' canonicalized to `$ibxdir'\n";
- $ibx->{inboxdir} = $ibxdir;
- }
- $self->{ibx_map}->{$key} //= do {
+ $self->{ibx_map}->{$ibx->eidx_key} //= do {
push @{$self->{ibx_list}}, $ibx;
$ibx;
}
$heads;
}
+ sub _ibx_index_reject ($) {
+ my ($ibx) = @_;
+ $ibx->mm // return 'unindexed, no msgmap.sqlite3';
+ $ibx->uidvalidity // return 'no UIDVALIDITY';
+ $ibx->over // return 'unindexed, no over.sqlite3';
+ undef;
+ }
+
sub _sync_inbox ($$$) {
my ($self, $sync, $ibx) = @_;
+ my $ekey = $ibx->eidx_key;
+ if (defined(my $err = _ibx_index_reject($ibx))) {
+ return "W: skipping $ekey ($err)";
+ }
$sync->{ibx} = $ibx;
$sync->{nr} = \(my $nr = 0);
my $v = $ibx->version;
- my $ekey = $ibx->eidx_key;
if ($v == 2) {
$sync->{epoch_max} = $ibx->max_git_epoch // return;
sync_prepare($self, $sync); # or return # TODO: once MiscIdx is stable
} elsif ($v == 1) {
my $uv = $ibx->uidvalidity;
my $lc = $self->{oidx}->eidx_meta("lc-v1:$ekey//$uv");
- my $head = $ibx->mm->last_commit;
- unless (defined $head) {
- warn "E: $ibx->{inboxdir} is not indexed\n";
- return;
- }
+ my $head = $ibx->mm->last_commit //
+ return "E: $ibx->{inboxdir} is not indexed";
my $stk = prepare_stack($sync, $lc ? "$lc..$head" : $head);
my $unit = { stack => $stk, git => $ibx->git };
push @{$sync->{todo}}, $unit;
} else {
- warn "E: $ekey unsupported inbox version (v$v)\n";
- return;
+ return "E: $ekey unsupported inbox version (v$v)";
}
for my $unit (@{delete($sync->{todo}) // []}) {
last if $sync->{quit};
}
$self->{midx}->index_ibx($ibx) unless $sync->{quit};
$ibx->git->cleanup; # done with this inbox, now
+ undef;
}
sub gc_unref_doc ($$$$) {
$self->{ibx_list}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped"
}
+ sub _fd_constrained ($) {
+ my ($self) = @_;
+ $self->{-fd_constrained} //= do {
+ my $soft;
+ if (eval { require BSD::Resource; 1 }) {
+ my $NOFILE = BSD::Resource::RLIMIT_NOFILE();
+ ($soft, undef) = BSD::Resource::getrlimit($NOFILE);
+ } else {
+ chomp($soft = `sh -c 'ulimit -n'`);
+ }
+ if (defined($soft)) {
+ my $want = scalar(@{$self->{ibx_list}}) + 64; # estimate
+ my $ret = $want > $soft;
+ if ($ret) {
+ warn <<EOF;
+ RLIMIT_NOFILE=$soft insufficient (want: $want), will close DB handles early
+ EOF
+ }
+ $ret;
+ } else {
+ warn "Unable to determine RLIMIT_NOFILE: $@\n";
+ 1;
+ }
+ };
+ }
+
sub _reindex_finalize ($$$) {
my ($req, $smsg, $eml) = @_;
my $sync = $req->{sync};
my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty";
$x->{num} = delete($x->{xnum}) // die '{xnum} unset';
$ibx = _ibx_for($self, $sync, $x);
- my $e = $ibx->over->get_art($x->{num});
- $e->{blob} eq $x->{blob} or die <<EOF;
+ if (my $over = $ibx->over) {
+ my $e = $over->get_art($x->{num});
+ $e->{blob} eq $x->{blob} or die <<EOF;
$x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
EOF
- push @todo, $ibx, $e;
+ push @todo, $ibx, $e;
+ $over->dbh_close if _fd_constrained($self);
+ } else {
+ die "$ibx->{inboxdir}: over.sqlite3 unusable: $!\n";
+ }
}
undef $by_chash;
while (my ($ibx, $e) = splice(@todo, 0, 2)) {
my $dbh = $self->{oidx}->dbh;
my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
${$sync->{nr}} = 0;
- $sync->{-regen_fmt} = "%u/$tot\n";
+ local $sync->{-regen_fmt} = "%u/$tot\n";
my $pr = $sync->{-opt}->{-progress};
if ($pr) {
my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq');
my $msgs;
my $pr = $sync->{-opt}->{-progress};
my $ekey = $ibx->eidx_key;
- $sync->{-regen_fmt} = "$ekey checking unseen %u/".$ibx->over->max."\n";
+ local $sync->{-regen_fmt} =
+ "$ekey checking unseen %u/".$ibx->over->max."\n";
${$sync->{nr}} = 0;
while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) {
my $pr = $sync->{-opt}->{-progress};
my $fetching;
my $ekey = $ibx->eidx_key;
- $sync->{-regen_fmt} =
+ local $sync->{-regen_fmt} =
"$ekey check stale/missing %u/".$ibx->over->max."\n";
${$sync->{nr}} = 0;
do {
sub _reindex_inbox ($$$) {
my ($self, $sync, $ibx) = @_;
- local $self->{current_info} = $ibx->eidx_key;
- _reindex_check_unseen($self, $sync, $ibx);
- _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
+ my $ekey = $ibx->eidx_key;
+ local $self->{current_info} = $ekey;
+ if (defined(my $err = _ibx_index_reject($ibx))) {
+ warn "W: cannot reindex $ekey ($err)\n";
+ } else {
+ _reindex_check_unseen($self, $sync, $ibx);
+ _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
+ }
delete @$ibx{qw(over mm search git)}; # won't need these for a bit
}
eidxq_process($self, $sync) unless $sync->{quit};
}
+ sub sync_inbox {
+ my ($self, $sync, $ibx) = @_;
+ my $err = _sync_inbox($self, $sync, $ibx);
+ delete @$ibx{qw(mm over)};
+ warn $err, "\n" if defined($err);
+ }
+
sub eidx_sync { # main entry point
my ($self, $opt) = @_;
- my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ };
+ my $warn_cb = $SIG{__WARN__} || \&CORE::warn;
local $self->{current_info} = '';
local $SIG{__WARN__} = sub {
$warn_cb->($self->{current_info}, ': ', @_);
$ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
}
if (delete($opt->{reindex})) {
- $sync->{checkpoint_unlocks} = 1;
+ local $sync->{checkpoint_unlocks} = 1;
eidx_reindex($self, $sync);
}
# don't use $_ here, it'll get clobbered by reindex_checkpoint
- for my $ibx (@{$self->{ibx_list}}) {
- last if $sync->{quit};
- _sync_inbox($self, $sync, $ibx);
+ if ($opt->{scan} // 1) {
+ for my $ibx (@{$self->{ibx_list}}) {
+ last if $sync->{quit};
+ sync_inbox($self, $sync, $ibx);
+ }
}
$self->{oidx}->rethread_done($opt) unless $sync->{quit};
eidxq_process($self, $sync) unless $sync->{quit};
eidxq_release($self);
- PublicInbox::V2Writable::done($self);
+ done($self);
+ $sync; # for eidx_watch
}
sub update_last_commit { # overrides V2Writable
return if $self->{idx_shards};
$self->git->cleanup;
-
+ my $mode = 0644;
my $ALL = $self->git->{git_dir}; # ALL.git
- PublicInbox::Import::init_bare($ALL) unless -d $ALL;
+ my $old = -d $ALL;
+ if ($opt->{-private}) { # LeiStore
+ $mode = 0600;
+ if (!$old) {
+ umask 077; # don't bother restoring
+ PublicInbox::Import::init_bare($ALL);
+ $self->git->qx(qw(config core.sharedRepository 0600));
+ }
+ } else {
+ PublicInbox::Import::init_bare($ALL) unless $old;
+ }
my $info_dir = "$ALL/objects/info";
my $alt = "$info_dir/alternates";
- my $mode = 0644;
my (@old, @new, %seen); # seen: st_dev + st_ino
if (-e $alt) {
open(my $fh, '<', $alt) or die "open $alt: $!";
$mode = (stat($fh))[2] & 07777;
while (my $line = <$fh>) {
chomp(my $d = $line);
+
+ # expand relative path (/local/ stuff)
+ substr($d, 0, 3) eq '../' and
+ $d = "$ALL/objects/$d";
if (my @st = stat($d)) {
next if $seen{"$st[0]\0$st[1]"}++;
} else {
push @old, $line;
}
}
+
+ # for LeiStore, and possibly some mirror-only state
+ if (opendir(my $dh, my $local = "$self->{topdir}/local")) {
+ # highest numbered epoch first
+ for my $n (sort { $b <=> $a } map { substr($_, 0, -4) + 0 }
+ grep(/\A[0-9]+\.git\z/, readdir($dh))) {
+ my $d = "$local/$n.git/objects"; # absolute path
+ if (my @st = stat($d)) {
+ next if $seen{"$st[0]\0$st[1]"}++;
+ # favor relative paths for rename-friendliness
+ push @new, "../../local/$n.git/objects\n";
+ } else {
+ warn "W: stat($d) failed: $!\n";
+ }
+ }
+ }
for my $ibx (@{$self->{ibx_list}}) {
my $line = $ibx->git->{git_dir} . "/objects\n";
chomp(my $d = $line);
PublicInbox::V2Writable::write_alternates($info_dir, $mode, $o);
}
$self->parallel_init($self->{indexlevel});
- $self->umask_prepare;
$self->with_umask(\&_idx_init, $self, $opt);
$self->{oidx}->begin_lazy;
$self->{oidx}->eidx_prep;
$self->{midx}->begin_txn;
}
+ sub _watch_commit { # PublicInbox::DS::add_timer callback
+ my ($self) = @_;
+ delete $self->{-commit_timer};
+ eidxq_process($self, $self->{-watch_sync});
+ eidxq_release($self);
+ delete local $self->{-watch_sync}->{-regen_fmt};
+ reindex_checkpoint($self, $self->{-watch_sync});
+
+ # call event_step => done unless commit_timer is armed
+ PublicInbox::DS::requeue($self);
+ }
+
+ sub on_inbox_unlock { # called by PublicInbox::InboxIdle
+ my ($self, $ibx) = @_;
+ my $opt = $self->{-watch_sync}->{-opt};
+ my $pr = $opt->{-progress};
+ my $ekey = $ibx->eidx_key;
+ local $0 = "sync $ekey";
+ $pr->("indexing $ekey\n") if $pr;
+ $self->idx_init($opt);
+ sync_inbox($self, $self->{-watch_sync}, $ibx);
+ $self->{-commit_timer} //= PublicInbox::DS::add_timer(
+ $opt->{'commit-interval'} // 10,
+ \&_watch_commit, $self);
+ }
+
+ sub eidx_reload { # -extindex --watch SIGHUP handler
+ my ($self, $idler) = @_;
+ if ($self->{cfg}) {
+ my $pr = $self->{-watch_sync}->{-opt}->{-progress};
+ $pr->('reloading ...') if $pr;
+ delete $self->{-resync_queue};
+ @{$self->{ibx_list}} = ();
+ %{$self->{ibx_map}} = ();
+ delete $self->{-watch_sync}->{id2pos};
+ my $cfg = PublicInbox::Config->new;
+ attach_config($self, $cfg);
+ $idler->refresh($cfg);
+ $pr->(" done\n") if $pr;
+ } else {
+ warn "reload not supported without --all\n";
+ }
+ }
+
+ sub eidx_resync_start ($) { # -extindex --watch SIGUSR1 handler
+ my ($self) = @_;
+ $self->{-resync_queue} //= [ @{$self->{ibx_list}} ];
+ PublicInbox::DS::requeue($self); # trigger our ->event_step
+ }
+
+ sub event_step { # PublicInbox::DS::requeue callback
+ my ($self) = @_;
+ if (my $resync_queue = $self->{-resync_queue}) {
+ if (my $ibx = shift(@$resync_queue)) {
+ on_inbox_unlock($self, $ibx);
+ PublicInbox::DS::requeue($self);
+ } else {
+ delete $self->{-resync_queue};
+ _watch_commit($self);
+ }
+ } else {
+ done($self) unless $self->{-commit_timer};
+ }
+ }
+
+ sub eidx_watch { # public-inbox-extindex --watch main loop
+ my ($self, $opt) = @_;
+ local %SIG = %SIG;
+ for my $sig (qw(HUP USR1 TSTP QUIT INT TERM)) {
+ $SIG{$sig} = sub { warn "SIG$sig ignored while scanning\n" };
+ }
+ require PublicInbox::InboxIdle;
+ require PublicInbox::DS;
+ require PublicInbox::Syscall;
+ require PublicInbox::Sigfd;
+ my $idler = PublicInbox::InboxIdle->new($self->{cfg});
+ if (!$self->{cfg}) {
+ $idler->watch_inbox($_) for @{$self->{ibx_list}};
+ }
+ $_->subscribe_unlock(__PACKAGE__, $self) for @{$self->{ibx_list}};
+ my $pr = $opt->{-progress};
+ $pr->("performing initial scan ...\n") if $pr;
+ my $sync = eidx_sync($self, $opt); # initial sync
+ return if $sync->{quit};
+ my $oldset = PublicInbox::Sigfd::block_signals();
+ local $self->{current_info} = '';
+ my $cb = $SIG{__WARN__} || \&CORE::warn;
+ local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) };
+ my $sig = {
+ HUP => sub { eidx_reload($self, $idler) },
+ USR1 => sub { eidx_resync_start($self) },
+ TSTP => sub { kill('STOP', $$) },
+ };
+ my $quit = PublicInbox::SearchIdx::quit_cb($sync);
+ $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit;
+ my $sigfd = PublicInbox::Sigfd->new($sig,
+ $PublicInbox::Syscall::SFD_NONBLOCK);
+ %SIG = (%SIG, %$sig) if !$sigfd;
+ local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock
+ if (!$sigfd) {
+ # wake up every second to accept signals if we don't
+ # have signalfd or IO::KQueue:
+ PublicInbox::Sigfd::sig_setmask($oldset);
+ PublicInbox::DS->SetLoopTimeout(1000);
+ }
+ PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} });
+ $pr->("initial scan complete, entering event loop\n") if $pr;
+ PublicInbox::DS->EventLoop; # calls InboxIdle->event_step
+ done($self);
+ }
+
no warnings 'once';
*done = \&PublicInbox::V2Writable::done;
- *umask_prepare = \&PublicInbox::InboxWritable::umask_prepare;
*with_umask = \&PublicInbox::InboxWritable::with_umask;
*parallel_init = \&PublicInbox::V2Writable::parallel_init;
*nproc_shards = \&PublicInbox::V2Writable::nproc_shards;
return ($self->{in}, $self->{out}) if $self->{pid};
- my (@ret, $out_r, $out_w);
+ my ($in_r, $pid, $out_r, $out_w);
pipe($out_r, $out_w) or die "pipe failed: $!";
$self->lock_acquire;
my ($git, $ref) = @$self{qw(git ref)};
local $/ = "\n";
chomp($self->{tip} = $git->qx(qw(rev-parse --revs-only), $ref));
+ die "fatal: rev-parse --revs-only $ref: \$?=$?" if $?;
if ($self->{path_type} ne '2/38' && $self->{tip}) {
local $/ = "\0";
my @t = $git->qx(qw(ls-tree -r -z --name-only), $ref);
+ die "fatal: ls-tree -r -z --name-only $ref: \$?=$?" if $?;
chomp @t;
$self->{-tree} = { map { $_ => 1 } @t };
}
my @cmd = ('git', "--git-dir=$git->{git_dir}",
qw(fast-import --quiet --done --date-format=raw));
- my ($in_r, $pid) = popen_rd(\@cmd, undef, { 0 => $out_r });
+ ($in_r, $pid) = popen_rd(\@cmd, undef, { 0 => $out_r });
$out_w->autoflush(1);
$self->{in} = $in_r;
$self->{out} = $out_w;
$self->{pid} = $pid;
$self->{nchg} = 0;
- @ret = ($in_r, $out_w);
};
if ($@) {
$self->lock_release;
die $@;
}
- @ret;
+ ($in_r, $out_w);
}
sub wfail () { die "write to fast-import failed: $!" }
if ($smsg) {
$smsg->{blob} = $self->get_mark(":$blob");
$smsg->{raw_bytes} = $n;
+ if (my $oidx = delete $smsg->{-oidx}) { # used by LeiStore
+ return if $oidx->blob_exists($smsg->{blob});
+ }
+ # XXX do we need this? it's in git at this point
$smsg->{-raw_email} = \$raw_email;
}
my $ref = $self->{ref};
use IO::Compress::Gzip qw(gzip);
use HTTP::Date qw(time2str);
-our $json = PublicInbox::Config::json();
+my $json = PublicInbox::Config::json();
# called by WwwListing
sub url_regexp {
'Content-Length', bytes::length($out) ], [ $out ] ]
}
+ sub per_inbox {
+ my ($ctx) = @_;
+ # only one inbox, slow is probably OK
+ slow_manifest_add($ctx, $ctx->{ibx});
+ psgi_triple($ctx);
+ }
+
1;
sub create {
my ($self) = @_;
- unless (-r $self->{filename}) {
+ my $fn = $self->{filename} // do {
+ Carp::confess('BUG: no {filename}') unless $self->{dbh};
+ return;
+ };
+ unless (-r $fn) {
require File::Path;
require File::Basename;
- File::Path::mkpath(File::Basename::dirname($self->{filename}));
+ File::Path::mkpath(File::Basename::dirname($fn));
}
# create the DB:
PublicInbox::Over::dbh($self);
}
+sub blob_exists {
+ my ($self, $oidhex) = @_;
+ my $sth = $self->dbh->prepare_cached(<<'', undef, 1);
+SELECT COUNT(*) FROM xref3 WHERE oidbin = ?
+
+ $sth->bind_param(1, pack('H*', $oidhex), SQL_BLOB);
+ $sth->execute;
+ $sth->fetchrow_array;
+}
+
1;
package PublicInbox::Search;
use strict;
use parent qw(Exporter);
- our @EXPORT_OK = qw(retry_reopen);
+ our @EXPORT_OK = qw(retry_reopen int_val);
use List::Util qw(max);
# values for searching, changing the numeric value breaks
our %X = map { $_ => 0 } qw(BoolWeight Database Enquire QueryParser Stem Query);
our $Xap; # 'Search::Xapian' or 'Xapian'
our $NVRP; # '$Xap::'.('NumberValueRangeProcessor' or 'NumberRangeProcessor')
- our $ENQ_ASCENDING;
+
+ # ENQ_DESCENDING and ENQ_ASCENDING weren't in SWIG Xapian.pm prior to 1.4.16,
+ # let's hope the ABI is stable
+ our $ENQ_DESCENDING = 0;
+ our $ENQ_ASCENDING = 1;
sub load_xapian () {
return 1 if defined $Xap;
'NumberRangeProcessor' : 'NumberValueRangeProcessor');
$X{$_} = $Xap.'::'.$_ for (keys %X);
- # ENQ_ASCENDING doesn't seem exported by SWIG Xapian.pm,
- # so lets hope this part of the ABI is stable because it's
- # just an integer:
- $ENQ_ASCENDING = $x eq 'Xapian' ?
- 1 : Search::Xapian::ENQ_ASCENDING();
-
*sortable_serialise = $x.'::sortable_serialise';
+ *sortable_unserialise = $x.'::sortable_unserialise';
# n.b. FLAG_PURE_NOT is expensive not suitable for a public
# website as it could become a denial-of-service vector
# FLAG_PHRASE also seems to cause performance problems chert
}
}
-sub xdb_sharded {
+# returns all shards as separate Xapian::Database objects w/o combining
+sub xdb_shards_flat ($) {
my ($self) = @_;
- opendir(my $dh, $self->{xpfx}) or return; # not initialized yet
-
- # We need numeric sorting so shard[0] is first for reading
- # Xapian metadata, if needed
- my $last = max(grep(/\A[0-9]+\z/, readdir($dh))) // return;
+ my $xpfx = $self->{xpfx};
my (@xdb, $slow_phrase);
- for (0..$last) {
- my $shard_dir = "$self->{xpfx}/$_";
- if (-d $shard_dir && -r _) {
+ load_xapian();
+ if ($xpfx =~ m/xapian${\SCHEMA_VERSION}\z/) {
+ @xdb = ($X{Database}->new($xpfx));
+ $self->{qp_flags} |= FLAG_PHRASE() if !-f "$xpfx/iamchert";
+ } else {
+ opendir(my $dh, $xpfx) or return (); # not initialized yet
+ # We need numeric sorting so shard[0] is first for reading
+ # Xapian metadata, if needed
+ my $last = max(grep(/\A[0-9]+\z/, readdir($dh))) // return ();
+ for (0..$last) {
+ my $shard_dir = "$self->{xpfx}/$_";
push @xdb, $X{Database}->new($shard_dir);
$slow_phrase ||= -f "$shard_dir/iamchert";
- } else { # gaps from missing epochs throw off mdocid()
- warn "E: $shard_dir missing or unreadable\n";
- return;
}
+ $self->{qp_flags} |= FLAG_PHRASE() if !$slow_phrase;
}
- $self->{qp_flags} |= FLAG_PHRASE() if !$slow_phrase;
- $self->{nshard} = scalar(@xdb);
- my $xdb = shift @xdb;
- $xdb->add_database($_) for @xdb;
- $xdb;
-}
-
-sub _xdb {
- my ($self) = @_;
- my $dir = xdir($self, 1);
- $self->{qp_flags} //= $QP_FLAGS;
- if ($self->{ibx_ver} >= 2) {
- xdb_sharded($self);
- } else {
- $self->{qp_flags} |= FLAG_PHRASE() if !-f "$dir/iamchert";
- $X{Database}->new($dir);
- }
+ @xdb;
}
# v2 Xapian docids don't conflict, so they're identical to
sub mset_to_artnums {
my ($self, $mset) = @_;
- my $nshard = $self->{nshard} // 1;
+ my $nshard = $self->{nshard};
[ map { mdocid($nshard, $_) } $mset->items ];
}
sub xdb ($) {
my ($self) = @_;
$self->{xdb} //= do {
- load_xapian();
- $self->_xdb;
+ $self->{qp_flags} //= $QP_FLAGS;
+ my @xdb = $self->xdb_shards_flat or return;
+ $self->{nshard} = scalar(@xdb);
+ my $xdb = shift @xdb;
+ $xdb->add_database($_) for @xdb;
+ $xdb;
};
}
-sub xpfx_init ($) {
- my ($self) = @_;
- if ($self->{ibx_ver} == 1) {
- $self->{xpfx} .= '/public-inbox/xapian' . SCHEMA_VERSION;
- } else {
- $self->{xpfx} .= '/xap'.SCHEMA_VERSION;
- }
-}
-
sub new {
my ($class, $ibx) = @_;
ref $ibx or die "BUG: expected PublicInbox::Inbox object: $ibx";
- my $self = bless {
- xpfx => $ibx->{inboxdir}, # for xpfx_init
+ my $xap = $ibx->version > 1 ? 'xap' : 'public-inbox/xapian';
+ bless {
+ xpfx => "$ibx->{inboxdir}/$xap" . SCHEMA_VERSION,
altid => $ibx->{altid},
- ibx_ver => $ibx->version,
}, $class;
- xpfx_init($self);
- $self;
}
sub reopen {
$opts ||= {};
my $qp = $self->{qp} //= qparse_new($self);
my $query = $qp->parse_query($query_string, $self->{qp_flags});
- $opts->{relevance} = 1 unless exists $opts->{relevance};
_do_enquire($self, $query, $opts);
}
$enquire->set_query($query);
$opts ||= {};
my $desc = !$opts->{asc};
- if (($opts->{mset} || 0) == 2) { # mset == 2: ORDER BY docid/UID
+ my $rel = $opts->{relevance} // 0;
+ if ($rel == -1) { # ORDER BY docid/UID
+ $enquire->set_weighting_scheme($X{BoolWeight}->new);
$enquire->set_docid_order($ENQ_ASCENDING);
+ } elsif ($rel == 0) {
+ $enquire->set_sort_by_value_then_relevance(TS, $desc);
+ } elsif ($rel == -2) {
$enquire->set_weighting_scheme($X{BoolWeight}->new);
- } elsif ($opts->{relevance}) {
+ $enquire->set_docid_order($ENQ_DESCENDING);
+ } else { # rel > 0
$enquire->set_sort_by_relevance_then_value(TS, $desc);
- } else {
- $enquire->set_sort_by_value_then_relevance(TS, $desc);
}
# `mairix -t / --threads' or JMAP collapseThreads
sub mset_to_smsg {
my ($self, $ibx, $mset) = @_;
- my $nshard = $self->{nshard} // 1;
+ my $nshard = $self->{nshard};
my $i = 0;
my %order = map { mdocid($nshard, $_) => ++$i } $mset->items;
my @msgs = sort {
\@ret;
}
+ sub int_val ($$) {
+ my ($doc, $col) = @_;
+ my $val = $doc->get_value($col) or return; # undefined is '' in Xapian
+ sortable_unserialise($val) + 0; # PV => IV conversion
+ }
+
1;
# Copyright (C) 2015-2020 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-# based on notmuch, but with no concept of folders, files or flags
+# based on notmuch, but with no concept of folders, files
#
# Indexes mail with Xapian and our (SQLite-based) ::Msgmap for use
# with the web and NNTP interfaces. This index maintains thread
}
}
$ibx = PublicInbox::InboxWritable->new($ibx);
- my $self = bless {
- ibx => $ibx,
- xpfx => $inboxdir, # for xpfx_init
- -altid => $altid,
- ibx_ver => $version,
- indexlevel => $indexlevel,
- }, $class;
- $self->xpfx_init;
+ my $self = PublicInbox::Search->new($ibx);
+ bless $self, $class;
+ $self->{ibx} = $ibx;
+ $self->{-altid} = $altid;
+ $self->{indexlevel} = $indexlevel;
$self->{-set_indexlevel_once} = 1 if $indexlevel eq 'medium';
if ($ibx->{-skip_docdata}) {
$self->{-set_skip_docdata_once} = 1;
$self->{-skip_docdata} = 1;
}
- $ibx->umask_prepare;
if ($version == 1) {
$self->{lock_path} = "$inboxdir/ssoma.lock";
my $dir = $self->xdir;
}
eval 'require '.$X->{WritableDatabase} or die;
*sortable_serialise = $xap.'::sortable_serialise';
- *sortable_unserialise = $xap.'::sortable_unserialise';
$DB_CREATE_OR_OPEN = eval($xap.'::DB_CREATE_OR_OPEN()');
$DB_OPEN = eval($xap.'::DB_OPEN()');
my $ver = (eval($xap.'::major_version()') << 16) |
index_headers($self, $smsg);
if (defined(my $eidx_key = $smsg->{eidx_key})) {
- $doc->add_boolean_term('O'.$eidx_key);
+ $doc->add_boolean_term('O'.$eidx_key) if $eidx_key ne '.';
}
msg_iter($eml, \&index_xapian, [ $self, $doc ]);
index_ids($self, $doc, $eml, $mids);
sub _msgmap_init ($) {
my ($self) = @_;
- die "BUG: _msgmap_init is only for v1\n" if $self->{ibx_ver} != 1;
+ die "BUG: _msgmap_init is only for v1\n" if $self->{ibx}->version != 1;
$self->{mm} //= eval {
require PublicInbox::Msgmap;
my $rw = $self->{ibx}->{-no_fsync} ? 2 : 1;
begin_txn_lazy($self);
my $doc = _get_doc($self, $docid) or return;
term_generator($self)->set_document($doc);
- $doc->add_boolean_term('O'.$eidx_key);
+ $doc->add_boolean_term('O'.$eidx_key) if $eidx_key ne '.';
index_list_id($self, $doc, $eml);
$self->{xdb}->replace_document($docid, $doc);
}
$self->{xdb}->replace_document($docid, $doc);
}
- sub get_val ($$) {
- my ($doc, $col) = @_;
- sortable_unserialise($doc->get_value($col));
- }
-
+sub set_keywords {
+ my ($self, $docid, @kw) = @_;
+ begin_txn_lazy($self);
+ my $doc = _get_doc($self, $docid) or return;
+ my %keep = map { $_ => 1 } @kw;
+ my %add = %keep;
+ my @rm;
+ my $end = $doc->termlist_end;
+ for (my $cur = $doc->termlist_begin; $cur != $end; $cur++) {
+ $cur->skip_to('K');
+ last if $cur == $end;
+ my $kw = $cur->get_termname;
+ $kw =~ s/\AK//s or next;
+ $keep{$kw} ? delete($add{$kw}) : push(@rm, $kw);
+ }
+ return unless (scalar(@rm) + scalar(keys %add));
+ $doc->remove_term('K'.$_) for @rm;
+ $doc->add_boolean_term('K'.$_) for (keys %add);
+ $self->{xdb}->replace_document($docid, $doc);
+}
+
+sub add_keywords {
+ my ($self, $docid, @kw) = @_;
+ begin_txn_lazy($self);
+ my $doc = _get_doc($self, $docid) or return;
+ $doc->add_boolean_term('K'.$_) for @kw;
+ $self->{xdb}->replace_document($docid, $doc);
+}
+
+sub remove_keywords {
+ my ($self, $docid, @kw) = @_;
+ begin_txn_lazy($self);
+ my $doc = _get_doc($self, $docid) or return;
+ my $replace;
+ eval {
+ $doc->remove_term('K'.$_);
+ $replace = 1
+ } for @kw;
+ $self->{xdb}->replace_document($docid, $doc) if $replace;
+}
+
sub smsg_from_doc ($) {
my ($doc) = @_;
my $data = $doc->get_data or return;
my $smsg = bless {}, 'PublicInbox::Smsg';
- $smsg->{ts} = get_val($doc, PublicInbox::Search::TS());
- my $dt = get_val($doc, PublicInbox::Search::DT());
+ $smsg->{ts} = int_val($doc, PublicInbox::Search::TS());
+ my $dt = int_val($doc, PublicInbox::Search::DT());
my ($yyyy, $mon, $dd, $hh, $mm, $ss) = unpack('A4A2A2A2A2A2', $dt);
$smsg->{ds} = timegm($ss, $mm, $hh, $dd, $mon - 1, $yyyy);
$smsg->load_from_data($data);
$smsg->{num} = index_mm($self, $eml, $oid, $sync) or
die "E: could not generate NNTP article number for $oid";
add_message($self, $eml, $smsg, $sync);
+ ++$self->{nidx};
my $cur_cmt = $sync->{cur_cmt} // die 'BUG: {cur_cmt} missing';
${$sync->{latest_cmt}} = $cur_cmt;
}
if (defined(my $cur_cmt = $sync->{cur_cmt})) {
${$sync->{latest_cmt}} = $cur_cmt;
}
+ ++$self->{nidx};
}
sub with_umask {
my $OID = qr/[a-f0-9]{40,}/;
# an estimate of the post-packed size to the raw uncompressed size
-my $PACKING_FACTOR = 0.4;
+our $PACKING_FACTOR = 0.4;
# SATA storage lags behind what CPUs are capable of, so relying on
# nproc(1) can be misleading and having extra Xapian shards is a
delete $ibx->{search};
$srch->{nshard} // 0
} else { # ExtSearchIdx
- $self->{nshard} // do {
- if ($self->xdb_sharded) {
- $self->{nshard} // die 'BUG: {nshard} unset';
- } else {
- 0;
- }
- }
+ $self->{nshard} ||= scalar($self->xdb_shards_flat);
}
}
die "$dir does not exist\n";
}
}
- $v2ibx->umask_prepare;
-
my $xpfx = "$dir/xap" . PublicInbox::Search::SCHEMA_VERSION;
my $self = {
ibx => $v2ibx,
$ibx->git->cleanup;
parallel_init($self, $ibx->{indexlevel});
- $ibx->umask_prepare;
$ibx->with_umask(\&_idx_init, $self, $opt);
}
$self->done; # release lock
}
- if (my $pr = $sync->{-opt}->{-progress}) {
+ if (my $pr = $sync->{-regen_fmt} ? $sync->{-opt}->{-progress} : undef) {
$pr->(sprintf($sync->{-regen_fmt}, ${$sync->{nr}}));
}
$mm_tmp->atfork_parent if $mm_tmp;
}
+ sub index_finalize ($$) {
+ my ($arg, $index) = @_;
+ ++$arg->{self}->{nidx};
+ if (defined(my $cur = $arg->{cur_cmt})) {
+ ${$arg->{latest_cmt}} = $cur;
+ } elsif ($index) {
+ die 'BUG: {cur_cmt} missing';
+ } # else { unindexing @leftovers doesn't set {cur_cmt}
+ }
+
sub index_oid { # cat_async callback
my ($bref, $oid, $type, $size, $arg) = @_;
- return if is_bad_blob($oid, $type, $size, $arg->{oid});
+ is_bad_blob($oid, $type, $size, $arg->{oid}) and
+ return index_finalize($arg, 1); # size == 0 purged returns here
my $self = $arg->{self};
local $self->{current_info} = "$self->{current_info} $oid";
- return if $size == 0; # purged
my ($num, $mid0);
my $eml = PublicInbox::Eml->new($$bref);
my $mids = mids($eml);
if (do_idx($self, $bref, $eml, $smsg)) {
${$arg->{need_checkpoint}} = 1;
}
- ${$arg->{latest_cmt}} = $arg->{cur_cmt} // die 'BUG: {cur_cmt} missing';
+ index_finalize($arg, 1);
}
# only update last_commit for $i on reindex iff newer than current
-d $git_dir or next; # missing epochs are fine
my $git = PublicInbox::Git->new($git_dir);
my $unit = { git => $git, epoch => $i };
+ my $tip;
if ($reindex_heads) {
- $head = $reindex_heads->[$i] or next;
+ $tip = $head = $reindex_heads->[$i] or next;
+ } else {
+ $tip = $git->qx(qw(rev-parse -q --verify), $head);
+ next if $?; # new repo
+ chomp $tip;
}
- chomp(my $tip = $git->qx(qw(rev-parse -q --verify), $head));
- next if $?; # new repo
-
my $range = log_range($sync, $unit, $tip) or next;
# can't use 'rev-list --count' if we use --diff-filter
$pr->("$pfx $i.git counting $range ... ") if $pr;
}
sub unindex_oid ($$;$) { # git->cat_async callback
- my ($bref, $oid, $type, $size, $sync) = @_;
- return if is_bad_blob($oid, $type, $size, $sync->{oid});
- my $self = $sync->{self};
+ my ($bref, $oid, $type, $size, $arg) = @_;
+ is_bad_blob($oid, $type, $size, $arg->{oid}) and
+ return index_finalize($arg, 0);
+ my $self = $arg->{self};
local $self->{current_info} = "$self->{current_info} $oid";
- my $unindexed = $sync->{in_unindex} ? $sync->{unindexed} : undef;
+ my $unindexed = $arg->{in_unindex} ? $arg->{unindexed} : undef;
my $mm = $self->{mm};
my $mids = mids(PublicInbox::Eml->new($bref));
undef $$bref;
}
unindex_oid_aux($self, $oid, $mid);
}
+ index_finalize($arg, 0);
}
sub git { $_[0]->{ibx}->git }
$opt //= {};
return xapian_only($self, $opt) if $opt->{xapian_only};
- my $pr = $opt->{-progress};
my $epoch_max;
- my $latest = $self->{ibx}->git_dir_latest(\$epoch_max);
- return unless defined $latest;
+ my $latest = $self->{ibx}->git_dir_latest(\$epoch_max) // return;
+ if ($opt->{'fast-noop'}) { # nanosecond (st_ctim) comparison
+ use Time::HiRes qw(stat);
+ if (my @mm = stat("$self->{ibx}->{inboxdir}/msgmap.sqlite3")) {
+ my $c = $mm[10]; # 10 = ctime (nsec NV)
+ my @hd = stat("$latest/refs/heads");
+ my @pr = stat("$latest/packed-refs");
+ return if $c > ($hd[10] // 0) && $c > ($pr[10] // 0);
+ }
+ }
+ my $pr = $opt->{-progress};
my $seq = $opt->{sequential_shard};
my $art_beg; # the NNTP article number we start xapian_only at
my $idxlevel = $self->{ibx}->{indexlevel};