MANIFEST | 1 +
lib/PublicInbox/Import.pm | 17 ++++-------------
lib/PublicInbox/Lock.pm | 31 +++++++++++++++++++++++++++++++
lib/PublicInbox/SearchIdx.pm | 27 +++------------------------
lib/PublicInbox/SearchIdxSkeleton.pm | 15 +++++++--------
lib/PublicInbox/V2Writable.pm | 12 ++++++------
diff --git a/MANIFEST b/MANIFEST
index a42b9e1ab02234de2ea2864b4ad80b450519ddbe..3b0b013f2cd14dc02f54ce2b22a73efa9923f760 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -68,6 +68,7 @@ lib/PublicInbox/Import.pm
lib/PublicInbox/Inbox.pm
lib/PublicInbox/Linkify.pm
lib/PublicInbox/Listener.pm
+lib/PublicInbox/Lock.pm
lib/PublicInbox/MDA.pm
lib/PublicInbox/MID.pm
lib/PublicInbox/MIME.pm
diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index ca30ac444ca4ef7af11f92a602f0837708493cc5..fc740fa451c9d7ad19da99c1dc0a5490358674ae 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -7,7 +7,7 @@ # not the WWW or NNTP code which only requires read-only access.
package PublicInbox::Import;
use strict;
use warnings;
-use Fcntl qw(:flock :DEFAULT);
+use base qw(PublicInbox::Lock);
use PublicInbox::Spawn qw(spawn);
use PublicInbox::MID qw(mids mid_mime mid2path);
use PublicInbox::Address;
@@ -44,19 +44,13 @@ my ($in_r, $in_w, $out_r, $out_w);
pipe($in_r, $in_w) or die "pipe failed: $!";
pipe($out_r, $out_w) or die "pipe failed: $!";
my $git = $self->{git};
- my $git_dir = $git->{git_dir};
- if (my $lock_path = $self->{lock_path}) {
- sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or
- die "failed to open lock $lock_path: $!";
- # wait for other processes to be done
- flock($lockfh, LOCK_EX) or die "lock failed: $!\n";
- $self->{lockfh} = $lockfh;
- }
+ $self->lock_acquire;
local $/ = "\n";
chomp($self->{tip} = $git->qx(qw(rev-parse --revs-only), $self->{ref}));
+ my $git_dir = $git->{git_dir};
my @cmd = ('git', "--git-dir=$git_dir", qw(fast-import
--quiet --done --date-format=raw));
my $rdr = { 0 => fileno($out_r), 1 => fileno($in_w) };
@@ -384,10 +378,7 @@ $? == 0 or die "fast-import failed: $?";
_update_git_info($self, 1) if delete $self->{nchg};
- $self->{lock_path} or return;
- my $lockfh = delete $self->{lockfh} or die "BUG: not locked: $!";
- flock($lockfh, LOCK_UN) or die "unlock failed: $!";
- close $lockfh or die "close lock failed: $!";
+ $self->lock_release;
}
sub atfork_child {
diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm
new file mode 100644
index 0000000000000000000000000000000000000000..ca6b33f26ed011bbc43648a1fa84e918b856330b
--- /dev/null
+++ b/lib/PublicInbox/Lock.pm
@@ -0,0 +1,31 @@
+# Copyright (C) 2018 all contributors
+# License: AGPL-3.0+
+
+# Base class for per-inbox locking
+package PublicInbox::Lock;
+use strict;
+use warnings;
+use Fcntl qw(:flock :DEFAULT);
+use Carp qw(croak);
+
+# we only acquire the flock if creating or reindexing;
+# PublicInbox::Import already has the lock on its own.
+sub lock_acquire {
+ my ($self) = @_;
+ croak 'already locked' if $self->{lockfh};
+ my $lock_path = $self->{lock_path} or return;
+ sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or
+ die "failed to open lock $lock_path: $!\n";
+ flock($lockfh, LOCK_EX) or die "lock failed: $!\n";
+ $self->{lockfh} = $lockfh;
+}
+
+sub lock_release {
+ my ($self) = @_;
+ return unless $self->{lock_path};
+ my $lockfh = delete $self->{lockfh} or croak 'not locked';
+ flock($lockfh, LOCK_UN) or die "unlock failed: $!\n";
+ close $lockfh or die "close failed: $!\n";
+}
+
+1;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index ae2544da18a32e5472077caee939080ca639637b..0b9fb4bce101b92dded63034d04b0373f8f59291 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -9,9 +9,8 @@ # This writes to the search index.
package PublicInbox::SearchIdx;
use strict;
use warnings;
-use Fcntl qw(:flock :DEFAULT);
+use base qw(PublicInbox::Search PublicInbox::Lock);
use PublicInbox::MIME;
-use base qw(PublicInbox::Search);
use PublicInbox::MID qw/mid_clean id_compress mid_mime mids references/;
use PublicInbox::MsgIter;
use Carp qw(croak);
@@ -96,7 +95,7 @@ sub _xdb_release {
my ($self) = @_;
my $xdb = delete $self->{xdb} or croak 'not acquired';
$xdb->close;
- _lock_release($self) if $self->{creat};
+ $self->lock_release if $self->{creat};
undef;
}
@@ -107,31 +106,11 @@ my $dir = $self->xdir;
my $flag = Search::Xapian::DB_OPEN;
if ($self->{creat}) {
require File::Path;
- _lock_acquire($self);
+ $self->lock_acquire;
File::Path::mkpath($dir);
$flag = Search::Xapian::DB_CREATE_OR_OPEN;
}
$self->{xdb} = Search::Xapian::WritableDatabase->new($dir, $flag);
-}
-
-# we only acquire the flock if creating or reindexing;
-# PublicInbox::Import already has the lock on its own.
-sub _lock_acquire {
- my ($self) = @_;
- croak 'already locked' if $self->{lockfh};
- my $lock_path = $self->{lock_path} or return;
- sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or
- die "failed to open lock $lock_path: $!\n";
- flock($lockfh, LOCK_EX) or die "lock failed: $!\n";
- $self->{lockfh} = $lockfh;
-}
-
-sub _lock_release {
- my ($self) = @_;
- return unless $self->{lock_path};
- my $lockfh = delete $self->{lockfh} or croak 'not locked';
- flock($lockfh, LOCK_UN) or die "unlock failed: $!\n";
- close $lockfh or die "close failed: $!\n";
}
sub add_val ($$$) {
diff --git a/lib/PublicInbox/SearchIdxSkeleton.pm b/lib/PublicInbox/SearchIdxSkeleton.pm
index 51a88fda36b8a9e008c35223d314a4b6efd4528e..54a59ab02a61b5985a1a859c898ae86ca1d0af2a 100644
--- a/lib/PublicInbox/SearchIdxSkeleton.pm
+++ b/lib/PublicInbox/SearchIdxSkeleton.pm
@@ -38,8 +38,7 @@
$w->autoflush(1);
# lock on only exists in parent, not in worker
- my $l = $self->{lock_path} = $self->xdir . '/pi-v2-skeleton.lock';
- open my $fh, '>>', $l or die "failed to create $l: $!\n";
+ $self->{lock_path} = $self->xdir . '/pi-v2-skeleton.lock';
$self;
}
@@ -111,9 +110,9 @@
# multiple processes write to the same pipe, so use flock
# We can't avoid this lock for <=PIPE_BUF writes, either,
# because those atomic writes can break up >PIPE_BUF ones
- $self->_lock_acquire;
+ $self->lock_acquire;
print $w $str or $err = $!;
- $self->_lock_release;
+ $self->lock_release;
die "print failed: $err\n" if $err;
}
@@ -121,10 +120,10 @@
sub remote_remove {
my ($self, $oid, $mid) = @_;
my $err;
- $self->_lock_acquire;
+ $self->lock_acquire;
eval { $self->SUPER::remote_remove($oid, $mid) };
$err = $@;
- $self->_lock_release;
+ $self->lock_release;
die $err if $err;
}
@@ -151,9 +150,9 @@ sub barrier_init {
my ($self, $nparts) = @_;
my $w = $self->{w};
my $err;
- $self->_lock_acquire;
+ $self->lock_acquire;
print $w "barrier_init $nparts\n" or $err = "failed to write: $!\n";
- $self->_lock_release;
+ $self->lock_release;
die $err if $err;
}
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index bdde76bbbf54702007d5f27a8c0e05777d67d06c..36901cdea0c33e69dd538f70f7f0acd40932eb4f 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -5,7 +5,7 @@ # This interface wraps and mimics PublicInbox::Import
package PublicInbox::V2Writable;
use strict;
use warnings;
-use Fcntl qw(:flock :DEFAULT);
+use base qw(PublicInbox::Lock);
use PublicInbox::SearchIdxPart;
use PublicInbox::SearchIdxSkeleton;
use PublicInbox::MIME;
@@ -26,13 +26,10 @@
sub new {
my ($class, $v2ibx, $creat) = @_;
my $dir = $v2ibx->{mainrepo} or die "no mainrepo in inbox\n";
- my $lock_path = "$dir/inbox.lock";
unless (-d $dir) {
if ($creat) {
require File::Path;
File::Path::mkpath($dir);
- open my $fh, '>>', $lock_path or
- die "failed to open $lock_path: $!\n";
} else {
die "$dir does not exist\n";
}
@@ -64,7 +61,7 @@ lock_path => "$dir/inbox.lock",
# limit each repo to 1GB or so
rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
};
- bless $self, $class
+ bless $self, $class;
}
# returns undef on duplicate or spam
@@ -187,6 +184,8 @@ # do not leak read-only FDs to child processes, we only have these
# FDs for duplicate detection so they should not be
# frequently activated.
delete $ibx->{$_} foreach (qw(git mm search));
+
+ $self->lock_acquire;
# first time initialization, first we create the skeleton pipe:
my $skel = $self->{skel} = PublicInbox::SearchIdxSkeleton->new($self);
@@ -253,6 +252,7 @@ my ($self) = @_;
my $im = delete $self->{im};
$im->done if $im; # PublicInbox::Import::done
$self->searchidx_checkpoint(0);
+ $self->lock_release;
}
sub checkpoint {
@@ -399,7 +399,7 @@ my ($self, $git, $packed_bytes) = @_;
my $im = PublicInbox::Import->new($git, undef, undef, $self->{-inbox});
$im->{bytes_added} = int($packed_bytes / $PACKING_FACTOR);
$im->{want_object_info} = 1;
- $im->{lock_path} = $self->{lock_path};
+ $im->{lock_path} = undef;
$im->{path_type} = 'v2';
$self->{im} = $im;
}