PublicInbox::OnDestroy->new(@single_pid, \&lock_release, $self);
}
+sub lock_acquire_fast {
+ $_[0]->{lockfh} or return lock_acquire($_[0]);
+ flock($_[0]->{lockfh}, LOCK_EX) or croak "lock (fast) failed: $!";
+}
+
+sub lock_release_fast {
+ flock($_[0]->{lockfh} // return, LOCK_UN) or
+ croak "unlock (fast) $_[0]->{lock_path}: $!";
+}
+
+# caller must use return value
+sub lock_for_scope_fast {
+ my ($self, @single_pid) = @_;
+ lock_acquire_fast($self) or return; # lock_path not set
+ PublicInbox::OnDestroy->new(@single_pid, \&lock_release_fast, $self);
+}
+
sub new_tmp {
my ($cls, $ident) = @_;
my $tmp = File::Temp->new("$ident.lock-XXXXXX", TMPDIR => 1);
my ($self, $lock) = @_;
$self->{dbh} //= do {
my $f = $self->{filename};
- $lock //= $self->lock_for_scope;
+ $lock //= $self->lock_for_scope_fast;
my $dbh = DBI->connect("dbi:SQLite:dbname=$f", '', '', {
AutoCommit => 1,
RaiseError => 1,
sub index_values {
my ($self) = @_;
- my $lock = $self->lock_for_scope;
+ my $lock = $self->lock_for_scope_fast;
$self->dbh($lock)->do('CREATE INDEX IF NOT EXISTS idx_v ON kv (v)');
}
sub set_maybe {
my ($self, $key, $val, $lock) = @_;
- $lock //= $self->lock_for_scope;
+ $lock //= $self->lock_for_scope_fast;
my $e = $self->{dbh}->prepare_cached(<<'')->execute($key, $val);
INSERT OR IGNORE INTO kv (k,v) VALUES (?, ?)
sub delete_by_val {
my ($self, $val, $lock) = @_;
- $lock //= $self->lock_for_scope;
+ $lock //= $self->lock_for_scope_fast;
$self->{dbh}->prepare_cached(<<'')->execute($val) + 0;
DELETE FROM kv WHERE v = ?
sub replace_values {
my ($self, $oldval, $newval, $lock) = @_;
- $lock //= $self->lock_for_scope;
+ $lock //= $self->lock_for_scope_fast;
$self->{dbh}->prepare_cached(<<'')->execute($newval, $oldval) + 0;
UPDATE kv SET v = ? WHERE v = ?
sub xchg {
my ($self, $key, $newval, $lock) = @_;
- $lock //= $self->lock_for_scope;
+ $lock //= $self->lock_for_scope_fast;
my $oldval = get($self, $key);
if (defined $newval) {
set($self, $key, $newval);
sub dbh_release {
my ($self, $lock) = @_;
my $dbh = delete $self->{dbh} or return;
- $lock //= $self->lock_for_scope; # may be needed for WAL
+ $lock //= $self->lock_for_scope_fast; # may be needed for WAL
%{$dbh->{CachedKids}} = (); # cleanup prepare_cached
$dbh->disconnect;
}
--- /dev/null
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use Benchmark qw(:all);
+use PublicInbox::TestCommon;
+require_ok 'PublicInbox::SharedKV';
+my ($tmpdir, $for_destroy) = tmpdir();
+local $ENV{TMPDIR} = $tmpdir;
+my $skv = PublicInbox::SharedKV->new;
+my $ipc = bless {}, 'StressSharedKV';
+$ipc->wq_workers_start('stress-sharedkv', $ENV{TEST_NPROC}//4);
+my $nr = $ENV{TEST_STRESS_NR} // 100_000;
+my $ios = [];
+my $t = timeit(1, sub {
+ for my $i (1..$nr) {
+ $ipc->wq_do('test_set_maybe', $ios, $skv, $i);
+ $ipc->wq_do('test_set_maybe', $ios, $skv, $i);
+ }
+});
+diag "$nr sets done ".timestr($t);
+
+for my $w ($ipc->wq_workers) {
+ $ipc->wq_do('test_skv_done', $ios);
+}
+diag "done requested";
+
+$ipc->wq_close;
+done_testing;
+
+package StressSharedKV;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+use Digest::SHA qw(sha1);
+
+sub test_set_maybe {
+ my ($self, $skv, $i) = @_;
+ my $wcb = $self->{wcb} //= do {
+ $skv->dbh;
+ sub { $skv->set_maybe(sha1($_[0]), '') };
+ };
+ $wcb->($i + time);
+}
+
+sub test_skv_done {
+ my ($self) = @_;
+ delete $self->{wcb};
+}