]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/SharedKV.pm
doc: re-add missing 1.6 release notes
[public-inbox.git] / lib / PublicInbox / SharedKV.pm
1 # Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # fork()-friendly key-value store.  Will be used for making
5 # augmenting Maildirs and mboxes less expensive, maybe.
6 # We use flock(2) to avoid SQLite lock problems (busy timeouts, backoff)
7 package PublicInbox::SharedKV;
8 use strict;
9 use v5.10.1;
10 use parent qw(PublicInbox::Lock);
11 use File::Temp qw(tempdir);
12 use DBI ();
13 use PublicInbox::Spawn;
14 use File::Path qw(rmtree);
15
16 sub dbh {
17         my ($self, $lock) = @_;
18         $self->{dbh} //= do {
19                 my $f = $self->{filename};
20                 $lock //= $self->lock_for_scope;
21                 my $dbh = DBI->connect("dbi:SQLite:dbname=$f", '', '', {
22                         AutoCommit => 1,
23                         RaiseError => 1,
24                         PrintError => 0,
25                         sqlite_use_immediate_transaction => 1,
26                         # no sqlite_unicode here, this is for binary data
27                 });
28                 my $opt = $self->{opt} // {};
29                 $dbh->do('PRAGMA synchronous = OFF') if !$opt->{fsync};
30                 $dbh->do('PRAGMA cache_size = '.($opt->{cache_size} || 80000));
31                 $dbh->do('PRAGMA journal_mode = '.
32                                 ($opt->{journal_mode} // 'WAL'));
33                 $dbh->do(<<'');
34 CREATE TABLE IF NOT EXISTS kv (
35         k VARBINARY PRIMARY KEY NOT NULL,
36         v VARBINARY NOT NULL,
37         UNIQUE (k)
38 )
39
40                 $dbh;
41         }
42 }
43
44 sub new {
45         my ($cls, $dir, $base, $opt) = @_;
46         my $self = bless { opt => $opt }, $cls;
47         unless (defined $dir) {
48                 $self->{tmpdir} = $dir = tempdir('skv-XXXXXX', TMPDIR => 1);
49                 $self->{tmpid} = "$$.$self";
50         }
51         -d $dir or mkdir($dir) or die "mkdir($dir): $!";
52         $base //= '';
53         my $f = $self->{filename} = "$dir/$base.sqlite3";
54         $self->{lock_path} = $opt->{lock_path} // "$dir/$base.flock";
55         unless (-f $f) {
56                 open my $fh, '+>>', $f or die "failed to open $f: $!";
57                 PublicInbox::Spawn::nodatacow_fd(fileno($fh));
58         }
59         $self;
60 }
61
62 sub index_values {
63         my ($self) = @_;
64         my $lock = $self->lock_for_scope;
65         $self->dbh($lock)->do('CREATE INDEX IF NOT EXISTS idx_v ON kv (v)');
66 }
67
68 sub set_maybe {
69         my ($self, $key, $val, $lock) = @_;
70         $lock //= $self->lock_for_scope;
71         my $e = $self->{dbh}->prepare_cached(<<'')->execute($key, $val);
72 INSERT OR IGNORE INTO kv (k,v) VALUES (?, ?)
73
74         $e == 0 ? undef : $e;
75 }
76
77 # caller calls sth->fetchrow_array
78 sub each_kv_iter {
79         my ($self) = @_;
80         my $sth = $self->{dbh}->prepare_cached(<<'', undef, 1);
81 SELECT k,v FROM kv
82
83         $sth->execute;
84         $sth
85 }
86
87 sub delete_by_val {
88         my ($self, $val, $lock) = @_;
89         $lock //= $self->lock_for_scope;
90         $self->{dbh}->prepare_cached(<<'')->execute($val) + 0;
91 DELETE FROM kv WHERE v = ?
92
93 }
94
95 sub replace_values {
96         my ($self, $oldval, $newval, $lock) = @_;
97         $lock //= $self->lock_for_scope;
98         $self->{dbh}->prepare_cached(<<'')->execute($newval, $oldval) + 0;
99 UPDATE kv SET v = ? WHERE v = ?
100
101 }
102
103 sub set {
104         my ($self, $key, $val) = @_;
105         if (defined $val) {
106                 my $e = $self->{dbh}->prepare_cached(<<'')->execute($key, $val);
107 INSERT OR REPLACE INTO kv (k,v) VALUES (?,?)
108
109                 $e == 0 ? undef : $e;
110         } else {
111                 $self->{dbh}->prepare_cached(<<'')->execute($key);
112 DELETE FROM kv WHERE k = ?
113
114         }
115 }
116
117 sub get {
118         my ($self, $key) = @_;
119         my $sth = $self->{dbh}->prepare_cached(<<'', undef, 1);
120 SELECT v FROM kv WHERE k = ?
121
122         $sth->execute($key);
123         $sth->fetchrow_array;
124 }
125
126 sub xchg {
127         my ($self, $key, $newval, $lock) = @_;
128         $lock //= $self->lock_for_scope;
129         my $oldval = get($self, $key);
130         if (defined $newval) {
131                 set($self, $key, $newval);
132         } else {
133                 $self->{dbh}->prepare_cached(<<'')->execute($key);
134 DELETE FROM kv WHERE k = ?
135
136         }
137         $oldval;
138 }
139
140 sub count {
141         my ($self) = @_;
142         my $sth = $self->{dbh}->prepare_cached(<<'');
143 SELECT COUNT(k) FROM kv
144
145         $sth->execute;
146         $sth->fetchrow_array;
147 }
148
149 sub DESTROY {
150         my ($self) = @_;
151         rmtree($self->{tmpdir}) if ($self->{tmpid} // '') eq "$$.$self";
152 }
153
154 1;