]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiImport.pm
lei import: speed up repeated Maildir imports
[public-inbox.git] / lib / PublicInbox / LeiImport.pm
index 222f75c8abe1b0e5c9ebda30e68c9a3ca0f03a5e..b0e7ba6bc490ef9d870e4c55f36a98a06e5b4597 100644 (file)
@@ -6,6 +6,7 @@ package PublicInbox::LeiImport;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
+use PublicInbox::InboxWritable qw(eml_from_path);
 
 # /^input_/ subs are used by (or override) PublicInbox::LeiInput superclass
 
@@ -28,17 +29,26 @@ sub input_mbox_cb { # MboxReader callback
        input_eml_cb($self, $eml, $vmd);
 }
 
-sub input_maildir_cb { # maildir_each_eml cb
-       my ($f, $kw, $eml, $self) = @_;
+sub pmdir_cb { # called via wq_io_do from LeiPmdir->each_mdir_fn
+       my ($self, $f, @args) = @_;
+       my ($folder, $bn) = ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) or
+               die "BUG: $f was not from a Maildir?\n";
+       my $fl = PublicInbox::MdirReader::maildir_basename_flags($bn);
+       return if index($fl, 'T') >= 0; # no Trashed messages
+       my $kw = PublicInbox::MdirReader::flags2kw($fl);
+       substr($folder, 0, 0) = 'maildir:'; # add prefix
+       my $lms = $self->{-lms_ro};
+       my $oidbin = $lms ? $lms->name_oidbin($folder, $bn) : undef;
+       my @docids = defined($oidbin) ?
+                       $self->{over}->oidbin_exists($oidbin) : ();
        my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
-       if ($self->{-mail_sync}) {
-               if ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) { # ugh...
-                       $vmd->{sync_info} = [ "maildir:$1", \(my $n = $2) ];
-               } else {
-                       warn "E: $f was not from a Maildir?\n";
-               }
+       if (scalar @docids) {
+               $self->{lse}->kw_changed(undef, $kw, \@docids) or return;
+       }
+       if (my $eml = eml_from_path($f)) {
+               $vmd->{sync_info} = [ $folder, \$bn ] if $self->{-mail_sync};
+               $self->input_eml_cb($eml, $vmd);
        }
-       $self->input_eml_cb($eml, $vmd);
 }
 
 sub input_net_cb { # imap_each / nntp_each
@@ -62,11 +72,13 @@ sub do_import_index ($$@) {
        my $vmd_mod = $self->vmd_mod_extract(\@inputs);
        return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
        $self->{all_vmd} = $vmd_mod if scalar keys %$vmd_mod;
-       $self->prepare_inputs($lei, \@inputs) or return;
+       $lei->ale; # initialize for workers to read (before LeiPmdir->new)
        $self->{-mail_sync} = $lei->{opt}->{'mail-sync'} // 1;
+       $self->prepare_inputs($lei, \@inputs) or return;
 
-       $lei->ale; # initialize for workers to read
-       my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
+       my $j = $lei->{opt}->{jobs} // 0;
+       $j =~ /\A([0-9]+),[0-9]+\z/ and $j = $1 + 0;
+       $j ||= scalar(@{$self->{inputs}}) || 1;
        my $ikw;
        if (my $net = $lei->{net}) {
                # $j = $net->net_concurrency($j); TODO