]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/SharedKV.pm
sharedkv: avoid ambiguity for numeric-like string keys
[public-inbox.git] / lib / PublicInbox / SharedKV.pm
1 # Copyright (C) all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # fork()-friendly key-value store.  Will be used for making
5 # augmenting Maildirs and mboxes less expensive, maybe.
6 # We use flock(2) to avoid SQLite lock problems (busy timeouts, backoff)
7 package PublicInbox::SharedKV;
8 use strict;
9 use v5.10.1;
10 use parent qw(PublicInbox::Lock);
11 use File::Temp qw(tempdir);
12 use DBI qw(:sql_types); # SQL_BLOB
13 use PublicInbox::Spawn;
14 use File::Path qw(rmtree make_path);
15
16 sub dbh {
17         my ($self, $lock) = @_;
18         $self->{dbh} // do {
19                 my $f = $self->{filename};
20                 $lock //= $self->lock_for_scope_fast;
21                 my $dbh = DBI->connect("dbi:SQLite:dbname=$f", '', '', {
22                         AutoCommit => 1,
23                         RaiseError => 1,
24                         PrintError => 0,
25                         sqlite_use_immediate_transaction => 1,
26                         # no sqlite_unicode here, this is for binary data
27                 });
28                 my $opt = $self->{opt} // {};
29                 $dbh->do('PRAGMA synchronous = OFF') if !$opt->{fsync};
30                 $dbh->do('PRAGMA journal_mode = '.
31                                 ($opt->{journal_mode} // 'WAL'));
32                 $dbh->do(<<'');
33 CREATE TABLE IF NOT EXISTS kv (
34         k VARBINARY PRIMARY KEY NOT NULL,
35         v VARBINARY NOT NULL,
36         UNIQUE (k)
37 )
38
39                 $self->{dbh} = $dbh;
40         }
41 }
42
43 sub new {
44         my ($cls, $dir, $base, $opt) = @_;
45         my $self = bless { opt => $opt }, $cls;
46         make_path($dir) if defined($dir) && !-d $dir;
47         $dir //= $self->{"tmp$$.$self"} = tempdir("skv.$$-XXXX", TMPDIR => 1);
48         $base //= '';
49         my $f = $self->{filename} = "$dir/$base.sqlite3";
50         $self->{lock_path} = $opt->{lock_path} // "$dir/$base.flock";
51         unless (-s $f) {
52                 require PublicInbox::Syscall;
53                 PublicInbox::Syscall::nodatacow_dir($dir); # for journal/shm/wal
54                 open my $fh, '+>>', $f or die "failed to open $f: $!";
55         }
56         $self;
57 }
58
59 sub set_maybe {
60         my ($self, $key, $val, $lock) = @_;
61         $lock //= $self->lock_for_scope_fast;
62         my $sth = $self->{dbh}->prepare_cached(<<'');
63 INSERT OR IGNORE INTO kv (k,v) VALUES (?, ?)
64
65         $sth->bind_param(1, $key, SQL_BLOB);
66         $sth->bind_param(2, $val, SQL_BLOB);
67         my $e = $sth->execute;
68         $e == 0 ? undef : $e;
69 }
70
71 # caller calls sth->fetchrow_array
72 sub each_kv_iter {
73         my ($self) = @_;
74         my $sth = $self->{dbh}->prepare_cached(<<'', undef, 1);
75 SELECT k,v FROM kv
76
77         $sth->execute;
78         $sth
79 }
80
81 sub keys {
82         my ($self, @pfx) = @_;
83         my $sql = 'SELECT k FROM kv';
84         if (defined $pfx[0]) {
85                 $sql .= ' WHERE k LIKE ? ESCAPE ?';
86                 my $anywhere = !!$pfx[1];
87                 $pfx[1] = '\\';
88                 $pfx[0] =~ s/([%_\\])/\\$1/g; # glob chars
89                 $pfx[0] .= '%';
90                 substr($pfx[0], 0, 0, '%') if $anywhere;
91         } else {
92                 @pfx = (); # [0] may've been undef
93         }
94         my $sth = $self->dbh->prepare($sql);
95         if (@pfx) {
96                 $sth->bind_param(1, $pfx[0], SQL_BLOB);
97                 $sth->bind_param(2, $pfx[1]);
98         }
99         $sth->execute;
100         map { $_->[0] } @{$sth->fetchall_arrayref};
101 }
102
103 sub set {
104         my ($self, $key, $val) = @_;
105         if (defined $val) {
106                 my $sth = $self->{dbh}->prepare_cached(<<'');
107 INSERT OR REPLACE INTO kv (k,v) VALUES (?,?)
108
109                 $sth->bind_param(1, $key, SQL_BLOB);
110                 $sth->bind_param(2, $val, SQL_BLOB);
111                 my $e = $sth->execute;
112                 $e == 0 ? undef : $e;
113         } else {
114                 my $sth = $self->{dbh}->prepare_cached(<<'');
115 DELETE FROM kv WHERE k = ?
116
117                 $sth->bind_param(1, $key, SQL_BLOB);
118         }
119 }
120
121 sub get {
122         my ($self, $key) = @_;
123         my $sth = $self->{dbh}->prepare_cached(<<'', undef, 1);
124 SELECT v FROM kv WHERE k = ?
125
126         $sth->bind_param(1, $key, SQL_BLOB);
127         $sth->execute;
128         $sth->fetchrow_array;
129 }
130
131 sub xchg {
132         my ($self, $key, $newval, $lock) = @_;
133         $lock //= $self->lock_for_scope_fast;
134         my $oldval = get($self, $key);
135         if (defined $newval) {
136                 set($self, $key, $newval);
137         } else {
138                 my $sth = $self->{dbh}->prepare_cached(<<'');
139 DELETE FROM kv WHERE k = ?
140
141                 $sth->bind_param(1, $key, SQL_BLOB);
142                 $sth->execute;
143         }
144         $oldval;
145 }
146
147 sub count {
148         my ($self) = @_;
149         my $sth = $self->{dbh}->prepare_cached(<<'');
150 SELECT COUNT(k) FROM kv
151
152         $sth->execute;
153         $sth->fetchrow_array;
154 }
155
156 # faster than ->count due to how SQLite works
157 sub has_entries {
158         my ($self) = @_;
159         my @n = $self->{dbh}->selectrow_array('SELECT k FROM kv LIMIT 1');
160         scalar(@n) ? 1 : undef;
161 }
162
163 sub dbh_release {
164         my ($self, $lock) = @_;
165         my $dbh = delete $self->{dbh} or return;
166         $lock //= $self->lock_for_scope_fast; # may be needed for WAL
167         %{$dbh->{CachedKids}} = (); # cleanup prepare_cached
168         $dbh->disconnect;
169 }
170
171 sub DESTROY {
172         my ($self) = @_;
173         dbh_release($self);
174         my $dir = delete $self->{"tmp$$.$self"} or return;
175         my $tries = 0;
176         do {
177                 $! = 0;
178                 eval { rmtree($dir) };
179         } while ($@ && $!{ENOENT} && $tries++ < 5);
180         warn "error removing $dir: $@" if $@;
181         warn "Took $tries tries to remove $dir\n" if $tries;
182 }
183
184 1;