]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/SharedKV.pm
avoid calling waitpid from children in DESTROY
[public-inbox.git] / lib / PublicInbox / SharedKV.pm
1 # Copyright (C) 2020 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # fork()-friendly key-value store.  Will be used for making
5 # augmenting Maildirs and mboxes less expensive, maybe.
6 # We use flock(2) to avoid SQLite lock problems (busy timeouts, backoff)
7 package PublicInbox::SharedKV;
8 use strict;
9 use v5.10.1;
10 use parent qw(PublicInbox::Lock);
11 use File::Temp 0.19 (); # 0.19 for ->newdir
12 use DBI ();
13 use PublicInbox::Spawn;
14
15 sub dbh {
16         my ($self, $lock) = @_;
17         $self->{dbh} //= do {
18                 my $f = $self->{filename};
19                 $lock //= $self->lock_for_scope;
20                 my $dbh = DBI->connect("dbi:SQLite:dbname=$f", '', '', {
21                         AutoCommit => 1,
22                         RaiseError => 1,
23                         PrintError => 0,
24                         sqlite_use_immediate_transaction => 1,
25                         # no sqlite_unicode here, this is for binary data
26                 });
27                 my $opt = $self->{opt} // {};
28                 $dbh->do('PRAGMA synchronous = OFF') if !$opt->{fsync};
29                 $dbh->do('PRAGMA cache_size = '.($opt->{cache_size} || 80000));
30                 $dbh->do('PRAGMA journal_mode = '.
31                                 ($opt->{journal_mode} // 'WAL'));
32                 $dbh->do(<<'');
33 CREATE TABLE IF NOT EXISTS kv (
34         k VARBINARY PRIMARY KEY NOT NULL,
35         v VARBINARY NOT NULL,
36         UNIQUE (k)
37 )
38
39                 $dbh;
40         }
41 }
42
43 sub new {
44         my ($cls, $dir, $base, $opt) = @_;
45         my $self = bless { opt => $opt }, $cls;
46         unless (defined $dir) {
47                 $self->{tmp} = File::Temp->newdir('kv-XXXXXX', TMPDIR => 1);
48                 $dir = $self->{tmp}->dirname;
49         }
50         -d $dir or mkdir($dir) or die "mkdir($dir): $!";
51         $base //= '';
52         my $f = $self->{filename} = "$dir/$base.sqlite3";
53         $self->{lock_path} = $opt->{lock_path} // "$dir/$base.flock";
54         unless (-f $f) {
55                 open my $fh, '+>>', $f or die "failed to open $f: $!";
56                 PublicInbox::Spawn::nodatacow_fd(fileno($fh));
57         }
58         $self;
59 }
60
61 sub index_values {
62         my ($self) = @_;
63         my $lock = $self->lock_for_scope;
64         $self->dbh($lock)->do('CREATE INDEX IF NOT EXISTS idx_v ON kv (v)');
65 }
66
67 sub set_maybe {
68         my ($self, $key, $val, $lock) = @_;
69         $lock //= $self->lock_for_scope;
70         my $e = $self->{dbh}->prepare_cached(<<'')->execute($key, $val);
71 INSERT OR IGNORE INTO kv (k,v) VALUES (?, ?)
72
73         $e == 0 ? undef : $e;
74 }
75
76 # caller calls sth->fetchrow_array
77 sub each_kv_iter {
78         my ($self) = @_;
79         my $sth = $self->{dbh}->prepare_cached(<<'', undef, 1);
80 SELECT k,v FROM kv
81
82         $sth->execute;
83         $sth
84 }
85
86 sub delete_by_val {
87         my ($self, $val, $lock) = @_;
88         $lock //= $self->lock_for_scope;
89         $self->{dbh}->prepare_cached(<<'')->execute($val) + 0;
90 DELETE FROM kv WHERE v = ?
91
92 }
93
94 sub replace_values {
95         my ($self, $oldval, $newval, $lock) = @_;
96         $lock //= $self->lock_for_scope;
97         $self->{dbh}->prepare_cached(<<'')->execute($newval, $oldval) + 0;
98 UPDATE kv SET v = ? WHERE v = ?
99
100 }
101
102 sub set {
103         my ($self, $key, $val) = @_;
104         if (defined $val) {
105                 my $e = $self->{dbh}->prepare_cached(<<'')->execute($key, $val);
106 INSERT OR REPLACE INTO kv (k,v) VALUES (?,?)
107
108                 $e == 0 ? undef : $e;
109         } else {
110                 $self->{dbh}->prepare_cached(<<'')->execute($key);
111 DELETE FROM kv WHERE k = ?
112
113         }
114 }
115
116 sub get {
117         my ($self, $key) = @_;
118         my $sth = $self->{dbh}->prepare_cached(<<'', undef, 1);
119 SELECT v FROM kv WHERE k = ?
120
121         $sth->execute($key);
122         $sth->fetchrow_array;
123 }
124
125 sub xchg {
126         my ($self, $key, $newval, $lock) = @_;
127         $lock //= $self->lock_for_scope;
128         my $oldval = get($self, $key);
129         if (defined $newval) {
130                 set($self, $key, $newval);
131         } else {
132                 $self->{dbh}->prepare_cached(<<'')->execute($key);
133 DELETE FROM kv WHERE k = ?
134
135         }
136         $oldval;
137 }
138
139 sub count {
140         my ($self) = @_;
141         my $sth = $self->{dbh}->prepare_cached(<<'');
142 SELECT COUNT(k) FROM kv
143
144         $sth->execute;
145         $sth->fetchrow_array;
146 }
147
148 1;