sub fail ($$;$) {
my ($self, $buf, $exit_code) = @_;
- err($self, $buf);
+ err($self, $buf) if defined $buf;
+ syswrite($self->{op_pipe}, '!') if $self->{op_pipe}; # fail_handler
x_it($self, ($exit_code // 1) << 8);
undef;
}
}
}
-# triggers sigpipe_handler
-sub note_sigpipe {
+sub note_sigpipe { # triggers sigpipe_handler
my ($self, $fd) = @_;
close(delete($self->{$fd})); # explicit close silences Perl warning
- syswrite($self->{op_pipe}, '!') if $self->{op_pipe};
+ syswrite($self->{op_pipe}, '|') if $self->{op_pipe};
x_it($self, 13);
}
my $kw = $smsg->{kw} // [];
my $sfx = join('', sort(map { $kw2char{$_} // () } @$kw));
my $rand = ''; # chosen by die roll :P
- my ($tmp, $fh, $final);
+ my ($tmp, $fh, $final, $ok);
my $common = $smsg->{blob} // _rand;
if (defined(my $pct = $smsg->{pct})) { $common .= "=$pct" }
do {
$tmp = $dst.'tmp/'.$rand.$common;
- } while (!sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY) &&
+ } while (!($ok = sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY)) &&
$! == EEXIST && ($rand = _rand.','));
- if (print $fh $$buf and close($fh)) {
+ if ($ok && print $fh $$buf and close($fh)) {
# ignore new/ and write only to cur/, otherwise MUAs
# with R/W access to the Maildir will end up doing
# a mass rename which can take a while with thousands
($rand = _rand.','));
unlink($tmp) or warn "W: failed to unlink $tmp: $!\n";
} else {
- my $err = $!;
+ my $err = "Error writing $smsg->{blob} to $dst: $!\n";
+ $_[0] = undef; # clobber dst
unlink($tmp);
- die "Error writing $smsg->{blob} to $dst: $err";
+ die $err;
}
}
my $dst = $lei->{ovv}->{dst};
sub { # for git_to_mail
my ($buf, $smsg, $eml) = @_;
+ $dst // return $lei->fail; # dst may be undef-ed in last run
$buf //= \($eml->as_string);
return _buf2maildir($dst, $buf, $smsg) if !$dedupe;
$eml //= PublicInbox::Eml->new($$buf); # copy buf
syswrite($lei->{op_pipe}, '.') == 1 or die "do_post_augment trigger: $!"
}
-sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
- my ($lei) = @_;
- my $lxs = delete $lei->{lxs};
- if ($lxs && $lxs->wq_kill_old) { # is this the daemon?
- $lxs->wq_wait_old($lei);
+sub fail_handler ($;$$) {
+ my ($lei, $code, $io) = @_;
+ if (my $lxs = delete $lei->{lxs}) {
+ $lxs->wq_wait_old($lei) if $lxs->wq_kill_old; # lei-daemon
}
- close(delete $lei->{1}) if $lei->{1};
- $lei->x_it(13);
+ close($io) if $io; # needed to avoid warnings on SIGPIPE
+ $lei->x_it($code // (1 >> 8));
+}
+
+sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
+ fail_handler($_[0], 13, delete $_[0]->{1});
}
sub do_query {
$lei->event_step_init; # wait for shutdowns
my $done_op = {
'' => [ \&query_done, $lei ],
- '!' => [ \&sigpipe_handler, $lei ]
+ '|' => [ \&sigpipe_handler, $lei ],
+ '!' => [ \&fail_handler, $lei ]
};
my $in_loop = exists $lei->{sock};
$done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);