]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/SearchIdxSkeleton.pm
Lock: new base class for writable lockers
[public-inbox.git] / lib / PublicInbox / SearchIdxSkeleton.pm
index 4cb10f59136cf0fe3f1365f642060a6f2a339f30..54a59ab02a61b5985a1a859c898ae86ca1d0af2a 100644 (file)
@@ -38,8 +38,7 @@ sub new {
        $w->autoflush(1);
 
        # lock on only exists in parent, not in worker
-       my $l = $self->{lock_path} = $self->xdir . '/pi-v2-skeleton.lock';
-       open my $fh, '>>', $l or die "failed to create $l: $!\n";
+       $self->{lock_path} = $self->xdir . '/pi-v2-skeleton.lock';
        $self;
 }
 
@@ -73,6 +72,14 @@ sub skeleton_worker_loop {
                                print $barrier_note "barrier_done\n" or die
                                        "print failed to barrier note: $!";
                        }
+               } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.*)\n\z/s) {
+                       my ($oid, $mid) = ($1, $2);
+                       $xdb ||= $self->_xdb_acquire;
+                       if (!$txn) {
+                               $xdb->begin_transaction;
+                               $txn = 1;
+                       }
+                       $self->remove_by_oid($oid, $mid);
                } else {
                        my $len = int($line);
                        my $n = read($r, my $msg, $len) or die "read: $!\n";
@@ -103,13 +110,23 @@ sub index_skeleton {
        # multiple processes write to the same pipe, so use flock
        # We can't avoid this lock for <=PIPE_BUF writes, either,
        # because those atomic writes can break up >PIPE_BUF ones
-       $self->_lock_acquire;
+       $self->lock_acquire;
        print $w $str or $err = $!;
-       $self->_lock_release;
+       $self->lock_release;
 
        die "print failed: $err\n" if $err;
 }
 
+sub remote_remove {
+       my ($self, $oid, $mid) = @_;
+       my $err;
+       $self->lock_acquire;
+       eval { $self->SUPER::remote_remove($oid, $mid) };
+       $err = $@;
+       $self->lock_release;
+       die $err if $err;
+}
+
 # values: [ TS, NUM, BYTES, LINES, MID, XPATH, doc_data ]
 sub index_skeleton_real ($$) {
        my ($self, $values) = @_;
@@ -131,9 +148,12 @@ sub index_skeleton_real ($$) {
 # write to the subprocess
 sub barrier_init {
        my ($self, $nparts) = @_;
-       my $w = $_[0]->{w};
-       print $w "barrier_init $nparts\n" or die "failed to write: $!";
-       $w->flush or die "failed to flush: $!";
+       my $w = $self->{w};
+       my $err;
+       $self->lock_acquire;
+       print $w "barrier_init $nparts\n" or $err = "failed to write: $!\n";
+       $self->lock_release;
+       die $err if $err;
 }
 
 sub barrier_wait {