]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei: propagate keyword changes from lei/store
authorEric Wong <e@80x24.org>
Thu, 2 Sep 2021 10:17:58 +0000 (10:17 +0000)
committerEric Wong <e@80x24.org>
Thu, 2 Sep 2021 21:22:50 +0000 (21:22 +0000)
This works with existing inotify/EVFILT_VNODE functionality to
propagate changes made from one Maildir to another Maildir.

I chose the lei/store worker process to handle this since
propagating changes back into lei-daemon on a massive scale
could lead to dead-locking while both processes are attempting
to write to each other.  Eliminating IPC overhead is a nice
side effect, but could hurt performance if Maildirs are slow.

The code for "lei export-kw" is significantly revamped to match
the new code used in the "lei/store" daemon.  It should be more
correct w.r.t. corner-cases and stale entries, but perhaps
better tests need to be written.

squashed:
  t/lei-auto-watch: increase delay for FreeBSD kevent

  My FreeBSD VM seems to need longer for this test than inotify
  under Linux, likely because the kevent support code needs to be
  more complicated.

MANIFEST
lib/PublicInbox/LeiExportKw.pm
lib/PublicInbox/LeiNoteEvent.pm
lib/PublicInbox/LeiStore.pm
t/lei-auto-watch.t [new file with mode: 0644]

index cf7268ed19a9783b858b2b6bda47a344950ba77c..be6ec9279e6c3549e6e140cde95ce9e8c91f80b1 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -423,6 +423,7 @@ t/init.t
 t/ipc.t
 t/iso-2202-jp.eml
 t/kqnotify.t
+t/lei-auto-watch.t
 t/lei-convert.t
 t/lei-daemon.t
 t/lei-export-kw.t
index 42a5ff2269d0a16a2875ccc0c9188c08ea4d4b8f..78c6c6f95ff36b36c44c90096c87aeace7109e79 100644 (file)
@@ -25,12 +25,11 @@ sub export_kw_md { # LeiMailSync->each_src callback
        }
        $bn .= ':2,'.
                PublicInbox::LeiToMail::kw2suffix([keys %$sto_kw], @$unknown);
+       return if $bn eq $$id;
        my $dst = "$mdir/cur/$bn";
-       my @fail;
        my $lei = $self->{lei};
        for my $d (@try) {
                my $src = "$mdir/$d/$$id";
-               next if $src eq $dst;
 
                # we use link(2) + unlink(2) since rename(2) may
                # inadvertently clobber if the "uniquefilename" part wasn't
@@ -44,20 +43,19 @@ sub export_kw_md { # LeiMailSync->each_src callback
                        $lei->{sto}->ipc_do('lms_mv_src', "maildir:$mdir",
                                                $oidbin, $id, $bn);
                        return; # success anyways if link(2) worked
-               }
-               if ($! == ENOENT && !-e $src) { # some other process moved it
-                       $lei->{sto}->ipc_do('lms_clear_src',
-                                               "maildir:$mdir", $id);
-                       next;
-               }
-               push @fail, $src if $! != EEXIST;
+               } elsif ($! == EEXIST) { # lost race with lei/store?
+                       return;
+               } elsif ($! != ENOENT) {
+                       $lei->child_error(1, "E: link($src -> $dst): $!");
+               } # else loop @try
        }
-       return unless @fail;
-       # both tries failed
        my $e = $!;
-       my $orig = '['.join('|', @fail).']';
+       # both tries failed
        my $oidhex = unpack('H*', $oidbin);
