]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei import|lcat: improve+fix single message IMAP support
authorEric Wong <e@80x24.org>
Sat, 29 May 2021 20:20:38 +0000 (20:20 +0000)
committerEric Wong <e@80x24.org>
Sun, 30 May 2021 05:02:41 +0000 (05:02 +0000)
lcat can now dump the memoized contents of entire IMAP folders,
not just a single UID.  It's now parallelized and pipelined for
multiple lei2mail workers.

Furthemore, various forms of JSON output work consistently
with blob-only output, now.

While working on this, I noticed NetReader was passing UID URLs
to imap_each callbacks, which was causing mail_sync.sqlite3 to
store UIDs in `folders' and clearly wrong so it's now fixed.

lib/PublicInbox/LeiLcat.pm
lib/PublicInbox/LeiMailSync.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/LeiXSearch.pm
lib/PublicInbox/NetReader.pm
t/lei-import-imap.t

index f9d9633acd49033f18adfb51800c67d442b0a7f8..effc36828e0000bab29d7d67a3e6d191ec1d3761 100644 (file)
@@ -11,14 +11,21 @@ use PublicInbox::LeiViewText;
 use URI::Escape qw(uri_unescape);
 use PublicInbox::MID qw($MID_EXTRACT);
 
-sub lcat_imap_uid_uri ($$) {
-       my ($lei, $uid_uri) = @_;
+sub lcat_imap_uri ($$) {
+       my ($lei, $uri) = @_;
        my $lms = $lei->{lse}->lms or return;
-       my $oidhex = $lms->imap_oid($lei, $uid_uri);
-       if (ref(my $err = $oidhex)) { # art2folder error
-               $lei->qerr(@{$err->{qerr}}) if $err->{qerr};
+       # cf. LeiToMail->wq_atexit_child
+       if (defined $uri->uid) {
+               my $oidhex = $lms->imap_oid($lei, $uri);
+               if (ref(my $err = $oidhex)) { # art2folder error
+                       $lei->qerr(@{$err->{qerr}}) if $err->{qerr};
+               }
+               push @{$lei->{lcat_blob}}, $oidhex;
+       } elsif (defined(my $fid = $lms->fid_for($$uri))) {
+               push @{$lei->{lcat_fid}}, $fid;
+       } else {
+               $lei->child_error(1 << 8, "# unknown folder: $uri");
        }
-       push @{$lei->{lcat_blob}}, $oidhex; # cf. LeiToMail->wq_atexit_child
 }
 
 sub extract_1 ($$) {
@@ -26,10 +33,8 @@ sub extract_1 ($$) {
        if ($x =~ m!\b(imaps?://[^>]+)!i) {
                my $u = $1;
                require PublicInbox::URIimap;
-               $u = PublicInbox::URIimap->new($u);
-               defined($u->uid) ? lcat_imap_uid_uri($lei, $u) :
-                               $lei->child_error(1 << 8, "# no UID= in $u");
-               '""'; # blank query, using {lcat_blob}
+               lcat_imap_uri($lei, PublicInbox::URIimap->new($u));
+               '""'; # blank query, using {lcat_blob} or {lcat
        } elsif ($x =~ m!\b([a-z]+?://\S+)!i) {
                my $u = $1;
                $u =~ s/[\>\]\)\,\.\;]+\z//;
index 5c0988b5c8d2676c605e4a5e09c32bd5af6d8f52..c7f78239e7e50c5101ed943d7be3b2f316eb8782 100644 (file)
@@ -64,9 +64,9 @@ CREATE TABLE IF NOT EXISTS blob2name (
 
 }
 
-sub _fid_for {
+sub fid_for {
        my ($self, $folder, $rw) = @_;
-       my $dbh = $self->{dbh};
+       my $dbh = $self->{dbh} //= dbh_new($self, $rw);
        my $sel = 'SELECT fid FROM folders WHERE loc = ? LIMIT 1';
        my ($fid) = $dbh->selectrow_array($sel, undef, $folder);
        return $fid if defined $fid;
@@ -111,7 +111,7 @@ EOM
 
 sub set_src {
        my ($self, $oidhex, $folder, $id) = @_;
-       my $fid = $self->{fmap}->{$folder} //= _fid_for($self, $folder, 1);
+       my $fid = $self->{fmap}->{$folder} //= fid_for($self, $folder, 1);
        my $sth;
        if (ref($id)) { # scalar name
                $id = $$id;
@@ -128,7 +128,7 @@ INSERT OR IGNORE INTO blob2num (oidbin, fid, uid) VALUES (?, ?, ?)
 
 sub clear_src {
        my ($self, $folder, $id) = @_;
-       my $fid = $self->{fmap}->{$folder} //= _fid_for($self, $folder, 1);
+       my $fid = $self->{fmap}->{$folder} //= fid_for($self, $folder, 1);
        my $sth;
        if (ref($id)) { # scalar name
                $id = $$id;
@@ -146,7 +146,7 @@ DELETE FROM blob2num WHERE fid = ? AND uid = ?
 # Maildir-only
 sub mv_src {
        my ($self, $folder, $oidbin, $id, $newbn) = @_;
-       my $fid = $self->{fmap}->{$folder} //= _fid_for($self, $folder, 1);
+       my $fid = $self->{fmap}->{$folder} //= fid_for($self, $folder, 1);
        my $sth = $self->{dbh}->prepare_cached(<<'');
 UPDATE blob2name SET name = ? WHERE fid = ? AND oidbin = ? AND name = ?
 
@@ -158,7 +158,12 @@ sub each_src {
        my ($self, $folder, $cb, @args) = @_;
        my $dbh = $self->{dbh} //= dbh_new($self);
        my ($fid, $sth);
-       $fid = $self->{fmap}->{$folder} //= _fid_for($self, $folder) // return;
+       if (ref($folder) eq 'HASH') {
+               $fid = $folder->{fid} // die "BUG: no `fid'";
+       } else {
+               $fid = $self->{fmap}->{$folder} //=
+                       fid_for($self, $folder) // return;
+       }
        $sth = $dbh->prepare('SELECT oidbin,uid FROM blob2num WHERE fid = ?');
        $sth->execute($fid);
        while (my ($oidbin, $id) = $sth->fetchrow_array) {
@@ -176,7 +181,7 @@ sub location_stats {
        my $dbh = $self->{dbh} //= dbh_new($self);
        my $fid;
        my $ret = {};
-       $fid = $self->{fmap}->{$folder} //= _fid_for($self, $folder) // return;
+       $fid = $self->{fmap}->{$folder} //= fid_for($self, $folder) // return;
        my ($row) = $dbh->selectrow_array(<<"", undef, $fid);
 SELECT COUNT(name) FROM blob2name WHERE fid = ?
 
@@ -349,7 +354,7 @@ sub forget_folder {
        my ($self, $folder) = @_;
        my ($fid, $sth);
        $fid = delete($self->{fmap}->{$folder}) //
-               _fid_for($self, $folder) // return;
+               fid_for($self, $folder) // return;
        my $dbh = $self->{dbh};
        $dbh->do('DELETE FROM blob2name WHERE fid = ?', undef, $fid);
        $dbh->do('DELETE FROM blob2num WHERE fid = ?', undef, $fid);
@@ -369,7 +374,7 @@ sub imap_oid {
                $lei->qerr(@{$err->{qerr}}) if $err->{qerr};
        }
        my $fid = $self->{fmap}->{$folders->[0]} //=
-               _fid_for($self, $folders->[0]) // return;
+               fid_for($self, $folders->[0]) // return;
        my $sth = $self->{dbh}->prepare_cached(<<EOM, undef, 1);
 SELECT oidbin FROM blob2num WHERE fid = ? AND uid = ?
 EOM
index 078f2551cacff90287bf77c7e89c2efd6942512a..997ce5993c897c653b76426c366bee4b5d67b7f6 100644 (file)
@@ -756,12 +756,6 @@ sub write_mail { # via ->wq_io_do
 sub wq_atexit_child {
        my ($self) = @_;
        my $lei = $self->{lei};
-       if (!$self->{-wq_worker_nr} && $lei->{lcat_blob}) {
-               for my $oid (@{$lei->{lcat_blob}}) {
-                       my $smsg = { blob => $oid, pct => 100 };
-                       write_mail($self, $smsg);
-               }
-       }
        delete $self->{wcb};
        $lei->{ale}->git->async_wait_all;
        my $nr = delete($lei->{-nr_write}) or return;
index d6d42a0189dad86136bebe642b0a70e919389783..f7c1e5599e6b14b5fd84a454ee55dffc15e1f336 100644 (file)
@@ -256,6 +256,14 @@ sub query_combined_mset { # non-parallel for non-"--threads" users
        $lei->{ovv}->ovv_atexit_child($lei);
 }
 
+sub _smsg_fill ($$) {
+       my ($smsg, $eml) = @_;
+       $smsg->populate($eml);
+       $smsg->parse_references($eml, mids($eml));
+       $smsg->{$_} //= '' for qw(from to cc ds subject references mid);
+       delete @$smsg{qw(From Subject -ds -ts)};
+}
+
 sub each_remote_eml { # callback for MboxReader->mboxrd
        my ($eml, $self, $lei, $each_smsg) = @_;
        my $xoids = $lei->{ale}->xoids_for($eml, 1);
@@ -265,10 +273,7 @@ sub each_remote_eml { # callback for MboxReader->mboxrd
        my $smsg = bless {}, 'PublicInbox::Smsg';
        $smsg->{blob} = $xoids ? (keys(%$xoids))[0]
                                : git_sha(1, $eml)->hexdigest;
-       $smsg->populate($eml);
-       $smsg->parse_references($eml, mids($eml));
-       $smsg->{$_} //= '' for qw(from to cc ds subject references mid);
-       delete @$smsg{qw(From Subject -ds -ts)};
+       _smsg_fill($smsg, $eml);
        wait_startq($lei);
        if ($lei->{-progress}) {
                ++$lei->{-nr_remote_eml};
@@ -453,6 +458,9 @@ sub start_query ($;$) { # always runs in main (lei-daemon) process
        for my $uris (@$q) {
                $self->wq_io_do('query_remote_mboxrd', [], $uris);
        }
+       if ($self->{-do_lcat}) {
+               $self->wq_io_do('lcat_dump', []);
+       }
        $self->wq_close(1); # lei_xsearch workers stop when done
 }
 
@@ -518,6 +526,7 @@ sub do_query {
        @$end = ();
        $self->{opt_threads} = $lei->{opt}->{threads};
        $self->{opt_sort} = $lei->{opt}->{'sort'};
+       $self->{-do_lcat} = $lei->{lcat_blob} // $lei->{lcat_fid};
        if ($l2m) {
                $l2m->net_merge_all_done unless $lei->{auth};
        } else {
@@ -561,5 +570,48 @@ sub prepare_external {
        push @{$self->{locals}}, $loc;
 }
 
+sub _lcat_i { # LeiMailSync->each_src iterator callback
+       my ($oidbin, $id, $each_smsg) = @_;
+       $each_smsg->({blob => unpack('H*', $oidbin), pct => 100});
+}
+
+sub _lcat2smsg { # git->cat_async callback
+       my ($bref, $oid, $type, $size, $smsg) = @_;
+       if ($bref) {
+               my $eml = PublicInbox::Eml->new($bref);
+               my $json_dump = delete $smsg->{-json_dump};
+               bless $smsg, 'PublicInbox::Smsg';
+               _smsg_fill($smsg, $eml);
+               $json_dump->($smsg, undef, $eml);
+       }
+}
+
+sub lcat_dump {
+       my ($self) = @_;
+       my $lei = $self->{lei};
+       my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+       my $git = $lei->{ale}->git;
+       if (!$lei->{l2m}) {
+               my $json_dump = $each_smsg;
+               $each_smsg = sub {
+                       my ($smsg) = @_;
+                       use Data::Dumper;
+                       $smsg->{-json_dump} = $json_dump;
+                       $git->cat_async($smsg->{blob}, \&_lcat2smsg, $smsg);
+               };
+       }
+       for my $oid (@{$lei->{lcat_blob} // []}) {
+               $each_smsg->({ blob => $oid, pct => 100 });
+       }
+       if (my $fids = delete $lei->{lcat_fid}) {
+               my $lms = $lei->{lse}->lms;
+               for my $fid (@$fids) {
+                       $lms->each_src({fid => $fid}, \&_lcat_i, $each_smsg);
+               }
+       }
+       $git->async_wait_all;
+       undef $each_smsg; # may commit
+       $lei->{ovv}->ovv_atexit_child($lei);
+}
 
 1;
index 76d2fe62d088167a801b9ae9e7e7b01e38201ccd..54c6b082122d0896dcb8db100ca4966a0daaa632 100644 (file)
@@ -471,8 +471,10 @@ EOF
        my $uri = $orig_uri->clone;
        my $single_uid = $uri->uid;
        my ($itrk, $l_uid, $l_uidval) = itrk_last($self, $uri, $r_uidval, $mic);
-       $itrk = $l_uid = undef if defined($single_uid);
-
+       if (defined($single_uid)) {
+               $itrk = $l_uid = undef;
+               $uri->uid(undef); # for eml_cb
+       }
        return <<EOF if $l_uidval != $r_uidval;
 E: $uri UIDVALIDITY mismatch
 E: local=$l_uidval != remote=$r_uidval
index 59d481d58216499fbb8bd73f24ecc58ef9074ec9..895b19ffdd52f31155c17686fabf2ee1e9bdfd5b 100644 (file)
@@ -48,6 +48,13 @@ test_lei({ tmpdir => $tmpdir }, sub {
        ok($stats->{'uid.min'} < $stats->{'uid.max'}, 'min < max');
        ok($stats->{'uid.count'} > 0, 'count > 0');
 
+       lei_ok('lcat', $url);
+       is(scalar(grep(/^# blob:/, split(/\n/ms, $lei_out))),
+               $stats->{'uid.count'}, 'lcat on URL dumps folder');
+       lei_ok qw(lcat -f json), $url;
+       $out = json_utf8->decode($lei_out);
+       is(scalar(@$out) - 1, $stats->{'uid.count'}, 'lcat JSON dumps folder');
+
        lei_ok(qw(q z:1..));
        $out = json_utf8->decode($lei_out);
        ok(scalar(@$out) > 1, 'got imported messages');
@@ -77,6 +84,8 @@ test_lei({ tmpdir => $tmpdir }, sub {
        unlike($lei_out, qr!\Q$host_port\E!, 'sync info gone after forget');
        my $uid_url = "$url/;UID=".$stats->{'uid.max'};
        lei_ok 'import', $uid_url;
+       lei_ok 'ls-mail-sync';
+       is($lei_out, "$url\n", 'ls-mail-sync added URL w/o UID');
        lei_ok 'inspect', $uid_url;
        $lei_out =~ /([a-f0-9]{40,})/ or
                xbail 'inspect missed blob with UID URL';
@@ -88,7 +97,9 @@ test_lei({ tmpdir => $tmpdir }, sub {
        my $orig = $lei_out;
        lei_ok 'lcat', "blob:$blob";
        is($lei_out, $orig, 'lcat understands blob:...');
-       ok(!lei('lcat', $url), "lcat doesn't work on IMAP URL w/o UID");
+       lei_ok qw(lcat -f json), $uid_url;
+       $out = json_utf8->decode($lei_out);
+       is(scalar(@$out), 2, 'got JSON') or diag explain($out);
 });
 
 done_testing;