This will be used for keyword (and label) storage for externals.
We'll be using this to ensure we don't redundantly auto-import
messages into lei/store if they're already in a local external
(they can still be imported explicitly via "lei import").
lib/PublicInbox/Isearch.pm
lib/PublicInbox/KQNotify.pm
lib/PublicInbox/LEI.pm
+lib/PublicInbox/LeiALE.pm
lib/PublicInbox/LeiAuth.pm
lib/PublicInbox/LeiConvert.pm
lib/PublicInbox/LeiCurl.pm
.'/lei/config');
}
+sub cache_dir ($) {
+ my ($self) = @_;
+ rel2abs($self, ($self->{env}->{XDG_CACHE_HOME} //
+ ($self->{env}->{HOME} // '/nonexistent').'/.cache')
+ .'/lei');
+}
+
+sub ale {
+ my ($self) = @_;
+ $self->{ale} //= do {
+ require PublicInbox::LeiALE;
+ PublicInbox::LeiALE->new(cache_dir($self).
+ '/all_locals_ever.git');
+ };
+}
+
sub index_opt {
# TODO: drop underscore variants everywhere, they're undocumented
qw(fsync|sync! jobs|j=i indexlevel|L=s compact
--- /dev/null
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# All Locals Ever: track lei/store + externals ever used as
+# long as they're on an accessible FS. Includes "lei q" --include
+# and --only targets that haven't been through "lei add-external".
+# Typically: ~/.cache/lei/all_locals_ever.git
+package PublicInbox::LeiALE;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::LeiSearch PublicInbox::Lock);
+use PublicInbox::Git;
+use PublicInbox::Import;
+use Fcntl qw(SEEK_SET);
+
+sub new {
+ my ($cls, $d) = @_;
+ PublicInbox::Import::init_bare($d, 'ale');
+ bless {
+ git => PublicInbox::Git->new($d),
+ lock_path => "$d/lei_ale.state", # dual-duty lock + state
+ ibxish => [], # Inbox and ExtSearch (and LeiSearch) objects
+ }, $cls;
+}
+
+sub over {} # undef for xoids_for
+
+sub overs_all { # for xoids_for (called only in lei workers?)
+ my ($self) = @_;
+ my $pid = $$;
+ if (($self->{owner_pid} // $pid) != $pid) {
+ delete($_->{over}) for @{$self->{ibxish}};
+ }
+ $self->{owner_pid} = $pid;
+ grep(defined, map { $_->over } @{$self->{ibxish}});
+}
+
+sub refresh_externals {
+ my ($self, $lxs) = @_;
+ $self->git->cleanup;
+ my $lk = $self->lock_for_scope;
+ my $cur_lxs = ref($lxs)->new;
+ my $orig = do {
+ local $/;
+ readline($self->{lockfh}) //
+ die "readline($self->{lock_path}): $!";
+ };
+ my $new = '';
+ my $old = '';
+ my $gone = 0;
+ my %seen_ibxish; # $dir => any-defined value
+ for my $dir (split(/\n/, $orig)) {
+ if (-d $dir && -r _ && $cur_lxs->prepare_external($dir)) {
+ $seen_ibxish{$dir} //= length($old .= "$dir\n");
+ } else {
+ ++$gone;
+ }
+ }
+ my @ibxish = $cur_lxs->locals;
+ for my $x ($lxs->locals) {
+ my $d = File::Spec->canonpath($x->{inboxdir} // $x->{topdir});
+ $seen_ibxish{$d} //= do {
+ $new .= "$d\n";
+ push @ibxish, $x;
+ };
+ }
+ if ($new ne '' || $gone) {
+ $self->{lockfh}->autoflush(1);
+ if ($gone) {
+ seek($self->{lockfh}, 0, SEEK_SET) or die "seek: $!";
+ truncate($self->{lockfh}, 0) or die "truncate: $!";
+ } else {
+ $old = '';
+ }
+ print { $self->{lockfh} } $old, $new or die "print: $!";
+ }
+ $new = $old = '';
+ my $f = $self->git->{git_dir}.'/objects/info/alternates';
+ if (open my $fh, '<', $f) {
+ local $/;
+ $old = <$fh> // die "readline($f): $!";
+ }
+ for my $x (@ibxish) {
+ $new .= File::Spec->canonpath($x->git->{git_dir})."/objects\n";
+ }
+ $self->{ibxish} = \@ibxish;
+ return if $old eq $new;
+
+ # this needs to be atomic since child processes may start
+ # git-cat-file at any time
+ my $tmp = "$f.$$.tmp";
+ open my $fh, '>', $tmp or die "open($tmp): $!";
+ print $fh $new or die "print($tmp): $!";
+ close $fh or die "close($tmp): $!";
+ rename($tmp, $f) or die "rename($tmp, $f): $!";
+}
+
+1;
my $key = "external.$location.boost";
my $cur_boost = $cfg->{$key};
return if defined($cur_boost) && $cur_boost == $new_boost; # idempotent
+ if (-d $location) {
+ require PublicInbox::LeiXSearch;
+ my $lxs = PublicInbox::LeiXSearch->new;
+ $lxs->prepare_external($location);
+ $self->ale->refresh_externals($lxs);
+ }
$self->lei_config($key, $new_boost);
}
$wcb->(undef, $smsg, $eml);
};
} elsif ($l2m && $l2m->{-wq_s1}) {
- my $git_dir = $ibxish->git->{git_dir};
sub {
my ($smsg, $mitem) = @_;
$smsg->{pct} = get_pct($mitem) if $mitem;
- $l2m->wq_io_do('write_mail', [], $git_dir, $smsg);
+ $l2m->wq_io_do('write_mail', [], $smsg);
}
} elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
}
if ($opt->{'local'} //= scalar(@only) ? 0 : 1) {
$lxs->prepare_external($lse);
+ } else {
+ my $tmp = PublicInbox::LeiXSearch->new;
+ $tmp->prepare_external($lse);
+ $self->ale->refresh_externals($tmp);
}
if (@only) {
for my $loc (@only) {
unless ($lxs->locals || $lxs->remotes) {
return $self->fail('no local or remote inboxes to search');
}
+ $self->ale->refresh_externals($lxs);
my ($xj, $mj) = split(/,/, $opt->{jobs} // '');
if (defined($xj) && $xj ne '' && $xj !~ /\A[1-9][0-9]*\z/) {
return $self->fail("`$xj' search jobs must be >= 1");
for my $loc (@loc) { # locals only
$lxs->prepare_external($loc) if -d $loc;
}
+ $self->{lei}->ale->refresh_externals($lxs);
+ $lxs->{git} = $self->{lei}->ale->git;
$self->{lxs_all_local} = $lxs;
$self->{cur_cfg} = $cfg;
}
- ($lxs->{git_tmp} //= $lxs->git_tmp)->{git_dir};
}
sub write_prepare {
$self->ipc_worker_spawn('lei_store', $lei->oldset,
{ lei => $lei });
}
- $lei->{all_ext_git_dir} = $self->ipc_do('refresh_local_externals');
+ my $wait = $self->ipc_do('refresh_local_externals');
$lei->{sto} = $self;
}
use PublicInbox::ProcessPipe;
use PublicInbox::Spawn qw(which spawn popen_rd);
use PublicInbox::LeiDedupe;
-use PublicInbox::Git;
use PublicInbox::GitAsyncCat;
use PublicInbox::PktOp qw(pkt_do);
use Symbol qw(gensym);
}
sub write_mail { # via ->wq_io_do
- my ($self, $git_dir, $smsg) = @_;
- my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
- git_async_cat($git, $smsg->{blob}, \&git_to_mail,
+ my ($self, $smsg) = @_;
+ git_async_cat($self->{lei}->{ale}->git, $smsg->{blob}, \&git_to_mail,
[$self->{wcb}, $smsg]);
}
sub wq_atexit_child {
my ($self) = @_;
delete $self->{wcb};
- for my $git (delete @$self{grep(/\A$$\0/, keys %$self)}) {
- $git->async_wait_all;
- }
+ $self->{lei}->{ale}->git->async_wait_all;
$SIG{__WARN__} = 'DEFAULT';
}
$lei->{ovv}->ovv_atexit_child($lei);
}
-# called by LeiOverview::each_smsg_cb
-sub git { $_[0]->{git_tmp} // die 'BUG: caller did not set {git_tmp}' }
-
-sub git_tmp ($) {
- my ($self) = @_;
- my (%seen, @dirs);
- my $tmp = File::Temp->newdir("lei_xsearch_git.$$-XXXX", TMPDIR => 1);
- for my $ibxish (locals($self)) {
- my $d = File::Spec->canonpath($ibxish->git->{git_dir});
- $seen{$d} //= push @dirs, "$d/objects\n"
- }
- my $git_dir = $tmp->dirname;
- PublicInbox::Import::init_bare($git_dir);
- my $f = "$git_dir/objects/info/alternates";
- open my $alt, '>', $f or die "open($f): $!";
- print $alt @dirs or die "print $f: $!";
- close $alt or die "close $f: $!";
- my $git = PublicInbox::Git->new($git_dir);
- $git->{-tmp} = $tmp;
- $git;
-}
+sub git { $_[0]->{git} // die 'BUG: git uninitialized' }
sub xsearch_done_wait { # dwaitpid callback
my ($arg, $pid) = @_;
# 1031: F_SETPIPE_SZ
fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
}
- if (!$lei->{opt}->{threads} && locals($self)) { # for query_mset
- # lei->{git_tmp} is set for wq_wait_old so we don't
- # delete until all lei2mail + lei_xsearch workers are reaped
- $lei->{git_tmp} = $self->{git_tmp} = git_tmp($self);
- }
$self->wq_workers_start('lei_xsearch', undef,
$lei->oldset, { lei => $lei });
my $op = delete $lei->{pkt_op_c};
my $lock_path = $self->{lock_path};
croak 'already locked '.($lock_path // '(undef)') if $self->{lockfh};
return unless defined($lock_path);
- sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or
+ sysopen(my $lockfh, $lock_path, O_RDWR|O_CREAT) or
croak "failed to open $lock_path: $!\n";
flock($lockfh, LOCK_EX) or croak "lock $lock_path failed: $!\n";
$self->{lockfh} = $lockfh;
is(scalar(@s), 2, "2 results in mbox$sfx");
lei_ok('q', '-a', '-o', "mboxcl2:$f", 's:nonexistent');
- is(grep(!/^#/, $lei_err), 0, "no errors on no results ($sfx)");
+ is(grep(!/^#/, $lei_err), 0, "no errors on no results ($sfx)")
+ or diag $lei_err;
my @s2 = grep(/^Subject:/, $cat->());
is_deeply(\@s2, \@s,
require PublicInbox::ExtSearchIdx;
require_git 2.6;
require_ok 'PublicInbox::LeiXSearch';
+require_ok 'PublicInbox::LeiALE';
my ($home, $for_destroy) = tmpdir();
my @ibx;
for my $V (1..2) {
my $v2ibx = create_inbox 'v2full', version => 2, sub {
$_[0]->add(eml_load('t/plack-qp.eml'));
};
- my $v1ibx = create_inbox 'v1medium', indexlevel => 'medium', sub {
+ my $v1ibx = create_inbox 'v1medium', indexlevel => 'medium',
+ tmpdir => "$home/v1tmp", sub {
$_[0]->add(eml_load('t/utf8.eml'));
};
$lxs->prepare_external($v1ibx);
}
my $mset = $lxs->mset('m:testmessage@example.com');
is($mset->size, 1, 'got m: match on medium+full XSearch mix');
+ my $mitem = ($mset->items)[0];
+ my $smsg = $lxs->smsg_for($mitem) or BAIL_OUT 'smsg_for broken';
+
+ my $ale = PublicInbox::LeiALE->new("$home/ale");
+ $ale->refresh_externals($lxs);
+ my $exp = [ $smsg->{blob}, 'blob', -s 't/utf8.eml' ];
+ is_deeply([ $ale->git->check($smsg->{blob}) ], $exp, 'ale->git->check');
+
+ $lxs = PublicInbox::LeiXSearch->new;
+ $lxs->prepare_external($v2ibx);
+ $ale->refresh_externals($lxs);
+ is_deeply([ $ale->git->check($smsg->{blob}) ], $exp,
+ 'ale->git->check remembered inactive external');
+
+ rename("$home/v1tmp", "$home/v1moved") or BAIL_OUT "rename: $!";
+ $ale->refresh_externals($lxs);
+ is($ale->git->check($smsg->{blob}), undef,
+ 'missing after directory gone');
}
done_testing;