X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLeiStore.pm;h=fce15a724b511d15c49a3a2c196eb4d9e68291d7;hb=refs%2Fheads%2Fmaster;hp=821045701dfe53398ad89ebddeb7853e54a4a299;hpb=a1733d3406dfbde52d1468e671edd1d76893f546;p=public-inbox.git
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 82104570..fce15a72 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -1,4 +1,4 @@
-# Copyright (C) 2020-2021 all contributors
+# Copyright (C) all contributors
# License: AGPL-3.0+
#
# Local storage (cache/memo) for lei(1), suitable for personal/private
@@ -32,6 +32,7 @@ use POSIX ();
use IO::Handle (); # ->autoflush
use Sys::Syslog qw(syslog openlog);
use Errno qw(EEXIST ENOENT);
+use PublicInbox::Syscall qw(rename_noreplace);
sub new {
my (undef, $dir, $opt) = @_;
@@ -185,10 +186,7 @@ sub export1_kw_md ($$$$$) {
my $dst = "$mdir/cur/$bn";
for my $d (@try) {
my $src = "$mdir/$d/$orig";
- if (link($src, $dst)) {
- if (!unlink($src) and $! != ENOENT) {
- syslog('warning', "unlink($src): $!");
- }
+ if (rename_noreplace($src, $dst)) {
# TODO: verify oidbin?
$self->{lms}->mv_src("maildir:$mdir",
$oidbin, \$orig, $bn);
@@ -196,7 +194,7 @@ sub export1_kw_md ($$$$$) {
} elsif ($! == EEXIST) { # lost race with "lei export-kw"?
return;
} elsif ($! != ENOENT) {
- syslog('warning', "link($src -> $dst): $!");
+ syslog('warning', "rename_noreplace($src -> $dst): $!");
}
}
for (@try) { return if -e "$mdir/$_/$orig" };
@@ -257,21 +255,16 @@ sub remove_eml_vmd { # remove just the VMD
sub _lms_rw ($) { # it is important to have eidx processes open before lms
my ($self) = @_;
- my ($eidx, $tl) = eidx_init($self);
- $self->{lms} //= do {
+ $self->{lms} // do {
require PublicInbox::LeiMailSync;
+ my ($eidx, $tl) = eidx_init($self);
my $f = "$self->{priv_eidx}->{topdir}/mail_sync.sqlite3";
my $lms = PublicInbox::LeiMailSync->new($f);
$lms->lms_write_prepare;
- $lms;
+ $self->{lms} = $lms;
};
}
-sub set_sync_info {
- my ($self, $oidhex, $folder, $id) = @_;
- _lms_rw($self)->set_src(pack('H*', $oidhex), $folder, $id);
-}
-
sub _remove_if_local { # git->cat_async arg
my ($bref, $oidhex, $type, $size, $self) = @_;
$self->{im}->remove($bref) if $bref;
@@ -342,6 +335,46 @@ sub _docids_and_maybe_kw ($$) {
($docids, [ sort keys %$kw ]);
}
+sub _reindex_1 { # git->cat_async callback
+ my ($bref, $hex, $type, $size, $smsg) = @_;
+ my $self = delete $smsg->{-sto};
+ my ($eidx, $tl) = eidx_init($self);
+ $bref //= _lms_rw($self)->local_blob($hex, 1);
+ if ($bref) {
+ my $eml = PublicInbox::Eml->new($bref);
+ $smsg->{-merge_vmd} = 1; # preserve existing keywords
+ $eidx->idx_shard($smsg->{num})->index_eml($eml, $smsg);
+ } elsif ($type eq 'missing') {
+ # pre-release/buggy lei may've indexed external-only msgs,
+ # try to correct that, here
+ warn("E: missing $hex, culling (ancient lei artifact?)\n");
+ $smsg->{to} = $smsg->{cc} = $smsg->{from} = '';
+ $smsg->{bytes} = 0;
+ $eidx->{oidx}->update_blob($smsg, '');
+ my $eml = PublicInbox::Eml->new("\r\n\r\n");
+ $eidx->idx_shard($smsg->{num})->index_eml($eml, $smsg);
+ } else {
+ warn("E: $type $hex\n");
+ }
+}
+
+sub reindex_art {
+ my ($self, $art) = @_;
+ my ($eidx, $tl) = eidx_init($self);
+ my $smsg = $eidx->{oidx}->get_art($art) // return;
+ return if $smsg->{bytes} == 0; # external-only message
+ $smsg->{-sto} = $self;
+ $eidx->git->cat_async($smsg->{blob} // die("no blob (#$art)"),
+ \&_reindex_1, $smsg);
+}
+
+sub reindex_done {
+ my ($self) = @_;
+ my ($eidx, $tl) = eidx_init($self);
+ $eidx->git->async_wait_all;
+ # ->done to be called via sto_done_request
+}
+
sub add_eml {
my ($self, $eml, $vmd, $xoids) = @_;
my $im = $self->{-fake_im} // $self->importer; # may create new epoch
@@ -351,7 +384,7 @@ sub add_eml {
$smsg->{-eidx_git} = $eidx->git if !$self->{-fake_im};
my $im_mark = $im->add($eml, undef, $smsg);
if ($vmd && $vmd->{sync_info}) {
- set_sync_info($self, $smsg->{blob}, @{$vmd->{sync_info}});
+ _lms_rw($self)->set_src($smsg->oidbin, @{$vmd->{sync_info}});
}
unless ($im_mark) { # duplicate blob returns undef
return unless wantarray;
@@ -571,6 +604,11 @@ sub recv_and_run {
$self->SUPER::recv_and_run(@args);
}
+sub _sto_atexit { # awaitpid cb
+ my ($pid) = @_;
+ warn "lei/store PID:$pid died \$?=$?\n" if $?;
+}
+
sub write_prepare {
my ($self, $lei) = @_;
$lei // die 'BUG: $lei not passed';
@@ -586,8 +624,7 @@ sub write_prepare {
lei => $lei,
-err_wr => $w,
to_close => [ $r ],
- });
- $self->wq_wait_async; # outlives $lei
+ }, \&_sto_atexit);
require PublicInbox::LeiStoreErr;
PublicInbox::LeiStoreErr->new($r, $lei);
}