use Symbol qw(gensym);
use IO::Handle; # ->autoflush
use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
-use Digest::SHA qw(sha256_hex);
my %kw2char = ( # Maildir characters
draft => 'D',
sub git_to_mail { # git->cat_async callback
my ($bref, $oid, $type, $size, $arg) = @_;
- if ($type ne 'blob') {
- if ($type eq 'missing') {
- warn "missing $oid\n";
- } else {
- warn "unexpected type=$type for $oid\n";
- }
- }
+ return warn("W: $oid is $type (!= blob)\n") if $type ne 'blob';
+ return warn("E: $oid is empty\n") unless $size;
my ($write_cb, $smsg) = @$arg;
- if ($smsg->{blob} ne $oid) {
- die "BUG: expected=$smsg->{blob} got=$oid";
- }
- $write_cb->($bref, $smsg) if $size > 0;
+ die "BUG: expected=$smsg->{blob} got=$oid" if $smsg->{blob} ne $oid;
+ $write_cb->($bref, $smsg);
}
sub reap_compress { # dwaitpid callback
$lei->fail("@$cmd failed", $? >> 8);
}
-sub _post_augment_mbox { # open a compressor process
+sub _post_augment_mbox { # open a compressor process from top-level process
my ($self, $lei) = @_;
my $zsfx = $self->{zsfx} or return;
my $cmd = PublicInbox::MboxReader::zsfx2cmd($zsfx, undef, $lei);
$lei->{1} = $pp;
}
-sub dup_src ($) {
- my ($in) = @_;
- open my $dup, '+>>&', $in or die "dup: $!";
- $dup;
-}
-
# --augment existing output destination, with deduplication
sub _augment { # MboxReader eml_cb
my ($eml, $lei) = @_;
}
}
-sub _augment_or_unlink { # maildir_each_eml cb
- my ($f, $kw, $eml, $lei, $lse, $mod, $shard, $unlink) = @_;
- if ($mod) {
- # can't get dirent.d_ino w/ pure Perl readdir, so we extract
- # the OID if it looks like one instead of doing stat(2)
- my $hex = $f =~ m!\b([a-f0-9]{40,})[^/]*\z! ?
- $1 : sha256_hex($f);
- my $recno = hex(substr($hex, 0, 8));
- return if ($recno % $mod) != $shard;
- update_kw_maybe($lei, $lse, $eml, $kw);
- }
+sub _md_update { # maildir_each_eml cb
+ my ($f, $kw, $eml, $lei, $lse, $unlink) = @_;
+ update_kw_maybe($lei, $lse, $eml, $kw);
$unlink ? unlink($f) : _augment($eml, $lei);
}
my $lse = $lei->{lse}; # may be undef
sub { # for git_to_mail
my ($bref, $smsg, $eml) = @_;
- $mic // return $lei->fail; # dst may be undef-ed in last run
+ $mic // return $lei->fail; # mic may be undef-ed in last run
if ($dedupe) {
$eml //= PublicInbox::Eml->new($$bref); # copy bref
return if $dedupe->is_dup($eml, $smsg->{blob});
my ($self, $lei) = @_;
my $dst = $lei->{ovv}->{dst};
my $lse = $lei->{opt}->{'import-before'} ? $lei->{lse} : undef;
- my ($mod, $shard) = @{$self->{shard_info} // []};
+ my $mdr = PublicInbox::MdirReader->new;
if ($lei->{opt}->{augment}) {
my $dedupe = $lei->{dedupe};
if ($dedupe && $dedupe->prepare_dedupe) {
- PublicInbox::MdirReader::maildir_each_eml($dst,
- \&_augment_or_unlink,
- $lei, $lse, $mod, $shard);
+ $mdr->{shard_info} = $self->{shard_info};
+ $mdr->maildir_each_eml($dst, \&_md_update, $lei, $lse);
$dedupe->pause_dedupe;
}
} elsif ($lse) {
- PublicInbox::MdirReader::maildir_each_eml($dst,
- \&_augment_or_unlink,
- $lei, $lse, $mod, $shard, 1);
+ $mdr->{shard_info} = $self->{shard_info};
+ $mdr->maildir_each_eml($dst, \&_md_update, $lei, $lse, 1);
} else {# clobber existing Maildir
- PublicInbox::MdirReader::maildir_each_file($dst, \&_unlink);
+ $mdr->maildir_each_file($dst, \&_unlink);
}
}
truncate($out, 0) or die "truncate($dst): $!";
return;
}
- my $zsfx = $self->{zsfx};
- my $rd = $zsfx ? PublicInbox::MboxReader::zsfxcat($out, $zsfx, $lei)
- : dup_src($out);
+ my $rd;
+ if (my $zsfx = $self->{zsfx}) {
+ $rd = PublicInbox::MboxReader::zsfxcat($out, $zsfx, $lei);
+ } else {
+ open($rd, '+>>&', $out) or die "dup: $!";
+ }
my $dedupe;
if ($opt->{augment}) {
$dedupe = $lei->{dedupe};
$m->($self, $lei, @args);
}
+# called by every single l2m worker process
sub do_post_auth {
my ($self) = @_;
my $lei = $self->{lei};
pkt_do($lei->{pkt_op_p}, 'incr_start_query') or
die "incr_start_query: $!";
my $aug;
- if (lock_free($self)) {
+ if (lock_free($self)) { # all workers do_augment
my $mod = $self->{-wq_nr_workers};
my $shard = $self->{-wq_worker_nr};
if (my $net = $lei->{net}) {
$self->{shard_info} = [ $mod, $shard ];
}
$aug = '+'; # incr_post_augment
- } elsif ($self->{-wq_worker_nr} == 0) {
+ } elsif ($self->{-wq_worker_nr} == 0) { # 1st worker do_augment
$aug = '.'; # do_post_augment
}
if ($aug) {
pkt_do($lei->{pkt_op_p}, $aug) == 1 or
die "do_post_augment trigger: $!";
}
+ # done augmenting, connect the compressor pipe for each worker
if (my $zpipe = delete $lei->{zpipe}) {
$lei->{1} = $zpipe->[1];
close $zpipe->[0];
$_[0]->{base_type} =~ /\A(?:maildir|imap|jmap)\z/ ? 1 : 0;
}
+# wakes up the MUA when complete so it can refresh messages list
sub poke_dst {
my ($self) = @_;
if ($self->{base_type} eq 'maildir') {
my ($self) = @_;
delete $self->{wcb};
$self->{lei}->{ale}->git->async_wait_all;
- $SIG{__WARN__} = 'DEFAULT';
}
# called in top-level lei-daemon when LeiAuth is done