sub input_net_cb { # imap_each / nntp_each
my ($uri, $uid, $kw, $eml, $self) = @_;
- my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
- $vmd->{sync_info} = [ $$uri, $uid ] if $self->{-mail_sync};
if (defined $eml) {
+ my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
+ $vmd->{sync_info} = [ $$uri, $uid ] if $self->{-mail_sync};
$self->input_eml_cb($eml, $vmd);
- } elsif ($vmd) { # old message, kw only
- my $oid = $self->{-lms_ro}->imap_oid2($uri, $uid) // return;
- my @docids = $self->{lse}->over->blob_exists($oid) or return;
- $self->{lse}->kw_changed(undef, $kw, \@docids) or return;
- my $lei = $self->{lei};
- $lei->qerr("# $oid => @$kw\n") if $lei->{opt}->{verbose};
- $self->{lei}->{sto}->ipc_do('set_eml_vmd', undef,
- $vmd, \@docids);
+ } elsif (my $ikw = $self->{lei}->{ikw}) { # old message, kw only
+ # we send $uri as a bare SCALAR and not a URIimap ref to
+ # reduce socket traffic:
+ $ikw->wq_io_do('ck_update_kw', [], $$uri, $uid, $kw);
}
}
$lei->ale; # initialize for workers to read
my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
+ my $ikw;
if (my $net = $lei->{net}) {
# $j = $net->net_concurrency($j); TODO
if ($lei->{opt}->{incremental} // 1) {
$net->{incremental} = 1;
$net->{-lms_ro} = $sto->search->lms // 0;
- if ($self->{-import_kw}) {
+ if ($self->{-import_kw} && $net->{-lms_ro} &&
+ $net->{imap_order}) {
+ require PublicInbox::LeiImportKw;
+ $ikw = PublicInbox::LeiImportKw->new($lei);
$net->{each_old} = 1;
- $self->{-lms_ro} = $net->{-lms_ro};
- $self->{lse} = $sto->search;
}
}
} else {
(my $op_c, $ops) = $lei->workers_start($self, $j, $ops);
$lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
+ $ikw->wq_close(1) if $ikw;
net_merge_all_done($self) unless $lei->{auth};
$op_c->op_wait_event($ops);
}