X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLeiStore.pm;h=fce15a724b511d15c49a3a2c196eb4d9e68291d7;hb=refs%2Fheads%2Fmaster;hp=821045701dfe53398ad89ebddeb7853e54a4a299;hpb=a1733d3406dfbde52d1468e671edd1d76893f546;p=public-inbox.git diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index 82104570..fce15a72 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2020-2021 all contributors +# Copyright (C) all contributors # License: AGPL-3.0+ # # Local storage (cache/memo) for lei(1), suitable for personal/private @@ -32,6 +32,7 @@ use POSIX (); use IO::Handle (); # ->autoflush use Sys::Syslog qw(syslog openlog); use Errno qw(EEXIST ENOENT); +use PublicInbox::Syscall qw(rename_noreplace); sub new { my (undef, $dir, $opt) = @_; @@ -185,10 +186,7 @@ sub export1_kw_md ($$$$$) { my $dst = "$mdir/cur/$bn"; for my $d (@try) { my $src = "$mdir/$d/$orig"; - if (link($src, $dst)) { - if (!unlink($src) and $! != ENOENT) { - syslog('warning', "unlink($src): $!"); - } + if (rename_noreplace($src, $dst)) { # TODO: verify oidbin? $self->{lms}->mv_src("maildir:$mdir", $oidbin, \$orig, $bn); @@ -196,7 +194,7 @@ sub export1_kw_md ($$$$$) { } elsif ($! == EEXIST) { # lost race with "lei export-kw"? return; } elsif ($! != ENOENT) { - syslog('warning', "link($src -> $dst): $!"); + syslog('warning', "rename_noreplace($src -> $dst): $!"); } } for (@try) { return if -e "$mdir/$_/$orig" }; @@ -257,21 +255,16 @@ sub remove_eml_vmd { # remove just the VMD sub _lms_rw ($) { # it is important to have eidx processes open before lms my ($self) = @_; - my ($eidx, $tl) = eidx_init($self); - $self->{lms} //= do { + $self->{lms} // do { require PublicInbox::LeiMailSync; + my ($eidx, $tl) = eidx_init($self); my $f = "$self->{priv_eidx}->{topdir}/mail_sync.sqlite3"; my $lms = PublicInbox::LeiMailSync->new($f); $lms->lms_write_prepare; - $lms; + $self->{lms} = $lms; }; } -sub set_sync_info { - my ($self, $oidhex, $folder, $id) = @_; - _lms_rw($self)->set_src(pack('H*', $oidhex), $folder, $id); -} - sub _remove_if_local { # git->cat_async arg my ($bref, $oidhex, $type, $size, $self) = @_; $self->{im}->remove($bref) if $bref; @@ -342,6 +335,46 @@ sub _docids_and_maybe_kw ($$) { ($docids, [ sort keys %$kw ]); } +sub _reindex_1 { # git->cat_async callback + my ($bref, $hex, $type, $size, $smsg) = @_; + my $self = delete $smsg->{-sto}; + my ($eidx, $tl) = eidx_init($self); + $bref //= _lms_rw($self)->local_blob($hex, 1); + if ($bref) { + my $eml = PublicInbox::Eml->new($bref); + $smsg->{-merge_vmd} = 1; # preserve existing keywords + $eidx->idx_shard($smsg->{num})->index_eml($eml, $smsg); + } elsif ($type eq 'missing') { + # pre-release/buggy lei may've indexed external-only msgs, + # try to correct that, here + warn("E: missing $hex, culling (ancient lei artifact?)\n"); + $smsg->{to} = $smsg->{cc} = $smsg->{from} = ''; + $smsg->{bytes} = 0; + $eidx->{oidx}->update_blob($smsg, ''); + my $eml = PublicInbox::Eml->new("\r\n\r\n"); + $eidx->idx_shard($smsg->{num})->index_eml($eml, $smsg); + } else { + warn("E: $type $hex\n"); + } +} + +sub reindex_art { + my ($self, $art) = @_; + my ($eidx, $tl) = eidx_init($self); + my $smsg = $eidx->{oidx}->get_art($art) // return; + return if $smsg->{bytes} == 0; # external-only message + $smsg->{-sto} = $self; + $eidx->git->cat_async($smsg->{blob} // die("no blob (#$art)"), + \&_reindex_1, $smsg); +} + +sub reindex_done { + my ($self) = @_; + my ($eidx, $tl) = eidx_init($self); + $eidx->git->async_wait_all; + # ->done to be called via sto_done_request +} + sub add_eml { my ($self, $eml, $vmd, $xoids) = @_; my $im = $self->{-fake_im} // $self->importer; # may create new epoch @@ -351,7 +384,7 @@ sub add_eml { $smsg->{-eidx_git} = $eidx->git if !$self->{-fake_im}; my $im_mark = $im->add($eml, undef, $smsg); if ($vmd && $vmd->{sync_info}) { - set_sync_info($self, $smsg->{blob}, @{$vmd->{sync_info}}); + _lms_rw($self)->set_src($smsg->oidbin, @{$vmd->{sync_info}}); } unless ($im_mark) { # duplicate blob returns undef return unless wantarray; @@ -571,6 +604,11 @@ sub recv_and_run { $self->SUPER::recv_and_run(@args); } +sub _sto_atexit { # awaitpid cb + my ($pid) = @_; + warn "lei/store PID:$pid died \$?=$?\n" if $?; +} + sub write_prepare { my ($self, $lei) = @_; $lei // die 'BUG: $lei not passed'; @@ -586,8 +624,7 @@ sub write_prepare { lei => $lei, -err_wr => $w, to_close => [ $r ], - }); - $self->wq_wait_async; # outlives $lei + }, \&_sto_atexit); require PublicInbox::LeiStoreErr; PublicInbox::LeiStoreErr->new($r, $lei); }