-sub call { # the main "lei import" method
- my ($cls, $lei, @argv) = @_;
- my $sto = $lei->_lei_store(1);
- $sto->write_prepare($lei);
- $lei->{opt}->{kw} //= 1;
- my $fmt = $lei->{opt}->{'format'};
- my $self = $lei->{imp} = bless {}, $cls;
- return $lei->fail('--format unspecified') if !$fmt;
- $self->{0} = $lei->{0} if $lei->{opt}->{stdin};
- my $ops = {
- '!' => [ $lei->can('fail_handler'), $lei ],
- 'x_it' => [ $lei->can('x_it'), $lei ],
- 'child_error' => [ $lei->can('child_error'), $lei ],
- '' => [ \&import_done, $lei ],
- };
- ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
- my $j = $lei->{opt}->{jobs} // scalar(@argv) || 1;
- my $nproc = $self->detect_nproc;
- $j = $nproc if $j > $nproc;
- $self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei});
- my $op = delete $lei->{pkt_op_c};
- delete $lei->{pkt_op_p};
- $self->wq_io_do('import_stdin', []) if $self->{0};
- for my $x (@argv) {
- $self->wq_io_do('import_path_url', [], $x);
+sub pmdir_cb { # called via wq_io_do from LeiPmdir->each_mdir_fn
+ my ($self, $f, $fl) = @_;
+ my ($folder, $bn) = ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) or
+ die "BUG: $f was not from a Maildir?\n";
+ my $kw = PublicInbox::MdirReader::flags2kw($fl);
+ substr($folder, 0, 0) = 'maildir:'; # add prefix
+ my $lse = $self->{lse} //= $self->{lei}->{sto}->search;
+ my $lms = $self->{-lms_rw} //= $self->{lei}->lms; # may be 0 or undef
+ my @oidbin = $lms ? $lms->name_oidbin($folder, $bn) : ();
+ @oidbin > 1 and warn("W: $folder/*/$$bn not unique:\n",
+ map { "\t".unpack('H*', $_)."\n" } @oidbin);
+ my %seen;
+ my @docids = sort { $a <=> $b } grep { !$seen{$_}++ }
+ map { $lse->over->oidbin_exists($_) } @oidbin;
+ my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
+ if (scalar @docids) {
+ $lse->kw_changed(undef, $kw, \@docids) or return;