-       $lei->child_error(1, "link($orig, $dst) ($oidhex): $e");
+       my $src = "$mdir/{".join(',', @try)."}/$$id";
+       $lei->child_error(1, "link($src -> $dst) ($oidhex): $e");
+       for (@try) { return if -e "$mdir/$_/$$id" }
+       $lei->{sto}->ipc_do('lms_clear_src', "maildir:$mdir", $id);
 }
 
 sub export_kw_imap { # LeiMailSync->each_src callback
index 6a40ba3967b816d6310905d751520ded7915e180..414153468b22599ba153d24a9f23c6790fb52550 100644 (file)
@@ -36,32 +36,31 @@ sub note_event_arm_done ($) {
 }
 
 sub eml_event ($$$$) {
-       my ($self, $eml, $kw, $state) = @_;
+       my ($self, $eml, $vmd, $state) = @_;
        my $sto = $self->{lei}->{sto};
        my $lse = $self->{lse} //= $sto->search;
-       my $vmd = { kw => $kw };
        if ($state =~ /\Aimport-(?:rw|ro)\z/) {
                $sto->ipc_do('set_eml', $eml, $vmd);
        } elsif ($state =~ /\Aindex-(?:rw|ro)\z/) {
                my $xoids = $self->{lei}->ale->xoids_for($eml);
                $sto->ipc_do('index_eml_only', $eml, $vmd, $xoids);
        } elsif ($state =~ /\Atag-(?:rw|ro)\z/) {
-               my $c = $lse->kw_changed($eml, $kw, my $docids = []);
+               my $c = $lse->kw_changed($eml, $vmd->{kw}, my $docids = []);
                if (scalar @$docids) { # already in lei/store
                        $sto->ipc_do('set_eml_vmd', undef, $vmd, $docids) if $c;
                } elsif (my $xoids = $self->{lei}->ale->xoids_for($eml)) {
                        # it's in an external, only set kw, here
                        $sto->ipc_do('set_xvmd', $xoids, $eml, $vmd);
-               } # else { totally unknown
+               } # else { totally unknown: ignore
        } else {
                warn "unknown state: $state (in $self->{lei}->{cfg}->{'-f'})\n";
        }
 }
 
 sub maildir_event { # via wq_io_do
-       my ($self, $fn, $kw, $state) = @_;
+       my ($self, $fn, $vmd, $state) = @_;
        my $eml = PublicInbox::InboxWritable::eml_from_path($fn) // return;
-       eml_event($self, $eml, $kw, $state);
+       eml_event($self, $eml, $vmd, $state);
 }
 
 sub lei_note_event {
@@ -98,7 +97,8 @@ sub lei_note_event {
                        // return;
                return if index($fl, 'T') >= 0;
                my $kw = PublicInbox::MdirReader::flags2kw($fl);
-               $self->wq_io_do('maildir_event', [], $fn, $kw, $state);
+               my $vmd = { kw => $kw, sync_info => [ $folder, \$bn ] };
+               $self->wq_io_do('maildir_event', [], $fn, $vmd, $state);
        } # else: TODO: imap
 }
 
index 0fa2d3c08ba901e12f0900786f6bd9a3fab18e9d..a91b30f749520b9ad2eab309d00e168d79fba863 100644 (file)
@@ -25,10 +25,14 @@ use PublicInbox::MID qw(mids);
 use PublicInbox::LeiSearch;
 use PublicInbox::MDA;
 use PublicInbox::Spawn qw(spawn);
+use PublicInbox::MdirReader;
+use PublicInbox::LeiToMail;
 use List::Util qw(max);
 use File::Temp ();
 use POSIX ();
 use IO::Handle (); # ->autoflush
