-sub input_maildir_cb { # maildir_each_eml cb
- my ($f, $kw, $eml, $self) = @_;
- input_eml_cb($self, $eml, $self->{-import_kw} ? { kw => $kw } : undef);
-}
-
-sub input_net_cb { # imap_each, nntp_each cb
- my ($url, $uid, $kw, $eml, $self) = @_;
- input_eml_cb($self, $eml, $self->{-import_kw} ? { kw => $kw } : undef);
-}
-
-sub import_done_wait { # dwaitpid callback
- my ($arg, $pid) = @_;
- my ($imp, $lei) = @$arg;
- $lei->child_error($?, 'non-fatal errors during import') if $?;
- my $sto = delete $lei->{sto} // return $lei->fail('BUG: {sto} gone');
- my $wait = $sto->ipc_do('done'); # PublicInbox::LeiStore::done
- $lei->dclose;
-}
-
-sub import_done { # EOF callback for main daemon
- my ($lei) = @_;
- my $imp = delete $lei->{imp} // return $lei->fail('BUG: {imp} gone');
- $imp->wq_wait_old(\&import_done_wait, $lei);
+sub pmdir_cb { # called via wq_io_do from LeiPmdir->each_mdir_fn
+ my ($self, $f, $fl) = @_;
+ my ($folder, $bn) = ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) or
+ die "BUG: $f was not from a Maildir?\n";
+ my $kw = PublicInbox::MdirReader::flags2kw($fl);
+ substr($folder, 0, 0) = 'maildir:'; # add prefix
+ my $lse = $self->{lse} //= $self->{lei}->{sto}->search;
+ my $lms = $self->{-lms_rw} //= $self->{lei}->lms; # may be 0 or undef
+ my @oidbin = $lms ? $lms->name_oidbin($folder, $bn) : ();
+ @oidbin > 1 and warn("W: $folder/*/$$bn not unique:\n",
+ map { "\t".unpack('H*', $_)."\n" } @oidbin);
+ my %seen;
+ my @docids = sort { $a <=> $b } grep { !$seen{$_}++ }
+ map { $lse->over->oidbin_exists($_) } @oidbin;
+ my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
+ if (scalar @docids) {
+ $lse->kw_changed(undef, $kw, \@docids) or return;
+ }
+ if (my $eml = eml_from_path($f)) {
+ $vmd->{sync_info} = [ $folder, \$bn ] if $self->{-mail_sync};
+ $self->input_eml_cb($eml, $vmd);
+ }