]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiImport.pm
lei import: speed up kw updates for old IMAP messages
[public-inbox.git] / lib / PublicInbox / LeiImport.pm
index 860a2c98ffeb9fea600abce5f6043277698847d3..2efd4935ab7d3d63d1c36460d054513205fed4c7 100644 (file)
@@ -43,18 +43,14 @@ sub input_maildir_cb { # maildir_each_eml cb
 
 sub input_net_cb { # imap_each / nntp_each
        my ($uri, $uid, $kw, $eml, $self) = @_;
-       my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
-       $vmd->{sync_info} = [ $$uri, $uid ] if $self->{-mail_sync};
        if (defined $eml) {
+               my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
+               $vmd->{sync_info} = [ $$uri, $uid ] if $self->{-mail_sync};
                $self->input_eml_cb($eml, $vmd);
-       } elsif ($vmd) { # old message, kw only
-               my $oid = $self->{-lms_ro}->imap_oid2($uri, $uid) // return;
-               my @docids = $self->{lse}->over->blob_exists($oid) or return;
-               $self->{lse}->kw_changed(undef, $kw, \@docids) or return;
-               my $lei = $self->{lei};
-               $lei->qerr("# $oid => @$kw\n") if $lei->{opt}->{verbose};
-               $self->{lei}->{sto}->ipc_do('set_eml_vmd', undef,
-                                               $vmd, \@docids);
+       } elsif (my $ikw = $self->{lei}->{ikw}) { # old message, kw only
+               # we send $uri as a bare SCALAR and not a URIimap ref to
+               # reduce socket traffic:
+               $ikw->wq_io_do('ck_update_kw', [], $$uri, $uid, $kw);
        }
 }
 
@@ -71,15 +67,17 @@ sub do_import_index ($$@) {
 
        $lei->ale; # initialize for workers to read
        my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
+       my $ikw;
        if (my $net = $lei->{net}) {
                # $j = $net->net_concurrency($j); TODO
                if ($lei->{opt}->{incremental} // 1) {
                        $net->{incremental} = 1;
                        $net->{-lms_ro} = $sto->search->lms // 0;
-                       if ($self->{-import_kw}) {
+                       if ($self->{-import_kw} && $net->{-lms_ro} &&
+                                       $net->{imap_order}) {
+                               require PublicInbox::LeiImportKw;
+                               $ikw = PublicInbox::LeiImportKw->new($lei);
                                $net->{each_old} = 1;
-                               $self->{-lms_ro} = $net->{-lms_ro};
-                               $self->{lse} = $sto->search;
                        }
                }
        } else {
@@ -93,6 +91,7 @@ sub do_import_index ($$@) {
        (my $op_c, $ops) = $lei->workers_start($self, $j, $ops);
        $lei->{wq1} = $self;
        $lei->{-err_type} = 'non-fatal';
+       $ikw->wq_close(1) if $ikw;
        net_merge_all_done($self) unless $lei->{auth};
        $op_c->op_wait_event($ops);
 }