]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/Emergency.pm
search: replace ->query with ->mset
[public-inbox.git] / lib / PublicInbox / Emergency.pm
1 # Copyright (C) 2016-2020 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3 #
4 # Emergency Maildir delivery for MDA
5 package PublicInbox::Emergency;
6 use strict;
7 use warnings;
8 use Fcntl qw(:DEFAULT SEEK_SET);
9 use Sys::Hostname qw(hostname);
10 use IO::Handle; # ->flush, ->autoflush
11
12 sub new {
13         my ($class, $dir) = @_;
14
15         foreach (qw(new tmp cur)) {
16                 my $d = "$dir/$_";
17                 next if -d $d;
18                 require File::Path;
19                 if (!File::Path::mkpath($d) && !-d $d) {
20                         die "failed to mkpath($d): $!\n";
21                 }
22         }
23         bless { dir => $dir, files => {}, t => 0, cnt => 0, pid => $$ }, $class;
24 }
25
26 sub _fn_in {
27         my ($self, $dir) = @_;
28         my @host = split(/\./, hostname);
29         my $now = time;
30         if ($self->{t} != $now) {
31                 $self->{t} = $now;
32                 $self->{cnt} = 0;
33         } else {
34                 $self->{cnt}++;
35         }
36
37         my $f;
38         do {
39                 $f = "$self->{dir}/$dir/$self->{t}.$$"."_$self->{cnt}.$host[0]";
40                 $self->{cnt}++;
41         } while (-e $f);
42         $f;
43 }
44
45 sub prepare {
46         my ($self, $strref) = @_;
47
48         die "already in transaction: $self->{tmp}" if $self->{tmp};
49         my ($tmp, $fh);
50         do {
51                 $tmp = _fn_in($self, 'tmp');
52                 $! = undef;
53         } while (!sysopen($fh, $tmp, O_CREAT|O_EXCL|O_RDWR) && $!{EEXIST});
54         print $fh $$strref or die "write failed: $!";
55         $fh->flush or die "flush failed: $!";
56         $fh->autoflush(1);
57         $self->{fh} = $fh;
58         $self->{tmp} = $tmp;
59 }
60
61 sub abort {
62         my ($self) = @_;
63         delete $self->{fh};
64         my $tmp = delete $self->{tmp} or return;
65
66         unlink($tmp) or warn "Failed to unlink $tmp: $!";
67         undef;
68 }
69
70 sub fh {
71         my ($self) = @_;
72         my $fh = $self->{fh} or die "{fh} not open!\n";
73         seek($fh, 0, SEEK_SET) or die "seek(fh) failed: $!";
74         sysseek($fh, 0, SEEK_SET) or die "sysseek(fh) failed: $!";
75         $fh;
76 }
77
78 sub commit {
79         my ($self) = @_;
80         $$ == $self->{pid} or return; # no-op in forked child
81
82         delete $self->{fh};
83         my $tmp = delete $self->{tmp} or return;
84         my $new;
85         do {
86                 $new = _fn_in($self, 'new');
87         } while (!link($tmp, $new) && $!{EEXIST});
88         my @sn = stat($new) or die "stat $new failed: $!";
89         my @st = stat($tmp) or die "stat $tmp failed: $!";
90         if ($st[0] == $sn[0] && $st[1] == $sn[1]) {
91                 unlink($tmp) or warn "Failed to unlink $tmp: $!";
92         } else {
93                 warn "stat($new) and stat($tmp) differ";
94         }
95 }
96
97 sub DESTROY { commit($_[0]) }
98
99 1;