-sub do_import {
- my ($lei) = @_;
- my $ops = {
- '!' => [ $lei->can('fail_handler'), $lei ],
- 'x_it' => [ $lei->can('x_it'), $lei ],
- 'child_error' => [ $lei->can('child_error'), $lei ],
- '' => [ \&import_done, $lei ],
- };
- ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
- my $self = $lei->{imp};
- my $j = $lei->{opt}->{jobs} // scalar(@{$self->{argv}}) || 1;
- if (my $nrd = $lei->{nrd}) {
- # $j = $nrd->net_concurrency($j); TODO
- } else {
- my $nproc = $self->detect_nproc;
- $j = $nproc if $j > $nproc;
- }
- $self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei});
- my $op = delete $lei->{pkt_op_c};
- delete $lei->{pkt_op_p};
- $self->wq_io_do('import_stdin', []) if $self->{0};
- for my $x (@{$self->{argv}}) {
- $self->wq_io_do('import_path_url', [], $x);
- }
- $self->wq_close(1);
- $lei->event_step_init; # wait for shutdowns
- if ($lei->{oneshot}) {
- while ($op->{sock}) { $op->event_step }
+sub input_mbox_cb { # MboxReader callback
+ my ($eml, $self) = @_;
+ my $vmd;
+ if ($self->{-import_kw}) {
+ my $kw = PublicInbox::MboxReader::mbox_keywords($eml);
+ $vmd = { kw => $kw } if scalar(@$kw);