]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/SharedKV.pm
398f4ca8dd70ef70af87ecf348689b6561c1532f
[public-inbox.git] / lib / PublicInbox / SharedKV.pm
1 # Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # fork()-friendly key-value store.  Will be used for making
5 # augmenting Maildirs and mboxes less expensive, maybe.
6 # We use flock(2) to avoid SQLite lock problems (busy timeouts, backoff)
7 package PublicInbox::SharedKV;
8 use strict;
9 use v5.10.1;
10 use parent qw(PublicInbox::Lock);
11 use File::Temp qw(tempdir);
12 use DBI ();
13 use PublicInbox::Spawn;
14 use File::Path qw(rmtree make_path);
15
16 sub dbh {
17         my ($self, $lock) = @_;
18         $self->{dbh} //= do {
19                 my $f = $self->{filename};
20                 $lock //= $self->lock_for_scope_fast;
21                 my $dbh = DBI->connect("dbi:SQLite:dbname=$f", '', '', {
22                         AutoCommit => 1,
23                         RaiseError => 1,
24                         PrintError => 0,
25                         sqlite_use_immediate_transaction => 1,
26                         # no sqlite_unicode here, this is for binary data
27                 });
28                 my $opt = $self->{opt} // {};
29                 $dbh->do('PRAGMA synchronous = OFF') if !$opt->{fsync};
30                 if (my $s = $opt->{cache_size}) {
31                         $dbh->do("PRAGMA cache_size = $s");
32                 }
33                 $dbh->do('PRAGMA journal_mode = '.
34                                 ($opt->{journal_mode} // 'WAL'));
35                 $dbh->do(<<'');
36 CREATE TABLE IF NOT EXISTS kv (
37         k VARBINARY PRIMARY KEY NOT NULL,
38         v VARBINARY NOT NULL,
39         UNIQUE (k)
40 )
41
42                 $dbh;
43         }
44 }
45
46 sub new {
47         my ($cls, $dir, $base, $opt) = @_;
48         my $self = bless { opt => $opt }, $cls;
49         make_path($dir) if defined($dir) && !-d $dir;
50         $dir //= $self->{"tmp$$.$self"} = tempdir("skv.$$-XXXX", TMPDIR => 1);
51         $base //= '';
52         my $f = $self->{filename} = "$dir/$base.sqlite3";
53         $self->{lock_path} = $opt->{lock_path} // "$dir/$base.flock";
54         unless (-s $f) {
55                 PublicInbox::Spawn::nodatacow_dir($dir); # for journal/shm/wal
56                 open my $fh, '+>>', $f or die "failed to open $f: $!";
57                 PublicInbox::Spawn::nodatacow_fd(fileno($fh));
58         }
59         $self;
60 }
61
62 sub index_values {
63         my ($self) = @_;
64         my $lock = $self->lock_for_scope_fast;
65         $self->dbh($lock)->do('CREATE INDEX IF NOT EXISTS idx_v ON kv (v)');
66 }
67
68 sub set_maybe {
69         my ($self, $key, $val, $lock) = @_;
70         $lock //= $self->lock_for_scope_fast;
71         my $e = $self->{dbh}->prepare_cached(<<'')->execute($key, $val);
72 INSERT OR IGNORE INTO kv (k,v) VALUES (?, ?)
73
74         $e == 0 ? undef : $e;
75 }
76
77 # caller calls sth->fetchrow_array
78 sub each_kv_iter {
79         my ($self) = @_;
80         my $sth = $self->{dbh}->prepare_cached(<<'', undef, 1);
81 SELECT k,v FROM kv
82
83         $sth->execute;
84         $sth
85 }
86
87 sub keys {
88         my ($self, @pfx) = @_;
89         my $sql = 'SELECT k FROM kv';
90         if (defined $pfx[0]) {
91                 $sql .= ' WHERE k LIKE ? ESCAPE ?';
92                 my $anywhere = !!$pfx[1];
93                 $pfx[1] = '\\';
94                 $pfx[0] =~ s/([%_\\])/\\$1/g; # glob chars
95                 $pfx[0] .= '%';
96                 substr($pfx[0], 0, 0, '%') if $anywhere;
97         } else {
98                 @pfx = (); # [0] may've been undef
99         }
100         map { $_->[0] } @{$self->dbh->selectall_arrayref($sql, undef, @pfx)};
101 }
102
103 sub delete_by_val {
104         my ($self, $val, $lock) = @_;
105         $lock //= $self->lock_for_scope_fast;
106         $self->{dbh}->prepare_cached(<<'')->execute($val) + 0;
107 DELETE FROM kv WHERE v = ?
108
109 }
110
111 sub replace_values {
112         my ($self, $oldval, $newval, $lock) = @_;
113         $lock //= $self->lock_for_scope_fast;
114         $self->{dbh}->prepare_cached(<<'')->execute($newval, $oldval) + 0;
115 UPDATE kv SET v = ? WHERE v = ?
116
117 }
118
119 sub set {
120         my ($self, $key, $val) = @_;
121         if (defined $val) {
122                 my $e = $self->{dbh}->prepare_cached(<<'')->execute($key, $val);
123 INSERT OR REPLACE INTO kv (k,v) VALUES (?,?)
124
125                 $e == 0 ? undef : $e;
126         } else {
127                 $self->{dbh}->prepare_cached(<<'')->execute($key);
128 DELETE FROM kv WHERE k = ?
129
130         }
131 }
132
133 sub get {
134         my ($self, $key) = @_;
135         my $sth = $self->{dbh}->prepare_cached(<<'', undef, 1);
136 SELECT v FROM kv WHERE k = ?
137
138         $sth->execute($key);
139         $sth->fetchrow_array;
140 }
141
142 sub xchg {
143         my ($self, $key, $newval, $lock) = @_;
144         $lock //= $self->lock_for_scope_fast;
145         my $oldval = get($self, $key);
146         if (defined $newval) {
147                 set($self, $key, $newval);
148         } else {
149                 $self->{dbh}->prepare_cached(<<'')->execute($key);
150 DELETE FROM kv WHERE k = ?
151
152         }
153         $oldval;
154 }
155
156 sub count {
157         my ($self) = @_;
158         my $sth = $self->{dbh}->prepare_cached(<<'');
159 SELECT COUNT(k) FROM kv
160
161         $sth->execute;
162         $sth->fetchrow_array;
163 }
164
165 # faster than ->count due to how SQLite works
166 sub has_entries {
167         my ($self) = @_;
168         my @n = $self->{dbh}->selectrow_array('SELECT k FROM kv LIMIT 1');
169         scalar(@n) ? 1 : undef;
170 }
171
172 sub dbh_release {
173         my ($self, $lock) = @_;
174         my $dbh = delete $self->{dbh} or return;
175         $lock //= $self->lock_for_scope_fast; # may be needed for WAL
176         %{$dbh->{CachedKids}} = (); # cleanup prepare_cached
177         $dbh->disconnect;
178 }
179
180 sub DESTROY {
181         my ($self) = @_;
182         dbh_release($self);
183         my $dir = delete $self->{"tmp$$.$self"} or return;
184         my $tries = 0;
185         do {
186                 $! = 0;
187                 eval { rmtree($dir) };
188         } while ($@ && $!{ENOENT} && $tries++ < 5);
189         warn "error removing $dir: $@" if $@;
190         warn "Took $tries tries to remove $dir\n" if $tries;
191 }
192
193 1;