use PublicInbox::ProcessPipe;
use PublicInbox::Spawn qw(which spawn popen_rd);
use PublicInbox::LeiDedupe;
+use PublicInbox::OnDestroy;
use Symbol qw(gensym);
use IO::Handle; # ->autoflush
use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
my $dedupe = $lei->{dedupe};
$dedupe->prepare_dedupe;
sub { # for git_to_mail
- my ($buf, $smsg) = @_;
+ my ($buf, $smsg, $eml) = @_;
return unless $out;
- my $eml = PublicInbox::Eml->new($buf);
+ $eml //= PublicInbox::Eml->new($buf);
if (!$dedupe->is_dup($eml, $smsg->{blob})) {
$buf = $eml2mbox->($eml, $smsg);
my $lk = $ovv->lock_for_scope;
# _maildir_each_file callback, \&CORE::unlink doesn't work with it
sub _unlink { unlink($_[0]) }
+sub _rand () {
+ state $seq = 0;
+ sprintf('%x,%x,%x,%x', rand(0xffffffff), time, $$, ++$seq);
+}
+
sub _buf2maildir {
my ($dst, $buf, $smsg) = @_;
my $kw = $smsg->{kw} // [];
my $sfx = join('', sort(map { $kw2char{$_} // () } @$kw));
my $rand = ''; # chosen by die roll :P
my ($tmp, $fh, $final);
- my $common = $smsg->{blob};
+ my $common = $smsg->{blob} // _rand;
if (defined(my $pct = $smsg->{pct})) { $common .= "=$pct" }
do {
$tmp = $dst.'tmp/'.$rand.$common;
} while (!sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY) &&
- $! == EEXIST && ($rand = int(rand 0x7fffffff).','));
+ $! == EEXIST && ($rand = _rand.','));
if (print $fh $$buf and close($fh)) {
# ignore new/ and write only to cur/, otherwise MUAs
# with R/W access to the Maildir will end up doing
do {
$final = $dst.$rand.$common.':2,'.$sfx;
} while (!link($tmp, $final) && $! == EEXIST &&
- ($rand = int(rand 0x7fffffff).','));
+ ($rand = _rand.','));
unlink($tmp) or warn "W: failed to unlink $tmp: $!\n";
} else {
my $err = $!;
$dedupe->prepare_dedupe;
my $dst = $lei->{ovv}->{dst};
sub { # for git_to_mail
- my ($buf, $smsg) = @_;
+ my ($buf, $smsg, $eml) = @_;
+ $buf //= \($eml->as_string);
return _buf2maildir($dst, $buf, $smsg) if !$dedupe;
- my $eml = PublicInbox::Eml->new($$buf); # copy buf
+ $eml //= PublicInbox::Eml->new($$buf); # copy buf
return if $dedupe->is_dup($eml, $smsg->{blob});
undef $eml;
_buf2maildir($dst, $buf, $smsg);
$self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
}
-sub DESTROY {
+# We rely on OnDestroy to run this before ->DESTROY, since ->DESTROY
+# ordering is unstable at worker exit and may cause segfaults
+sub reap_gits {
my ($self) = @_;
- for my $pid_git (grep(/\A$$\0/, keys %$self)) {
- $self->{$pid_git}->async_wait_all;
+ delete $self->{wcb};
+ for my $git (delete @$self{grep(/\A$$\0/, keys %$self)}) {
+ $git->async_wait_all;
}
- $self->SUPER::DESTROY; # PublicInbox::IPC
+}
+
+sub DESTROY { delete $_[0]->{wcb} }
+
+sub ipc_atfork_child { # runs after IPC::wq_worker_loop
+ my ($self) = @_;
+ $self->SUPER::ipc_atfork_child;
+ # reap_gits needs to run before $self->DESTROY,
+ # IPC.pm will ensure that.
+ PublicInbox::OnDestroy->new($$, \&reap_gits, $self);
}
1;