From 24294059123d4edd9e31f211d72c641a5d1a0d72 Mon Sep 17 00:00:00 2001 From: "Eric Wong (Contractor, The Linux Foundation)" Date: Mon, 19 Mar 2018 08:14:56 +0000 Subject: [PATCH] v2writable: allow disabling parallelization While parallel processes improve import speed for initial imports, they are probably not necessary for daily mail imports via WatchMaildir and certainly not for public-inbox-init. This saves some memory for daily use and even helps improve readability of some subroutines by showing which methods they call remotely. --- lib/PublicInbox/SearchIdx.pm | 56 ++++++++++++++++++++----- lib/PublicInbox/SearchIdxPart.pm | 62 ++++++++++++++------------- lib/PublicInbox/SearchIdxSkeleton.pm | 63 +++++++++++++--------------- lib/PublicInbox/V2Writable.pm | 3 +- lib/PublicInbox/WatchMaildir.pm | 4 +- script/public-inbox-init | 3 +- t/v2writable.t | 6 +-- 7 files changed, 118 insertions(+), 79 deletions(-) diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 0b9fb4bc..3d80b002 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -523,6 +523,7 @@ sub link_and_save { $doc->add_boolean_term('Q' . 
$_) foreach @$mids; my $vivified = 0; + $self->{skel} and die "Should not have read-only skel here\n";; foreach my $mid (@$mids) { $self->each_smsg_by_mid($mid, sub { my ($cur) = @_; @@ -887,24 +888,59 @@ sub DESTROY { # remote_* subs are only used by SearchIdxPart and SearchIdxSkeleton sub remote_commit { my ($self) = @_; - print { $self->{w} } "commit\n" or die "failed to write commit: $!"; + if (my $w = $self->{w}) { + print $w "commit\n" or die "failed to write commit: $!"; + } else { + $self->commit_txn_lazy; + if (my $skel = $self->{skeleton}) { + $skel->commit_txn_lazy; + } + } } sub remote_close { my ($self) = @_; - my $pid = delete $self->{pid} or die "no process to wait on\n"; - my $w = delete $self->{w} or die "no pipe to write to\n"; - print $w "close\n" or die "failed to write to pid:$pid: $!\n"; - close $w or die "failed to close pipe for pid:$pid: $!\n"; - waitpid($pid, 0) == $pid or die "remote process did not finish"; - $? == 0 or die ref($self)." pid:$pid exited with: $?"; + if (my $w = delete $self->{w}) { + my $pid = delete $self->{pid} or die "no process to wait on\n"; + print $w "close\n" or die "failed to write to pid:$pid: $!\n"; + close $w or die "failed to close pipe for pid:$pid: $!\n"; + waitpid($pid, 0) == $pid or die "remote process did not finish"; + $? == 0 or die ref($self)." 
pid:$pid exited with: $?"; + } else { + die "transaction in progress $self\n" if $self->{txn}; + $self->_xdb_release if $self->{xdb}; + } } -# triggers remove_by_oid in partition or skeleton sub remote_remove { my ($self, $oid, $mid) = @_; - print { $self->{w} } "D $oid $mid\n" or - die "failed to write remove $!"; + if (my $w = $self->{w}) { + # triggers remove_by_oid in partition or skeleton + print $w "D $oid $mid\n" or die "failed to write remove $!"; + } else { + $self->begin_txn_lazy; + $self->remove_by_oid($oid, $mid); + } +} + +sub begin_txn_lazy { + my ($self) = @_; + return if $self->{txn}; + my $xdb = $self->{xdb} || $self->_xdb_acquire; + $xdb->begin_transaction; + $self->{txn} = 1; +} + +sub commit_txn_lazy { + my ($self) = @_; + delete $self->{txn} or return; + $self->{xdb}->commit_transaction; +} + +sub worker_done { + my ($self) = @_; + die "$$ $0 xdb not released\n" if $self->{xdb}; + die "$$ $0 still in transaction\n" if $self->{txn}; } 1; diff --git a/lib/PublicInbox/SearchIdxPart.pm b/lib/PublicInbox/SearchIdxPart.pm index d8c8c8bb..82f5c1bc 100644 --- a/lib/PublicInbox/SearchIdxPart.pm +++ b/lib/PublicInbox/SearchIdxPart.pm @@ -9,6 +9,15 @@ sub new { my ($class, $v2writable, $part, $skel) = @_; my $self = $class->SUPER::new($v2writable->{-inbox}, 1, $part); $self->{skeleton} = $skel; + # create the DB: + $self->_xdb_acquire; + $self->_xdb_release; + $self->spawn_worker($v2writable, $part) if $v2writable->{parallel}; + $self; +} + +sub spawn_worker { + my ($self, $v2writable, $part) = @_; my ($r, $w); pipe($r, $w) or die "pipe failed: $!\n"; binmode $r, ':raw'; @@ -32,44 +41,30 @@ sub new { $self->{pid} = $pid; $self->{w} = $w; close $r; - $self; } sub partition_worker_loop ($$$) { my ($self, $r, $part) = @_; $0 = "pi-v2-partition[$part]"; - my $xdb = $self->_xdb_acquire; - $xdb->begin_transaction; - my $txn = 1; + $self->begin_txn_lazy; while (my $line = $r->getline) { if ($line eq "commit\n") { - $xdb->commit_transaction if $txn; - $txn = 
undef; + $self->commit_txn_lazy; $self->{skeleton}->remote_commit; } elsif ($line eq "close\n") { $self->_xdb_release; - $xdb = $txn = undef; } elsif ($line eq "barrier\n") { - $xdb->commit_transaction if $txn; - $txn = undef; + $self->commit_txn_lazy; print { $self->{skeleton}->{w} } "barrier $part\n" or die "write failed to skeleton: $!\n"; } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.+)\n\z/s) { my ($oid, $mid) = ($1, $2); - $xdb ||= $self->_xdb_acquire; - if (!$txn) { - $xdb->begin_transaction; - $txn = 1; - } + $self->begin_txn_lazy; $self->remove_by_oid($oid, $mid); } else { chomp $line; my ($len, $artnum, $oid, $mid0) = split(/ /, $line); - $xdb ||= $self->_xdb_acquire; - if (!$txn) { - $xdb->begin_transaction; - $txn = 1; - } + $self->begin_txn_lazy; my $n = read($r, my $msg, $len) or die "read: $!\n"; $n == $len or die "short read: $n != $len\n"; my $mime = PublicInbox::MIME->new(\$msg); @@ -77,17 +72,21 @@ sub partition_worker_loop ($$$) { $self->add_message($mime, $n, $artnum, $oid, $mid0); } } - warn "$$ still in transaction\n" if $txn; - warn "$$ xdb active\n" if $xdb; + $self->worker_done; } # called by V2Writable sub index_raw { - my ($self, $len, $msgref, $artnum, $object_id, $mid0) = @_; - my $w = $self->{w}; - print $w "$len $artnum $object_id $mid0\n", $$msgref or die - "failed to write partition $!\n"; - $w->flush or die "failed to flush: $!\n"; + my ($self, $bytes, $msgref, $artnum, $oid, $mid0, $mime) = @_; + if (my $w = $self->{w}) { + print $w "$bytes $artnum $oid $mid0\n", $$msgref or die + "failed to write partition $!\n"; + $w->flush or die "failed to flush: $!\n"; + } else { + $$msgref = undef; + $self->begin_txn_lazy; + $self->add_message($mime, $bytes, $artnum, $oid, $mid0); + } } sub atfork_child { @@ -96,9 +95,14 @@ sub atfork_child { # called by V2Writable: sub remote_barrier { - my $w = $_[0]->{w}; - print $w "barrier\n" or die "failed to print: $!"; - $w->flush or die "failed to flush: $!"; + my ($self) = @_; + if (my $w = $self->{w}) 
{ + print $w "barrier\n" or die "failed to print: $!"; + $w->flush or die "failed to flush: $!"; + } else { + $self->commit_txn_lazy; + $self->{skeleton}->remote_commit; + } } 1; diff --git a/lib/PublicInbox/SearchIdxSkeleton.pm b/lib/PublicInbox/SearchIdxSkeleton.pm index 54a59ab0..ba439696 100644 --- a/lib/PublicInbox/SearchIdxSkeleton.pm +++ b/lib/PublicInbox/SearchIdxSkeleton.pm @@ -12,7 +12,12 @@ sub new { # create the DB: $self->_xdb_acquire; $self->_xdb_release; + $self->spawn_worker($v2writable) if $v2writable->{parallel}; + $self +} +sub spawn_worker { + my ($self, $v2writable) = @_; my ($r, $w); pipe($r, $w) or die "pipe failed: $!\n"; my ($barrier_wait, $barrier_note); @@ -39,24 +44,19 @@ sub new { # lock on only exists in parent, not in worker $self->{lock_path} = $self->xdir . '/pi-v2-skeleton.lock'; - $self; } sub skeleton_worker_loop { my ($self, $r, $barrier_note) = @_; $barrier_note->autoflush(1); $0 = 'pi-v2-skeleton'; - my $xdb = $self->_xdb_acquire; - $xdb->begin_transaction; - my $txn = 1; + $self->begin_txn_lazy; my $barrier = undef; while (my $line = $r->getline) { if ($line eq "commit\n") { - $xdb->commit_transaction if $txn; - $txn = undef; + $self->commit_txn_lazy; } elsif ($line eq "close\n") { $self->_xdb_release; - $xdb = $txn = undef; } elsif ($line =~ /\Abarrier_init (\d+)\n\z/) { my $n = $1 - 1; die "barrier in-progress\n" if defined $barrier; @@ -67,18 +67,13 @@ sub skeleton_worker_loop { delete $barrier->{$1} or die "unknown barrier: $part\n"; if ((scalar keys %$barrier) == 0) { $barrier = undef; - $xdb->commit_transaction if $txn; - $txn = undef; + $self->commit_txn_lazy; print $barrier_note "barrier_done\n" or die "print failed to barrier note: $!"; } } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.*)\n\z/s) { my ($oid, $mid) = ($1, $2); - $xdb ||= $self->_xdb_acquire; - if (!$txn) { - $xdb->begin_transaction; - $txn = 1; - } + $self->begin_txn_lazy; $self->remove_by_oid($oid, $mid); } else { my $len = int($line); @@ -86,35 +81,34 @@ 
sub skeleton_worker_loop { $n == $len or die "short read: $n != $len\n"; $msg = thaw($msg); # should raise on error defined $msg or die "failed to thaw buffer\n"; - $xdb ||= $self->_xdb_acquire; - if (!$txn) { - $xdb->begin_transaction; - $txn = 1; - } + $self->begin_txn_lazy; eval { index_skeleton_real($self, $msg) }; warn "failed to index message <$msg->[-1]>: $@\n" if $@; } } - die "xdb not released\n" if $xdb; - die "in transaction\n" if $txn; + $self->worker_done; } # called by a partition worker sub index_skeleton { my ($self, $values) = @_; - my $w = $self->{w}; - my $err; - my $str = freeze($values); - $str = length($str) . "\n" . $str; + if (my $w = $self->{w}) { + my $err; + my $str = freeze($values); + $str = length($str) . "\n" . $str; - # multiple processes write to the same pipe, so use flock - # We can't avoid this lock for <=PIPE_BUF writes, either, - # because those atomic writes can break up >PIPE_BUF ones - $self->lock_acquire; - print $w $str or $err = $!; - $self->lock_release; + # multiple processes write to the same pipe, so use flock + # We can't avoid this lock for <=PIPE_BUF writes, either, + # because those atomic writes can break up >PIPE_BUF ones + $self->lock_acquire; + print $w $str or $err = $!; + $self->lock_release; - die "print failed: $err\n" if $err; + die "print failed: $err\n" if $err; + } else { + $self->begin_txn_lazy; + index_skeleton_real($self, $values); + } } sub remote_remove { @@ -148,7 +142,7 @@ sub index_skeleton_real ($$) { # write to the subprocess sub barrier_init { my ($self, $nparts) = @_; - my $w = $self->{w}; + my $w = $self->{w} or return; my $err; $self->lock_acquire; print $w "barrier_init $nparts\n" or $err = "failed to write: $!\n"; @@ -158,7 +152,8 @@ sub barrier_init { sub barrier_wait { my ($self) = @_; - my $l = $self->{barrier_wait}->getline; + my $bw = $self->{barrier_wait} or return; + my $l = $bw->getline; $l eq "barrier_done\n" or die "bad response from barrier_wait: $l\n"; } diff --git 
a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 261f9d91..8e1363ea 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -56,6 +56,7 @@ sub new { xap_rw => undef, # PublicInbox::V2SearchIdx xap_ro => undef, partitions => $nparts, + parallel => 1, transact_bytes => 0, lock_path => "$dir/inbox.lock", # limit each repo to 1GB or so @@ -93,7 +94,7 @@ sub add { my $nparts = $self->{partitions}; my $part = $num % $nparts; my $idx = $self->idx_part($part); - $idx->index_raw($len, $msgref, $num, $oid, $mid0); + $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime); my $n = $self->{transact_bytes} += $len; if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) { $self->checkpoint; diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm index 2808b726..e28e602a 100644 --- a/lib/PublicInbox/WatchMaildir.pm +++ b/lib/PublicInbox/WatchMaildir.pm @@ -259,7 +259,9 @@ sub _importer_for { if ($v == 2) { eval { require PublicInbox::V2Writable }; die "v2 not supported: $@\n" if $@; - PublicInbox::V2Writable->new($inbox); + my $v2w = PublicInbox::V2Writable->new($inbox); + $v2w->{parallel} = 0; + $v2w; } elsif ($v == 1) { my $git = $inbox->git; my $name = $inbox->{name}; diff --git a/script/public-inbox-init b/script/public-inbox-init index f7a60fbb..fdad1366 100755 --- a/script/public-inbox-init +++ b/script/public-inbox-init @@ -83,8 +83,9 @@ if ($version >= 2) { }; $ibx = PublicInbox::Inbox->new($ibx); my $v2w = PublicInbox::V2Writable->new($ibx, 1); + $v2w->{parallel} = 0; + $v2w->idx_init; $v2w->git_init(0); - $v2w->idx_init(0); $v2w->done; } elsif ($version == 1) { x(qw(git init -q --bare), $mainrepo); diff --git a/t/v2writable.t b/t/v2writable.t index 771e8c17..2088f3fe 100644 --- a/t/v2writable.t +++ b/t/v2writable.t @@ -108,10 +108,10 @@ if ('ensure git configs are correct') { ok($im->add($mime), 'message with multiple Message-ID'); $im->done; my @found; - $ibx->search->reopen; - 
$ibx->search->each_smsg_by_mid('abcde@1', sub { push @found, @_; 1 }); + my $srch = $ibx->search; + $srch->reopen->each_smsg_by_mid('abcde@1', sub { push @found, @_; 1 }); is(scalar(@found), 1, 'message found by first MID'); - $ibx->search->each_smsg_by_mid('abcde@2', sub { push @found, @_; 1 }); + $srch->reopen->each_smsg_by_mid('abcde@2', sub { push @found, @_; 1 }); is(scalar(@found), 2, 'message found by second MID'); is($found[0]->{doc_id}, $found[1]->{doc_id}, 'same document'); ok($found[1]->{doc_id} > 0, 'doc_id is positive'); -- 2.44.0