sub git_to_mail { # git->cat_async callback
my ($bref, $oid, $type, $size, $arg) = @_;
- if ($type ne 'blob') {
- if ($type eq 'missing') {
- warn "missing $oid\n";
- } else {
- warn "unexpected type=$type for $oid\n";
- }
- }
+ return warn("W: $oid is $type (!= blob)\n") if $type ne 'blob';
+ return warn("E: $oid is empty\n") unless $size;
my ($write_cb, $smsg) = @$arg;
- if ($smsg->{blob} ne $oid) {
- die "BUG: expected=$smsg->{blob} got=$oid";
- }
- $write_cb->($bref, $smsg) if $size > 0;
+ die "BUG: expected=$smsg->{blob} got=$oid" if $smsg->{blob} ne $oid;
+ $write_cb->($bref, $smsg);
}
sub reap_compress { # dwaitpid callback
$lei->fail("@$cmd failed", $? >> 8);
}
-sub _post_augment_mbox { # open a compressor process
+sub _post_augment_mbox { # open a compressor process from top-level process
my ($self, $lei) = @_;
my $zsfx = $self->{zsfx} or return;
my $cmd = PublicInbox::MboxReader::zsfx2cmd($zsfx, undef, $lei);
$lei->{1} = $pp;
}
-sub dup_src ($) {
- my ($in) = @_;
- open my $dup, '+>>&', $in or die "dup: $!";
- $dup;
-}
-
# --augment existing output destination, with deduplication
sub _augment { # MboxReader eml_cb
my ($eml, $lei) = @_;
my $lse = $lei->{lse}; # may be undef
sub { # for git_to_mail
my ($bref, $smsg, $eml) = @_;
- $mic // return $lei->fail; # dst may be undef-ed in last run
+ $mic // return $lei->fail; # mic may be undef-ed in last run
if ($dedupe) {
$eml //= PublicInbox::Eml->new($$bref); # copy bref
return if $dedupe->is_dup($eml, $smsg->{blob});
truncate($out, 0) or die "truncate($dst): $!";
return;
}
- my $zsfx = $self->{zsfx};
- my $rd = $zsfx ? PublicInbox::MboxReader::zsfxcat($out, $zsfx, $lei)
- : dup_src($out);
+ my $rd;
+ if (my $zsfx = $self->{zsfx}) {
+ $rd = PublicInbox::MboxReader::zsfxcat($out, $zsfx, $lei);
+ } else {
+ open($rd, '+>>&', $out) or die "dup: $!";
+ }
my $dedupe;
if ($opt->{augment}) {
$dedupe = $lei->{dedupe};
$m->($self, $lei, @args);
}
+# called by every single l2m worker process
sub do_post_auth {
my ($self) = @_;
my $lei = $self->{lei};
pkt_do($lei->{pkt_op_p}, 'incr_start_query') or
die "incr_start_query: $!";
my $aug;
- if (lock_free($self)) {
+ if (lock_free($self)) { # all workers do_augment
my $mod = $self->{-wq_nr_workers};
my $shard = $self->{-wq_worker_nr};
if (my $net = $lei->{net}) {
$self->{shard_info} = [ $mod, $shard ];
}
$aug = '+'; # incr_post_augment
- } elsif ($self->{-wq_worker_nr} == 0) {
+ } elsif ($self->{-wq_worker_nr} == 0) { # 1st worker do_augment
$aug = '.'; # do_post_augment
}
if ($aug) {
pkt_do($lei->{pkt_op_p}, $aug) == 1 or
die "do_post_augment trigger: $!";
}
+ # done augmenting, connect the compressor pipe for each worker
if (my $zpipe = delete $lei->{zpipe}) {
$lei->{1} = $zpipe->[1];
close $zpipe->[0];
$_[0]->{base_type} =~ /\A(?:maildir|imap|jmap)\z/ ? 1 : 0;
}
+# wakes up the MUA when complete so it can refresh messages list
sub poke_dst {
my ($self) = @_;
if ($self->{base_type} eq 'maildir') {
my ($self) = @_;
delete $self->{wcb};
$self->{lei}->{ale}->git->async_wait_all;
- $SIG{__WARN__} = 'DEFAULT';
}
# called in top-level lei-daemon when LeiAuth is done