]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei_store: local storage for Local Email Interface
authorEric Wong <e@80x24.org>
Mon, 14 Dec 2020 11:42:40 +0000 (11:42 +0000)
committerEric Wong <e@80x24.org>
Sat, 19 Dec 2020 09:32:08 +0000 (09:32 +0000)
Still unstable, this builds off the equally unstable extindex :P

This will be used for caching/memoization of traditional mail
stores (IMAP, Maildir, etc) while providing indexing via Xapian,
along with compression, and checksumming from git.

Most notably, this adds the ability to add/remove per-message
keywords (draft, seen, flagged, answered) as described in the
JMAP specification (RFC 8621 section 4.1.1).

We'll use `.' (a single period) as an $eidx_key since it's an
invalid {inboxdir} or {newsgroup} name.

12 files changed:
MANIFEST
lib/PublicInbox/ExtSearch.pm
lib/PublicInbox/ExtSearchIdx.pm
lib/PublicInbox/Import.pm
lib/PublicInbox/LeiDaemon.pm
lib/PublicInbox/LeiSearch.pm [new file with mode: 0644]
lib/PublicInbox/LeiStore.pm [new file with mode: 0644]
lib/PublicInbox/OverIdx.pm
lib/PublicInbox/SearchIdx.pm
lib/PublicInbox/SearchIdxShard.pm
lib/PublicInbox/V2Writable.pm
t/lei_store.t [new file with mode: 0644]

index 7536b7c28fa6a219488e46b2483144713a43bbb4..9eb97d14d74d769af5ae6dd6044cd04fa14756fe 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -160,6 +160,8 @@ lib/PublicInbox/InboxWritable.pm
 lib/PublicInbox/Isearch.pm
 lib/PublicInbox/KQNotify.pm
 lib/PublicInbox/LeiDaemon.pm
+lib/PublicInbox/LeiSearch.pm
+lib/PublicInbox/LeiStore.pm
 lib/PublicInbox/Linkify.pm
 lib/PublicInbox/Listener.pm
 lib/PublicInbox/Lock.pm
@@ -319,6 +321,7 @@ t/init.t
 t/iso-2202-jp.eml
 t/kqnotify.t
 t/lei.t
+t/lei_store.t
 t/linkify.t
 t/main-bin/spamc
 t/mda-mime.eml
