lib/PublicInbox/LeiMirror.pm
lib/PublicInbox/LeiOverview.pm
lib/PublicInbox/LeiP2q.pm
+lib/PublicInbox/LeiPmdir.pm
lib/PublicInbox/LeiQuery.pm
lib/PublicInbox/LeiRediff.pm
lib/PublicInbox/LeiRemote.pm
@c_opt ],
'import' => [ 'LOCATION...|--stdin',
'one-time import/update from URL or filesystem',
- qw(stdin| offset=i recursive|r exclude=s include|I=s
+ qw(stdin| offset=i recursive|r exclude=s include|I=s jobs=s
lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync!),
qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt(), @c_opt ],
'forget-mail-sync' => [ 'LOCATION...',
'leistore.dir' => 'top-level storage location',
);
-my @WQ_KEYS = qw(lxs l2m wq1 ikw); # internal workers
+my @WQ_KEYS = qw(lxs l2m ikw pmd wq1); # internal workers
sub _drop_wq {
my ($self) = @_;
}
sub workers_start {
- my ($lei, $wq, $jobs, $ops) = @_;
+ my ($lei, $wq, $jobs, $ops, $flds) = @_;
$ops = {
'!' => [ \&fail_handler, $lei ],
'|' => [ \&sigpipe_handler, $lei ],
$ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
my $end = $lei->pkt_op_pair;
my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
- $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
+ $flds->{lei} = $lei;
+ $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
delete $lei->{pkt_op_p};
my $op_c = delete $lei->{pkt_op_c};
# {-lei_sock} persists script/lei process until ops->{''} EOF callback
# call this when we're ready to wait on events and yield to other clients
sub wait_wq_events {
my ($lei, $op_c, $ops) = @_;
- for my $wq (grep(defined, @$lei{qw(ikw)})) { # auxiliary WQs
+ for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
$wq->wq_close(1);
}
$op_c->{ops} = $ops;
use strict;
use v5.10.1;
use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
+use PublicInbox::InboxWritable qw(eml_from_path);
# /^input_/ subs are used by (or override) PublicInbox::LeiInput superclass
input_eml_cb($self, $eml, $vmd);
}
-sub input_maildir_cb { # maildir_each_eml cb
- my ($f, $kw, $eml, $self) = @_;
+sub pmdir_cb { # called via wq_io_do from LeiPmdir->each_mdir_fn
+ my ($self, $f, @args) = @_;
+ my ($folder, $bn) = ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) or # split $f into Maildir root and file basename
+ die "BUG: $f was not from a Maildir?\n";
+ my $fl = PublicInbox::MdirReader::maildir_basename_flags($bn);
+ return if index($fl, 'T') >= 0; # no Trashed messages
+ my $kw = PublicInbox::MdirReader::flags2kw($fl); # scalar context: sorted arrayref of keywords
+ substr($folder, 0, 0) = 'maildir:'; # add prefix
+ my $lms = $self->{-lms_ro};
+ my $oidbin = $lms ? $lms->name_oidbin($folder, $bn) : undef; # oidbin already recorded for this folder+basename, if any
+ my @docids = defined($oidbin) ?
+ $self->{over}->oidbin_exists($oidbin) : ();
my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
- if ($self->{-mail_sync}) {
- if ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) { # ugh...
- $vmd->{sync_info} = [ "maildir:$1", \(my $n = $2) ];
- } else {
- warn "E: $f was not from a Maildir?\n";
- }
+ if (scalar @docids) { # blob is already indexed under at least one docid
+ $self->{lse}->kw_changed(undef, $kw, \@docids) or return; # stop before reading $f when kw_changed is false
+ }
+ if (my $eml = eml_from_path($f)) { # presumably false when $f cannot be read -- confirm in InboxWritable
+ $vmd->{sync_info} = [ $folder, \$bn ] if $self->{-mail_sync};
+ $self->input_eml_cb($eml, $vmd);
}
- $self->input_eml_cb($eml, $vmd);
}
sub input_net_cb { # imap_each / nntp_each
my $vmd_mod = $self->vmd_mod_extract(\@inputs);
return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
$self->{all_vmd} = $vmd_mod if scalar keys %$vmd_mod;
- $self->prepare_inputs($lei, \@inputs) or return;
+ $lei->ale; # initialize for workers to read (before LeiPmdir->new)
$self->{-mail_sync} = $lei->{opt}->{'mail-sync'} // 1;
+ $self->prepare_inputs($lei, \@inputs) or return;
- $lei->ale; # initialize for workers to read
- my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
+ my $j = $lei->{opt}->{jobs} // 0;
+ $j =~ /\A([0-9]+),[0-9]+\z/ and $j = $1 + 0;
+ $j ||= scalar(@{$self->{inputs}}) || 1;
my $ikw;
if (my $net = $lei->{net}) {
# $j = $net->net_concurrency($j); TODO
no warnings 'once';
no strict 'refs';
-for my $m (qw(input_maildir_cb input_net_cb)) {
+for my $m (qw(pmdir_cb input_net_cb)) {
*$m = PublicInbox::LeiImport->can($m);
}
return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
$input appears to be a maildir, not $ifmt
EOM
- PublicInbox::MdirReader->new->maildir_each_eml($input,
- $self->can('input_maildir_cb'),
- $self, @args);
+ my $mdr = PublicInbox::MdirReader->new;
+ if (my $pmd = $self->{pmd}) {
+ $mdr->maildir_each_file($input,
+ $pmd->can('each_mdir_fn'),
+ $pmd, @args);
+ } else {
+ $mdr->maildir_each_eml($input,
+ $self->can('input_maildir_cb'),
+ $self, @args);
+ }
} else {
$lei->fail("$input unsupported (TODO)");
}
push @{$sync->{no}}, '/dev/stdin' if $sync;
}
my $net = $lei->{net}; # NetWriter may be created by l2m
- my (@f, @d);
+ my (@f, @md);
# e.g. Maildir:/home/user/Mail/ or imaps://example.com/INBOX
for my $input (@$inputs) {
my $input_path = $input;
PublicInbox::MboxReader->reads($ifmt) or return
$lei->fail("$ifmt not supported");
} elsif (-d $input_path) {
- require PublicInbox::MdirReader;
$ifmt eq 'maildir' or return
$lei->fail("$ifmt not supported");
$sync and $input = 'maildir:'.
$lei->abs_path($input_path);
+ push @md, $input;
} else {
return $lei->fail("Unable to handle $input");
}
if ($devfd >= 0 || -f $input || -p _) {
push @{$sync->{no}}, $input if $sync;
push @f, $input;
- } elsif (-d $input) {
+ } elsif (-d "$input/new" && -d "$input/cur") {
if ($sync) {
$input = $lei->abs_path($input);
push @{$sync->{ok}}, $input;
}
- push @d, $input;
+ push @md, $input;
} else {
return $lei->fail("Unable to handle $input")
}
}
}
if (@f) { check_input_format($lei, \@f) or return }
- if (@d) { # TODO: check for MH vs Maildir, here
- require PublicInbox::MdirReader;
- }
if ($sync && $sync->{no}) {
return $lei->fail(<<"") if !$sync->{ok};
--mail-sync specified but no inputs support it
$lei->{auth} //= PublicInbox::LeiAuth->new;
$lei->{net} //= $net;
}
+ if (scalar(@md)) {
+ require PublicInbox::MdirReader;
+ if ($self->can('pmdir_cb')) {
+ require PublicInbox::LeiPmdir;
+ $self->{pmd} = PublicInbox::LeiPmdir->new($lei, $self);
+ }
+ }
$self->{inputs} = $inputs;
}
UNIQUE (oidbin, fid, name)
)
+ # speeds up LeiImport->pmdir_cb (for "lei import") by ~6x:
+ $dbh->do(<<'');
+CREATE INDEX IF NOT EXISTS idx_fid_name ON blob2name(fid,name)
+
}
sub fid_for {
$sth->fetchrow_array;
}
+sub name_oidbin ($$$) { # look up the oidbin stored in blob2name for name $nm within folder $mdir
+ my ($self, $mdir, $nm) = @_;
+ my $fid = $self->{fmap}->{$mdir} //= fid_for($self, $mdir) // return; # fid is cached in {fmap}; return early if fid_for gives undef
+ my $sth = $self->{dbh}->prepare_cached(<<EOM, undef, 1);
+SELECT oidbin FROM blob2name WHERE fid = ? AND name = ?
+EOM
+ $sth->execute($fid, $nm);
+ $sth->fetchrow_array; # result of fetching the first matching row, if any
+}
+
sub imap_oid {
my ($self, $lei, $uid_uri) = @_;
my $mailbox_uri = $uid_uri->clone;
--- /dev/null
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# WQ worker for dealing with parallel Maildir reads;
+# this does NOT use the {shard_info} field of LeiToMail
+# (and we may remove {shard_info})
+# WQ key: {pmd}
+package PublicInbox::LeiPmdir;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+
+# Create the Maildir worker pool and register it as $lei->{pmd}.
+# $lei is the lei client object, $ipt is the LeiInput subclass whose
+# pmdir_cb the workers will invoke (see mdir_iter).
+# The worker count is taken from the number after the comma in
+# --jobs=N,M when that form is given, otherwise from --jobs as-is,
+# otherwise from detect_nproc capped at 4.
+# Returns the new object (the value of the final assignment).
+sub new {
+	my ($cls, $lei, $ipt) = @_;
+	my $self = bless { -wq_ident => 'lei Maildir worker' }, $cls;
+	my $jobs = $lei->{opt}->{jobs};
+	# --jobs is optional: do not run the match against undef
+	if (defined($jobs) && $jobs =~ /\A[0-9]+,([0-9]+)\z/) {
+		$jobs = $1;
+	}
+	my $nproc = $jobs // do {
+		# untested with >=4 CPUs, though I suspect I/O latency
+		# of SATA SSD storage will make >=4 processes unnecessary,
+		# here. NVMe users may wish to use '-j'
+		my $n = $self->detect_nproc;
+		# a ternary, not "$n = 4 if $n > 4": the statement-modifier
+		# form makes the do-block yield a false value when $n <= 4
+		$n > 4 ? 4 : $n;
+	};
+	my ($op_c, $ops) = $lei->workers_start($self, $nproc,
+		undef, { ipt => $ipt }); # LeiInput subclass
+	$op_c->{ops} = $ops; # for PktOp->event_step
+	$lei->{pmd} = $self;
+}
+
+sub ipc_atfork_child { # fills $self->{ipt} with lei/sto/lse/over/-lms_ro, then calls the superclass hook
+ my ($self) = @_;
+ my $lei = $self->{lei}; # presumably populated from the $flds given to wq_workers_start -- confirm in IPC
+ $lei->_lei_atfork_child;
+ my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}'; # the LeiInput subclass passed to new()
+ $ipt->{lei} = $lei;
+ $ipt->{sto} = $lei->{sto} // die 'BUG: no lei->{sto}';
+ $ipt->{lse} = $ipt->{sto}->search; # {lse}, {over} and {-lms_ro} are read by LeiImport's pmdir_cb
+ $ipt->{over} = $ipt->{lse}->over;
+ $ipt->{-lms_ro} //= $ipt->{lse}->lms; # may be undef or '0'
+ $self->SUPER::ipc_atfork_child;
+}
+
+# maildir_each_file callback: hand one Maildir file path (plus any
+# extra arguments) to the worker queue, where mdir_iter handles it
+sub each_mdir_fn {
+	my ($path, $pmd, @rest) = @_;
+	$pmd->wq_io_do('mdir_iter', [], $path, @rest);
+}
+
+# invoked via wq_io_do: forward the path and extra arguments to the
+# pmdir_cb method of the input object stored in {ipt}
+sub mdir_iter {
+	my ($pmd, $path, @rest) = @_;
+	my $ipt = $pmd->{ipt};
+	$ipt->pmdir_cb($path, @rest);
+}
+
+sub pmd_done_wait { # callback handed to wq_wait_old by _lei_wq_eof
+ my ($arg, $pid) = @_;
+ my ($self, $lei) = @$arg;
+ my $wait = $lei->{sto}->ipc_do('done'); # $wait is never read; presumably assigned to make ipc_do wait for the reply -- confirm in IPC
+ $lei->can('wq_done_wait')->($arg, $pid); # then defer to lei's generic wq_done_wait
+}
+
+sub _lei_wq_eof { # EOF callback for main lei daemon
+ my ($lei) = @_;
+ my $pmd = delete $lei->{pmd} or return $lei->fail; # {pmd} is set by new(); call fail if it is already gone
+ $pmd->wq_wait_old(\&pmd_done_wait, $lei);
+}
+
+1;
sub new { bless {}, __PACKAGE__ }
sub flags2kw ($) {
- my @unknown;
- my %kw;
- for (split(//, $_[0])) {
- my $k = $c2kw{$_};
- if (defined($k)) {
- $kw{$k} = 1;
- } else {
- push @unknown, $_;
+ if (wantarray) { # list context: return (\%kw, \@unknown)
+ my @unknown;
+ my %kw;
+ for (split(//, $_[0])) {
+ my $k = $c2kw{$_};
+ if (defined($k)) {
+ $kw{$k} = 1;
+ } else {
+ push @unknown, $_;
+ }
}
+ (\%kw, \@unknown);
+ } else { # otherwise: return one sorted arrayref of keyword names
+ [ sort(map { $c2kw{$_} // () } split(//, $_[0])) ]; # characters missing from %c2kw are dropped
}
- (\%kw, \@unknown);
}
1;
is(scalar(keys %v), 1, 'inspect handles relative and absolute paths');
my $inspect = json_utf8->decode([ keys %v ]->[0]);
is_deeply($inspect, {"maildir:$md" => { 'name.count' => 1 }},
- 'inspect maildir: path had expected output');
+ 'inspect maildir: path had expected output') or xbail($inspect);
lei_ok(qw(q s:boolean));
my $res = json_utf8->decode($lei_out);