+use Sys::Syslog qw(syslog openlog);
+use Errno qw(EEXIST ENOENT);
 
 sub new {
        my (undef, $dir, $opt) = @_;
@@ -165,12 +169,92 @@ sub _docids_for ($$) {
        sort { $a <=> $b } values %docids;
 }
 
+# n.b. similar to LeiExportKw->export_kw_md, but this is for a single eml
+sub export1_kw_md ($$$$$) {
+       my ($self, $mdir, $bn, $oidbin, $vmdish) = @_; # vmd/vmd_mod
+       my $orig = $bn;
+       my (@try, $unkn, $kw);
+       if ($bn =~ s/:2,([a-zA-Z]*)\z//) {
+               ($kw, $unkn) = PublicInbox::MdirReader::flags2kw($1);
+               if (my $set = $vmdish->{kw}) {
+                       $kw = $set;
+               } elsif (my $add = $vmdish->{'+kw'}) {
+                       @$kw{@$add} = ();
+               } elsif (my $del = $vmdish->{-kw}) {
+                       delete @$kw{@$del};
+               } # else no changes...
+               @try = qw(cur new);
+       } else { # no keywords, yet, could be in new/
+               @try = qw(new cur);
+               $unkn = [];
+               if (my $set = $vmdish->{kw}) {
+                       $kw = $set;
+               } elsif (my $add = $vmdish->{'+kw'}) {
+                       @$kw{@$add} = (); # auto-vivify
+               } else { # ignore $vmdish->{-kw}
+                       $kw = [];
+               }
+       }
+       $kw = [ keys %$kw ] if ref($kw) eq 'HASH';
+       $bn .= ':2,'. PublicInbox::LeiToMail::kw2suffix($kw, @$unkn);
+       return if $orig eq $bn; # no change
+
+       # we use link(2) + unlink(2) since rename(2) may
+       # inadvertently clobber if the "uniquefilename" part wasn't
+       # actually unique.
+       my $dst = "$mdir/cur/$bn";
+       for my $d (@try) {
+               my $src = "$mdir/$d/$orig";
+               if (link($src, $dst)) {
+                       if (!unlink($src) and $! != ENOENT) {
+                               syslog('warning', "unlink($src): $!");
+                       }
+                       # TODO: verify oidbin?
+                       lms_mv_src($self, "maildir:$mdir",
+                                       $oidbin, \$orig, $bn);
+                       return;
+               } elsif ($! == EEXIST) { # lost race with "lei export-kw"?
+                       return;
+               } elsif ($! == ENOENT) {
+                       syslog('warning', "link($src -> $dst): $!")
+               } # else loop @try
+       }
+       my $e = $!;
+       my $src = "$mdir/{".join(',', @try)."}/$orig";
+       my $oidhex = unpack('H*', $oidbin);
+       syslog('warning', "link($src -> $dst) ($oidhex): $e");
+       for (@try) { return if -e "$mdir/$_/$orig" };
+       lms_clear_src($self, "maildir:$mdir", \$orig);
+}
+
+sub sto_export_kw ($$$) {
+       my ($self, $docid, $vmdish) = @_; # vmdish (vmd or vmd_mod)
+       my ($eidx, $tl) = eidx_init($self);
+       my $lms = _lms_rw($self) // return;
+       my $xr3 = $eidx->{oidx}->get_xref3($docid, 1);
+       for my $row (@$xr3) {
+               my (undef, undef, $oidbin) = @$row;
+               my $locs = $lms->locations_for($oidbin) // next;
+               while (my ($loc, $ids) = each %$locs) {
+                       if ($loc =~ s!\Amaildir:!!i) {
+                               for my $id (@$ids) {
+                                       export1_kw_md($self, $loc, $id,
+                                                       $oidbin, $vmdish);
+                               }
+                       }
+                       # TODO: IMAP
+               }
+       }
+}
+
+# vmd = { kw => [ qw(seen ...) ], L => [ qw(inbox ...) ] }
 sub set_eml_vmd {
        my ($self, $eml, $vmd, $docids) = @_;
        my ($eidx, $tl) = eidx_init($self);
        $docids //= [ _docids_for($self, $eml) ];
        for my $docid (@$docids) {
                $eidx->idx_shard($docid)->ipc_do('set_vmd', $docid, $vmd);
+               sto_export_kw($self, $docid, $vmd);
        }
        $docids;
 }
@@ -284,6 +368,12 @@ EOF
        $docid;
 }
 
+sub _add_vmd ($$$$) {
+       my ($self, $idx, $docid, $vmd) = @_;
+       $idx->ipc_do('add_vmd', $docid, $vmd);
+       sto_export_kw($self, $docid, $vmd);
+}
+
 sub add_eml {
        my ($self, $eml, $vmd, $xoids) = @_;
        my $im = $self->{-fake_im} // $self->importer; # may create new epoch
@@ -310,7 +400,7 @@ sub add_eml {
                        @$vivify_xvmd = sort { $a <=> $b } keys(%docids);
                }
        }
-       if (@$vivify_xvmd) {
+       if (@$vivify_xvmd) { # docids list
                $xoids //= {};
                $xoids->{$smsg->{blob}} = 1;
                for my $docid (@$vivify_xvmd) {
@@ -327,7 +417,7 @@ sub add_eml {
                        for my $oid (keys %$xoids) {
                                $oidx->add_xref3($docid, -1, $oid, '.');
                        }
-                       $idx->ipc_do('add_vmd', $docid, $vmd) if $vmd;
+                       _add_vmd($self, $idx, $docid, $vmd) if $vmd;
                }
                $vivify_xvmd;
        } elsif (my @docids = _docids_for($self, $eml)) {
@@ -337,7 +427,7 @@ sub add_eml {
                        $oidx->add_xref3($docid, -1, $smsg->{blob}, '.');
                        # add_eidx_info for List-Id
                        $idx->ipc_do('add_eidx_info', $docid, '.', $eml);
-                       $idx->ipc_do('add_vmd', $docid, $vmd) if $vmd;
+                       _add_vmd($self, $idx, $docid, $vmd) if $vmd;
                }
                \@docids;
        } else { # totally new message
@@ -347,7 +437,7 @@ sub add_eml {
                $oidx->add_xref3($smsg->{num}, -1, $smsg->{blob}, '.');
                my $idx = $eidx->idx_shard($smsg->{num});
                $idx->index_eml($eml, $smsg);
-               $idx->ipc_do('add_vmd', $smsg->{num}, $vmd) if $vmd;
+               _add_vmd($self, $idx, $smsg->{num}, $vmd) if $vmd;
                $smsg;
        }
 }
@@ -365,6 +455,7 @@ sub index_eml_only {
        set_eml($self, $eml, $vmd, $xoids);
 }
 
+# store {kw} / {L} info for a message which is only in an external
 sub _external_only ($$$) {
        my ($self, $xoids, $eml) = @_;
        my $eidx = $self->{priv_eidx};
@@ -398,6 +489,7 @@ sub update_xvmd {
                next if $seen{$docid}++;
                my $idx = $eidx->idx_shard($docid);
                $idx->ipc_do('update_vmd', $docid, $vmd_mod);
+               sto_export_kw($self, $docid, $vmd_mod);
        }
        return unless scalar(keys(%$xoids));
 
@@ -410,12 +502,14 @@ sub update_xvmd {
                        }
                        my $idx = $eidx->idx_shard($docid);
                        $idx->ipc_do('update_vmd', $docid, $vmd_mod);
+                       sto_export_kw($self, $docid, $vmd_mod);
                }
                return;
        }
        # totally unseen
        my ($smsg, $idx) = _external_only($self, $xoids, $eml);
        $idx->ipc_do('update_vmd', $smsg->{num}, $vmd_mod);
+       sto_export_kw($self, $smsg->{num}, $vmd_mod);
 }
 
 # set or update keywords for external message, called via ipc_do