index 2a560935e7c7a0ad27458058e65155348b99e69e..410ae95854f6a2a4d2e572c26e8aceaa19ddd55a 100644 (file)
@@ -17,13 +17,13 @@ use DBI qw(:sql_types); # SQL_BLOB
 use parent qw(PublicInbox::Search);
 
 sub new {
-       my (undef, $topdir) = @_;
+       my ($class, $topdir) = @_;
        $topdir = File::Spec->canonpath($topdir);
        bless {
                topdir => $topdir,
                # xpfx => 'ei15'
                xpfx => "$topdir/ei".PublicInbox::Search::SCHEMA_VERSION
-       }, __PACKAGE__;
+       }, $class;
 }
 
 sub misc {
index b82d05462671500f97c4a04701804004bed1370c..568960562812e56500ebaee9dc28a03447dd683c 100644 (file)
@@ -896,18 +896,31 @@ sub idx_init { # similar to V2Writable
        return if $self->{idx_shards};
 
        $self->git->cleanup;
-
+       my $mode = 0644;
        my $ALL = $self->git->{git_dir}; # ALL.git
-       PublicInbox::Import::init_bare($ALL) unless -d $ALL;
+       my $old = -d $ALL;
+       if ($opt->{-private}) { # LeiStore
+               $mode = 0600;
+               if (!$old) {
+                       umask 077; # don't bother restoring
+                       PublicInbox::Import::init_bare($ALL);
+                       $self->git->qx(qw(config core.sharedRepository 0600));
+               }
+       } else {
+               PublicInbox::Import::init_bare($ALL) unless $old;
+       }
        my $info_dir = "$ALL/objects/info";
        my $alt = "$info_dir/alternates";
-       my $mode = 0644;
        my (@old, @new, %seen); # seen: st_dev + st_ino
        if (-e $alt) {
                open(my $fh, '<', $alt) or die "open $alt: $!";
                $mode = (stat($fh))[2] & 07777;
                while (my $line = <$fh>) {
                        chomp(my $d = $line);
+
+                       # expand relative path (/local/ stuff)
+                       substr($d, 0, 3) eq '../' and
+                               $d = "$ALL/objects/$d";
                        if (my @st = stat($d)) {
                                next if $seen{"$st[0]\0$st[1]"}++;
                        } else {
@@ -917,6 +930,22 @@ sub idx_init { # similar to V2Writable
                        push @old, $line;
                }
        }
+
+       # for LeiStore, and possibly some mirror-only state
+       if (opendir(my $dh, my $local = "$self->{topdir}/local")) {
+               # highest numbered epoch first
+               for my $n (sort { $b <=> $a } map { substr($_, 0, -4) + 0 }
+                               grep(/\A[0-9]+\.git\z/, readdir($dh))) {
+                       my $d = "$local/$n.git/objects"; # absolute path
+                       if (my @st = stat($d)) {
+                               next if $seen{"$st[0]\0$st[1]"}++;
+                               # favor relative paths for rename-friendliness
+                               push @new, "../../local/$n.git/objects\n";
+                       } else {
+                               warn "W: stat($d) failed: $!\n";
+                       }
+               }
+       }
        for my $ibx (@{$self->{ibx_list}}) {
                my $line = $ibx->git->{git_dir} . "/objects\n";
                chomp(my $d = $line);
index 2cb4896af5a83a08ca45cb7340f5c1e548ed1e42..b7be4c46aab27859e101cb95c1cb4a9d3ac4cbcc 100644 (file)
@@ -405,6 +405,10 @@ sub add {
        if ($smsg) {
                $smsg->{blob} = $self->get_mark(":$blob");
                $smsg->{raw_bytes} = $n;
+               if (my $oidx = delete $smsg->{-oidx}) { # used by LeiStore
+                       return if $oidx->blob_exists($smsg->{blob});
+               }
+               # XXX do we need this? it's in git at this point
                $smsg->{-raw_email} = \$raw_email;
        }
        my $ref = $self->{ref};
index d0c53416592afdbc32ddfd866585735eedb54f42..b4b1ac598ce094b3ff7784d1574d4060d682d961 100644 (file)
@@ -42,7 +42,7 @@ our %CMD = ( # sorted in order of importance/use:
 'add-extinbox' => [ 'URL-OR-PATHNAME',
        'add/set priority of a publicinbox|extindex for extra matches',
        qw(prio=i) ],
-'ls-extinbox' => [ '[FILTER]', 'list publicinbox|extindex sources',
+'ls-extinbox' => [ '[FILTER]', 'list publicinbox|extindex locations',
        qw(format|f=s z local remote) ],
 'forget-extinbox' => [ '{URL-OR-PATHNAME|--prune}',
        'exclude further results from a publicinbox|extindex',
diff --git a/lib/PublicInbox/LeiSearch.pm b/lib/PublicInbox/LeiSearch.pm
new file mode 100644 (file)
index 0000000..9cfd6ea
--- /dev/null
@@ -0,0 +1,40 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+package PublicInbox::LeiSearch;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::ExtSearch);
+use PublicInbox::Search;
+
+sub combined_docid ($$) {
+       my ($self, $num) = @_;
+       my $nshard = ($self->{nshard} // 1);
+       ($num - 1) * $nshard  + 1;
+}
+
+sub msg_keywords {
+       my ($self, $num) = @_; # num_or_mitem
+       my $xdb = $self->xdb; # set {nshard};
+       my $docid = ref($num) ? $num->get_docid : do {
+               # get combined docid from over.num:
+               # (not generic Xapian, only works with our sharding scheme)
+               my $nshard = $self->{nshard} // 1;
+               ($num - 1) * $nshard + $num % $nshard + 1;
+       };
+       my %kw;
+       eval {
+               my $end = $xdb->termlist_end($docid);
+               my $cur = $xdb->termlist_begin($docid);
+               for (; $cur != $end; $cur++) {
+                       $cur->skip_to('K');
+                       last if $cur == $end;
+                       my $kw = $cur->get_termname;
+                       $kw =~ s/\AK//s and $kw{$kw} = undef;
+               }
+       };
+       warn "E: #$docid ($num): $@\n" if $@;
+       wantarray ? sort(keys(%kw)) : \%kw;
+}
+
+1;
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
new file mode 100644 (file)
index 0000000..56f668b
--- /dev/null
@@ -0,0 +1,197 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# Local storage (cache/memo) for lei(1), suitable for personal/private
+# mail iff on encrypted device/FS.  Based on v2, but only deduplicates
+# based on git OID.
+#
+# for xref3, the following are constant: $eidx_key = '.', $xnum = -1
+package PublicInbox::LeiStore;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::Lock);
+use PublicInbox::SearchIdx qw(crlf_adjust);
+use PublicInbox::ExtSearchIdx;
+use PublicInbox::Import;
+use PublicInbox::InboxWritable;
+use PublicInbox::V2Writable;
+use PublicInbox::ContentHash qw(content_hash);
+use PublicInbox::MID qw(mids);
+use PublicInbox::LeiSearch;
+
+sub new {
+       my (undef, $dir, $opt) = @_;
+       my $eidx = PublicInbox::ExtSearchIdx->new($dir, $opt);
+       bless { priv_eidx => $eidx }, __PACKAGE__;
+}
+
+sub git { $_[0]->{priv_eidx}->git } # read-only
+
+sub packing_factor { $PublicInbox::V2Writable::PACKING_FACTOR }
+
+sub rotate_bytes {
+       $_[0]->{rotate_bytes} // ((1024 * 1024 * 1024) / $_[0]->packing_factor)
+}
+
+sub git_pfx { "$_[0]->{priv_eidx}->{topdir}/local" };
+
+sub git_epoch_max  {
+       my ($self) = @_;
+       my $pfx = $self->git_pfx;
+       my $max = 0;
+       return $max unless -d $pfx ;
+       opendir my $dh, $pfx or die "opendir $pfx: $!\n";
+       while (defined(my $git_dir = readdir($dh))) {
+               $git_dir =~ m!\A([0-9]+)\.git\z! or next;
+               $max = $1 + 0 if $1 > $max;
+       }
+       $max;
+}
+
+sub importer {
+       my ($self) = @_;
+       my $max;
+       my $im = $self->{im};
+       if ($im) {
+               return $im if $im->{bytes_added} < $self->rotate_bytes;
+
+               delete $self->{im};
+               $im->done;
+               undef $im;
+               $self->checkpoint;
+               $max = $self->git_epoch_max + 1;
+       }
+       my $pfx = $self->git_pfx;
+       $max //= $self->git_epoch_max;
+       while (1) {
+               my $latest = "$pfx/$max.git";
+               my $old = -e $latest;
+               my $git = PublicInbox::Git->new($latest);
+               PublicInbox::Import::init_bare({ git => $git });
+               $git->qx(qw(config core.sharedRepository 0600)) if !$old;
+               my $packed_bytes = $git->packed_bytes;
+               my $unpacked_bytes = $packed_bytes / $self->packing_factor;
+               if ($unpacked_bytes >= $self->rotate_bytes) {
+                       $max++;
+                       next;
+               }
+               chomp(my $i = $git->qx(qw(var GIT_COMMITTER_IDENT)));
+               die "$git->{git_dir} GIT_COMMITTER_IDENT failed\n" if $?;
+               my ($n, $e) = ($i =~ /\A(.+) <([^>]+)> [0-9]+ [-\+]?[0-9]+$/g)
+                       or die "could not extract name/email from `$i'\n";
+               $self->{im} = $im = PublicInbox::Import->new($git, $n, $e);
+               $im->{bytes_added} = int($packed_bytes / $self->packing_factor);
+               $im->{lock_path} = undef;
+               $im->{path_type} = 'v2';
+               return $im;
+       }
+}
+
+sub search {
+       PublicInbox::LeiSearch->new($_[0]->{priv_eidx}->{topdir});
+}
+
+sub eidx_init {
+       my ($self) = @_;
+       my $eidx = $self->{priv_eidx};
+       $eidx->idx_init({-private => 1});
+       $eidx;
+}
+
+sub _docids_for ($$) {
+       my ($self, $eml) = @_;
+       my %docids;
+       my $chash = content_hash($eml);
+       my $eidx = eidx_init($self);
+       my $oidx = $eidx->{oidx};
+       my $im = $self->{im};
+       for my $mid (@{mids($eml)}) {
+               my ($id, $prev);
+               while (my $cur = $oidx->next_by_mid($mid, \$id, \$prev)) {
+                       my $oid = $cur->{blob};
+                       my $docid = $cur->{num};
+                       my $bref = $im ? $im->cat_blob($oid) : undef;
+                       $bref //= $eidx->git->cat_file($oid) // do {
+                               warn "W: $oid (#$docid) <$mid> not found\n";
+                               next;
+                       };
+                       local $self->{current_info} = $oid;
+                       my $x = PublicInbox::Eml->new($bref);
+                       $docids{$docid} = $docid if content_hash($x) eq $chash;
+               }
+       }
+       sort { $a <=> $b } values %docids;
+}
+
+sub set_eml_keywords {
+       my ($self, $eml, @kw) = @_;
+       my $eidx = eidx_init($self);
+       my @docids = _docids_for($self, $eml);
+       for my $docid (@docids) {
+               $eidx->idx_shard($docid)->shard_set_keywords($docid, @kw);
+       }
+       \@docids;
+}
+
+sub add_eml_keywords {
+       my ($self, $eml, @kw) = @_;
+       my $eidx = eidx_init($self);
+       my @docids = _docids_for($self, $eml);
+       for my $docid (@docids) {
+               $eidx->idx_shard($docid)->shard_add_keywords($docid, @kw);
+       }
+       \@docids;
+}
+
+sub remove_eml_keywords {
+       my ($self, $eml, @kw) = @_;
+       my $eidx = eidx_init($self);
+       my @docids = _docids_for($self, $eml);
+       for my $docid (@docids) {
+               $eidx->idx_shard($docid)->shard_remove_keywords($docid, @kw);
+       }
+       \@docids;
+}
+
+sub add_eml {
+       my ($self, $eml) = @_;
+       my $eidx = eidx_init($self);
+       my $oidx = $eidx->{oidx};
+       my $smsg = bless { -oidx => $oidx }, 'PublicInbox::Smsg';
+       my $im = $self->importer;
+       $im->add($eml, undef, $smsg) or return; # duplicate returns undef
+       my $msgref = delete $smsg->{-raw_email};
+       $smsg->{bytes} = $smsg->{raw_bytes} + crlf_adjust($$msgref);
+
+       local $self->{current_info} = $smsg->{blob};
+       if (my @docids = _docids_for($self, $eml)) {
+               for my $docid (@docids) {
+                       my $idx = $eidx->idx_shard($docid);
+                       $oidx->add_xref3($docid, -1, $smsg->{blob}, '.');
+                       $idx->shard_add_eidx_info($docid, '.', $eml); # List-Id
+               }
+       } else {
+               $smsg->{num} = $oidx->adj_counter('eidx_docid', '+');
+               $oidx->add_overview($eml, $smsg);
+               $oidx->add_xref3($smsg->{num}, -1, $smsg->{blob}, '.');
+               my $idx = $eidx->idx_shard($smsg->{num});
+               $idx->index_raw($msgref, $eml, $smsg);
+       }
+       $smsg->{blob}
+}
+
+sub done {
+       my ($self) = @_;
+       my $err = '';
+       if (my $im = delete($self->{im})) {
+               eval { $im->done };
+               if ($@) {
+                       $err .= "import done: $@\n";
+                       warn $err;
+               }
+       }
+       $self->{priv_eidx}->done;
+       die $err if $err;
+}
+
+1;
index 4a39bf5346285fc9dd3c464545cd48e3b6db02c6..c8630ddb3da72a8cf490ee2b6be43c06512b6a05 100644 (file)
@@ -684,4 +684,14 @@ DELETE FROM eidxq WHERE docid = ?
 
 }
 
+sub blob_exists {
+       my ($self, $oidhex) = @_;
+       my $sth = $self->dbh->prepare_cached(<<'', undef, 1);
+SELECT COUNT(*) FROM xref3 WHERE oidbin = ?
+
+       $sth->bind_param(1, pack('H*', $oidhex), SQL_BLOB);
+       $sth->execute;
+       $sth->fetchrow_array;
+}
+
 1;
index b731f6982f8bd1e6d44179d26a94f32c67726ae7..548f2114b80174cfd1569c319e4a1daa1a7bc8e8 100644 (file)
@@ -1,6 +1,6 @@
 # Copyright (C) 2015-2020 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-# based on notmuch, but with no concept of folders, files or flags
+# based on notmuch, but with no concept of folders, files
 #
 # Indexes mail with Xapian and our (SQLite-based) ::Msgmap for use
 # with the web and NNTP interfaces.  This index maintains thread
@@ -371,7 +371,7 @@ sub eml2doc ($$$;$) {
        index_headers($self, $smsg);
 
        if (defined(my $eidx_key = $smsg->{eidx_key})) {
-               $doc->add_boolean_term('O'.$eidx_key);
+               $doc->add_boolean_term('O'.$eidx_key) if $eidx_key ne '.';
        }
        msg_iter($eml, \&index_xapian, [ $self, $doc ]);
        index_ids($self, $doc, $eml, $mids);
@@ -467,7 +467,7 @@ sub add_eidx_info {
        begin_txn_lazy($self);
        my $doc = _get_doc($self, $docid) or return;
        term_generator($self)->set_document($doc);
-       $doc->add_boolean_term('O'.$eidx_key);
+       $doc->add_boolean_term('O'.$eidx_key) if $eidx_key ne '.';
        index_list_id($self, $doc, $eml);
        $self->{xdb}->replace_document($docid, $doc);
 }
@@ -501,6 +501,47 @@ sub remove_eidx_info {
        $self->{xdb}->replace_document($docid, $doc);
 }
 
+sub set_keywords {
+       my ($self, $docid, @kw) = @_;
+       begin_txn_lazy($self);
+       my $doc = _get_doc($self, $docid) or return;
+       my %keep = map { $_ => 1 } @kw;
+       my %add = %keep;
+       my @rm;
+       my $end = $doc->termlist_end;
+       for (my $cur = $doc->termlist_begin; $cur != $end; $cur++) {
+               $cur->skip_to('K');
+               last if $cur == $end;
+               my $kw = $cur->get_termname;
+               $kw =~ s/\AK//s or next;
+               $keep{$kw} ? delete($add{$kw}) : push(@rm, $kw);
+       }
+       return unless (scalar(@rm) + scalar(keys %add));
+       $doc->remove_term('K'.$_) for @rm;
+       $doc->add_boolean_term('K'.$_) for (keys %add);
+       $self->{xdb}->replace_document($docid, $doc);
+}
+
+sub add_keywords {
+       my ($self, $docid, @kw) = @_;
+       begin_txn_lazy($self);
+       my $doc = _get_doc($self, $docid) or return;
+       $doc->add_boolean_term('K'.$_) for @kw;
+       $self->{xdb}->replace_document($docid, $doc);
+}
+
+sub remove_keywords {
+       my ($self, $docid, @kw) = @_;
+       begin_txn_lazy($self);
+       my $doc = _get_doc($self, $docid) or return;
+       my $replace;
+       eval {
+               $doc->remove_term('K'.$_);
+               $replace = 1
+       } for @kw;
+       $self->{xdb}->replace_document($docid, $doc) if $replace;
+}
+
 sub get_val ($$) {
        my ($doc, $col) = @_;
        sortable_unserialise($doc->get_value($col));
index 2e6547693c1cef63eb514563c4db312321699c11..87b0bad62ed2ab5ae47f138460fd9730e4e9d255 100644 (file)
@@ -89,6 +89,12 @@ sub shard_worker_loop ($$$$$) {
                        my ($len, $docid, $eidx_key) = split(/ /, $line, 3);
                        $self->remove_eidx_info($docid, $eidx_key,
                                                        eml($r, $len));
+               } elsif ($line =~ s/\A=K (\d+) //) {
+                       $self->set_keywords($1 + 0, split(/ /, $line));
+               } elsif ($line =~ s/\A-K (\d+) //) {
+                       $self->remove_keywords($1 + 0, split(/ /, $line));
+               } elsif ($line =~ s/\A\+K (\d+) //) {
+                       $self->add_keywords($1 + 0, split(/ /, $line));
                } elsif ($line =~ s/\AO ([^\n]+)//) {
                        my $over_fn = $1;
                        $over_fn =~ tr/\0/\n/;
@@ -210,6 +216,33 @@ sub shard_remove {
        }
 }
 
+sub shard_set_keywords {
+       my ($self, $docid, @kw) = @_;
+       if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child
+               print $w "=K $docid @kw\n" or die "failed to write: $!";
+       } else { # same process
+               $self->set_keywords($docid, @kw);
+       }
+}
+
+sub shard_remove_keywords {
+       my ($self, $docid, @kw) = @_;
+       if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child
+               print $w "-K $docid @kw\n" or die "failed to write: $!";
+       } else { # same process
+               $self->remove_keywords($docid, @kw);
+       }
+}
+
+sub shard_add_keywords {
+       my ($self, $docid, @kw) = @_;
+       if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child
+               print $w "+K $docid @kw\n" or die "failed to write: $!";
+       } else { # same process
+               $self->add_keywords($docid, @kw);
+       }
+}
+
 sub shard_over_check {
        my ($self, $over) = @_;
        if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child
index 3e3b275f995f44e6a1403bdc382e71cbae213451..e8a5fbd25c6d35003f84cbba674d8b11c6855ea6 100644 (file)
@@ -24,7 +24,7 @@ use File::Temp ();
 
 my $OID = qr/[a-f0-9]{40,}/;
 # an estimate of the post-packed size to the raw uncompressed size
-my $PACKING_FACTOR = 0.4;
+our $PACKING_FACTOR = 0.4;
 
 # SATA storage lags behind what CPUs are capable of, so relying on
 # nproc(1) can be misleading and having extra Xapian shards is a
diff --git a/t/lei_store.t b/t/lei_store.t
new file mode 100644 (file)
index 0000000..c18a962
--- /dev/null
@@ -0,0 +1,74 @@
+#!perl -w
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+require_mods(qw(DBD::SQLite Search::Xapian));
+require_git 2.6;
+require_ok 'PublicInbox::LeiStore';
+require_ok 'PublicInbox::ExtSearch';
+my ($home, $for_destroy) = tmpdir();
+my $opt = { 1 => \(my $out = ''), 2 => \(my $err = '') };
+my $store_dir = "$home/lst";
+my $lst = PublicInbox::LeiStore->new($store_dir, { creat => 1 });
+ok($lst, '->new');
+my $oid = $lst->add_eml(eml_load('t/data/0001.patch'));
+like($oid, qr/\A[0-9a-f]+\z/, 'add returned OID');
+my $eml = eml_load('t/data/0001.patch');
+is($lst->add_eml($eml), undef, 'idempotent');
+$lst->done;
+{
+       my $es = $lst->search;
+       my $msgs = $es->over->query_xover(0, 1000);
+       is(scalar(@$msgs), 1, 'one message');
+       is($msgs->[0]->{blob}, $oid, 'blob matches');
+       my $mset = $es->mset("mid:$msgs->[0]->{mid}");
+       is($mset->size, 1, 'search works');
+       is_deeply($es->mset_to_artnums($mset), [ $msgs->[0]->{num} ],
+               'mset_to_artnums');
+       my @kw = $es->msg_keywords(($mset->items)[0]);
+       is_deeply(\@kw, [], 'no flags');
+}
+
+for my $parallel (0, 1) {
+       $lst->{priv_eidx}->{parallel} = $parallel;
+       my $docids = $lst->set_eml_keywords($eml, qw(seen draft));
+       is(scalar @$docids, 1, 'set keywords on one doc');
+       $lst->done;
+       my @kw = $lst->search->msg_keywords($docids->[0]);
+       is_deeply(\@kw, [qw(draft seen)], 'kw matches');
+
+       $docids = $lst->add_eml_keywords($eml, qw(seen draft));
+       $lst->done;
+       is(scalar @$docids, 1, 'idempotently added keywords to doc');
+       @kw = $lst->search->msg_keywords($docids->[0]);
+       is_deeply(\@kw, [qw(draft seen)], 'kw matches after noop');
+
+       $docids = $lst->remove_eml_keywords($eml, qw(seen draft));
+       is(scalar @$docids, 1, 'removed from one doc');
+       $lst->done;
+       @kw = $lst->search->msg_keywords($docids->[0]);
+       is_deeply(\@kw, [], 'kw matches after remove');
+
+       $docids = $lst->remove_eml_keywords($eml, qw(answered));
+       is(scalar @$docids, 1, 'removed from one doc (idempotently)');
+       $lst->done;
+       @kw = $lst->search->msg_keywords($docids->[0]);
+       is_deeply(\@kw, [], 'kw matches after remove (idempotent)');
+
+       $docids = $lst->add_eml_keywords($eml, qw(answered));
+       is(scalar @$docids, 1, 'added to empty doc');
+       $lst->done;
+       @kw = $lst->search->msg_keywords($docids->[0]);
+       is_deeply(\@kw, ['answered'], 'kw matches after add');
+
+       $docids = $lst->set_eml_keywords($eml);
+       is(scalar @$docids, 1, 'set to clobber');
+       $lst->done;
+       @kw = $lst->search->msg_keywords($docids->[0]);
+       is_deeply(\@kw, [], 'set clobbers all');
+}
+
+done_testing;