From 41bdfa63fa4c1a288e85720614436580fd0a9741 Mon Sep 17 00:00:00 2001 From: "Eric Wong (Contractor, The Linux Foundation)" Date: Mon, 19 Mar 2018 08:14:51 +0000 Subject: [PATCH] Lock: new base class for writable lockers This reduces code duplication needed for locking and and hopefully makes things easier to understand. --- MANIFEST | 1 + lib/PublicInbox/Import.pm | 17 ++++----------- lib/PublicInbox/Lock.pm | 31 ++++++++++++++++++++++++++++ lib/PublicInbox/SearchIdx.pm | 27 +++--------------------- lib/PublicInbox/SearchIdxSkeleton.pm | 15 +++++++------- lib/PublicInbox/V2Writable.pm | 12 +++++------ 6 files changed, 52 insertions(+), 51 deletions(-) create mode 100644 lib/PublicInbox/Lock.pm diff --git a/MANIFEST b/MANIFEST index a42b9e1a..3b0b013f 100644 --- a/MANIFEST +++ b/MANIFEST @@ -68,6 +68,7 @@ lib/PublicInbox/Import.pm lib/PublicInbox/Inbox.pm lib/PublicInbox/Linkify.pm lib/PublicInbox/Listener.pm +lib/PublicInbox/Lock.pm lib/PublicInbox/MDA.pm lib/PublicInbox/MID.pm lib/PublicInbox/MIME.pm diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm index ca30ac44..fc740fa4 100644 --- a/lib/PublicInbox/Import.pm +++ b/lib/PublicInbox/Import.pm @@ -7,7 +7,7 @@ package PublicInbox::Import; use strict; use warnings; -use Fcntl qw(:flock :DEFAULT); +use base qw(PublicInbox::Lock); use PublicInbox::Spawn qw(spawn); use PublicInbox::MID qw(mids mid_mime mid2path); use PublicInbox::Address; @@ -44,19 +44,13 @@ sub gfi_start { pipe($in_r, $in_w) or die "pipe failed: $!"; pipe($out_r, $out_w) or die "pipe failed: $!"; my $git = $self->{git}; - my $git_dir = $git->{git_dir}; - if (my $lock_path = $self->{lock_path}) { - sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or - die "failed to open lock $lock_path: $!"; - # wait for other processes to be done - flock($lockfh, LOCK_EX) or die "lock failed: $!\n"; - $self->{lockfh} = $lockfh; - } + $self->lock_acquire; local $/ = "\n"; chomp($self->{tip} = $git->qx(qw(rev-parse --revs-only), $self->{ref})); + my $git_dir = $git->{git_dir}; my @cmd = ('git', "--git-dir=$git_dir", qw(fast-import --quiet --done --date-format=raw)); my $rdr = { 0 => fileno($out_r), 1 => fileno($in_w) }; @@ -384,10 +378,7 @@ sub done { _update_git_info($self, 1) if delete $self->{nchg}; - $self->{lock_path} or return; - my $lockfh = delete $self->{lockfh} or die "BUG: not locked: $!"; - flock($lockfh, LOCK_UN) or die "unlock failed: $!"; - close $lockfh or die "close lock failed: $!"; + $self->lock_release; } sub atfork_child { diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm new file mode 100644 index 00000000..ca6b33f2 --- /dev/null +++ b/lib/PublicInbox/Lock.pm @@ -0,0 +1,31 @@ +# Copyright (C) 2018 all contributors +# License: AGPL-3.0+ + +# Base class for per-inbox locking +package PublicInbox::Lock; +use strict; +use warnings; +use Fcntl qw(:flock :DEFAULT); +use Carp qw(croak); + +# we only acquire the flock if creating or reindexing; +# PublicInbox::Import already has the lock on its own. +sub lock_acquire { + my ($self) = @_; + croak 'already locked' if $self->{lockfh}; + my $lock_path = $self->{lock_path} or return; + sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or + die "failed to open lock $lock_path: $!\n"; + flock($lockfh, LOCK_EX) or die "lock failed: $!\n"; + $self->{lockfh} = $lockfh; +} + +sub lock_release { + my ($self) = @_; + return unless $self->{lock_path}; + my $lockfh = delete $self->{lockfh} or croak 'not locked'; + flock($lockfh, LOCK_UN) or die "unlock failed: $!\n"; + close $lockfh or die "close failed: $!\n"; +} + +1; diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index ae2544da..0b9fb4bc 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -9,9 +9,8 @@ package PublicInbox::SearchIdx; use strict; use warnings; -use Fcntl qw(:flock :DEFAULT); +use base qw(PublicInbox::Search PublicInbox::Lock); use PublicInbox::MIME; -use base qw(PublicInbox::Search); use PublicInbox::MID qw/mid_clean id_compress mid_mime mids references/; use PublicInbox::MsgIter; use Carp qw(croak); @@ -96,7 +95,7 @@ sub _xdb_release { my ($self) = @_; my $xdb = delete $self->{xdb} or croak 'not acquired'; $xdb->close; - _lock_release($self) if $self->{creat}; + $self->lock_release if $self->{creat}; undef; } @@ -107,33 +106,13 @@ sub _xdb_acquire { my $flag = Search::Xapian::DB_OPEN; if ($self->{creat}) { require File::Path; - _lock_acquire($self); + $self->lock_acquire; File::Path::mkpath($dir); $flag = Search::Xapian::DB_CREATE_OR_OPEN; } $self->{xdb} = Search::Xapian::WritableDatabase->new($dir, $flag); } -# we only acquire the flock if creating or reindexing; -# PublicInbox::Import already has the lock on its own. -sub _lock_acquire { - my ($self) = @_; - croak 'already locked' if $self->{lockfh}; - my $lock_path = $self->{lock_path} or return; - sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or - die "failed to open lock $lock_path: $!\n"; - flock($lockfh, LOCK_EX) or die "lock failed: $!\n"; - $self->{lockfh} = $lockfh; -} - -sub _lock_release { - my ($self) = @_; - return unless $self->{lock_path}; - my $lockfh = delete $self->{lockfh} or croak 'not locked'; - flock($lockfh, LOCK_UN) or die "unlock failed: $!\n"; - close $lockfh or die "close failed: $!\n"; -} - sub add_val ($$$) { my ($doc, $col, $num) = @_; $num = Search::Xapian::sortable_serialise($num); diff --git a/lib/PublicInbox/SearchIdxSkeleton.pm b/lib/PublicInbox/SearchIdxSkeleton.pm index 51a88fda..54a59ab0 100644 --- a/lib/PublicInbox/SearchIdxSkeleton.pm +++ b/lib/PublicInbox/SearchIdxSkeleton.pm @@ -38,8 +38,7 @@ sub new { $w->autoflush(1); # lock on only exists in parent, not in worker - my $l = $self->{lock_path} = $self->xdir . '/pi-v2-skeleton.lock'; - open my $fh, '>>', $l or die "failed to create $l: $!\n"; + $self->{lock_path} = $self->xdir . '/pi-v2-skeleton.lock'; $self; } @@ -111,9 +110,9 @@ sub index_skeleton { # multiple processes write to the same pipe, so use flock # We can't avoid this lock for <=PIPE_BUF writes, either, # because those atomic writes can break up >PIPE_BUF ones - $self->_lock_acquire; + $self->lock_acquire; print $w $str or $err = $!; - $self->_lock_release; + $self->lock_release; die "print failed: $err\n" if $err; } @@ -121,10 +120,10 @@ sub index_skeleton { sub remote_remove { my ($self, $oid, $mid) = @_; my $err; - $self->_lock_acquire; + $self->lock_acquire; eval { $self->SUPER::remote_remove($oid, $mid) }; $err = $@; - $self->_lock_release; + $self->lock_release; die $err if $err; } @@ -151,9 +150,9 @@ sub barrier_init { my ($self, $nparts) = @_; my $w = $self->{w}; my $err; - $self->_lock_acquire; + $self->lock_acquire; print $w "barrier_init $nparts\n" or $err = "failed to write: $!\n"; - $self->_lock_release; + $self->lock_release; die $err if $err; } diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index bdde76bb..36901cde 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -5,7 +5,7 @@ package PublicInbox::V2Writable; use strict; use warnings; -use Fcntl qw(:flock :DEFAULT); +use base qw(PublicInbox::Lock); use PublicInbox::SearchIdxPart; use PublicInbox::SearchIdxSkeleton; use PublicInbox::MIME; @@ -26,13 +26,10 @@ sub nproc () { sub new { my ($class, $v2ibx, $creat) = @_; my $dir = $v2ibx->{mainrepo} or die "no mainrepo in inbox\n"; - my $lock_path = "$dir/inbox.lock"; unless (-d $dir) { if ($creat) { require File::Path; File::Path::mkpath($dir); - open my $fh, '>>', $lock_path or - die "failed to open $lock_path: $!\n"; } else { die "$dir does not exist\n"; } @@ -64,7 +61,7 @@ sub new { # limit each repo to 1GB or so rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR), }; - bless $self, $class + bless $self, $class; } # returns undef on duplicate or spam @@ -188,6 +185,8 @@ sub idx_init { # frequently activated. delete $ibx->{$_} foreach (qw(git mm search)); + $self->lock_acquire; + # first time initialization, first we create the skeleton pipe: my $skel = $self->{skel} = PublicInbox::SearchIdxSkeleton->new($self); @@ -253,6 +252,7 @@ sub done { my $im = delete $self->{im}; $im->done if $im; # PublicInbox::Import::done $self->searchidx_checkpoint(0); + $self->lock_release; } sub checkpoint { @@ -399,7 +399,7 @@ sub import_init { my $im = PublicInbox::Import->new($git, undef, undef, $self->{-inbox}); $im->{bytes_added} = int($packed_bytes / $PACKING_FACTOR); $im->{want_object_info} = 1; - $im->{lock_path} = $self->{lock_path}; + $im->{lock_path} = undef; $im->{path_type} = 'v2'; $self->{im} = $im; } -- 2.44.0