@@ -433,6 +527,7 @@ sub set_xvmd {
                next if $seen{$docid}++;
                my $idx = $eidx->idx_shard($docid);
                $idx->ipc_do('set_vmd', $docid, $vmd);
+               sto_export_kw($self, $docid, $vmd);
        }
        return unless scalar(keys(%$xoids));
 
@@ -443,6 +538,7 @@ sub set_xvmd {
        # totally unseen:
        my ($smsg, $idx) = _external_only($self, $xoids, $eml);
        $idx->ipc_do('add_vmd', $smsg->{num}, $vmd);
+       sto_export_kw($self, $smsg->{num}, $vmd);
 }
 
 sub checkpoint {
@@ -497,6 +593,7 @@ sub ipc_atfork_child {
        if (my $to_close = delete($self->{to_close})) {
                close($_) for @$to_close;
        }
+       openlog('lei/store', 'pid,nowait,nofatal,ndelay', 'user');
        $self->SUPER::ipc_atfork_child;
 }
 
diff --git a/t/lei-auto-watch.t b/t/lei-auto-watch.t
new file mode 100644 (file)
index 0000000..146402a
--- /dev/null
@@ -0,0 +1,45 @@
+#!perl -w
+# Copyright all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict; use v5.10.1; use PublicInbox::TestCommon;
+use File::Basename qw(basename);
+my ($ro_home, $cfg_path) = setup_public_inboxes;
+my $tick = 2.1;
+my $have_fast_inotify = eval { require Linux::Inotify2; $tick = 0.1 } ||
+       eval { require IO::KQueue; $tick = 0.5 };
+
+$have_fast_inotify or
+       diag("$0 IO::KQueue or Linux::Inotify2 missing, test will be slow");
+
+test_lei(sub {
+       my $x = "$ENV{HOME}/x";
+       my $y = "$ENV{HOME}/y";
+       lei_ok qw(add-external), "$ro_home/t1";
+       lei_ok qw(q mid:testmessage@example.com -o), $x;
+       lei_ok qw(q mid:testmessage@example.com -o), $y;
+       my @x = glob("$x/cur/*");
+       my @y = glob("$y/cur/*");
+       scalar(@x) == 1 or xbail 'expected 1 file', \@x;
+       scalar(@y) == 1 or xbail 'expected 1 file', \@y;
+
+       my $oid = '9bf1002c49eb075df47247b74d69bcd555e23422';
+       lei_ok qw(inspect), "blob:$oid";
+       my $ins = json_utf8->decode($lei_out);
+       my $exp = { "maildir:$x" => [ map { basename($_) } @x ],
+               "maildir:$y" => [ map { basename($_) } @y ] };
+       is_deeply($ins->{'mail-sync'}, $exp, 'inspect as expected');
+       lei_ok qw(add-watch), $x;
+       my $dst = $x[0] . 'S';
+       rename($x[0], $dst) or xbail "rename($x[0], $dst): $!";
+       tick($tick); # wait for inotify or kevent
+       my @y2 = glob("$y/*/*");
+       is_deeply(\@y2, [ "$y[0]S" ], "`seen' kw propagated to `y' dir");
+       lei_ok qw(note-event done);
+       lei_ok qw(inspect), "blob:$oid";
+       $ins = json_utf8->decode($lei_out);
+       $exp = { "maildir:$x" => [ map { basename($_) } glob("$x/*/*") ],
+               "maildir:$y" => [ map { basename($_) } glob("$y/*/*") ] };
+       is_deeply($ins->{'mail-sync'}, $exp, 'mail_sync matches FS');
+});
+
+done_testing;