X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FV2Writable.pm;h=832384601625fcbb5bca8cfa374e47d05d72d46b;hb=79c5fdeacf75a2daeb57a7a5f00a170044e28634;hp=5b4d9c0d613a76d91f1d3432cf141cc404bc2c47;hpb=678fb3c2ba03a4a284620c039717c0d94dd6106a;p=public-inbox.git diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 5b4d9c0d..83238460 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -26,22 +26,14 @@ sub nproc () { int($ENV{NPROC} || `nproc 2>/dev/null` || 2); } -sub new { - my ($class, $v2ibx, $creat) = @_; - my $dir = $v2ibx->{mainrepo} or die "no mainrepo in inbox\n"; - unless (-d $dir) { - if ($creat) { - require File::Path; - File::Path::mkpath($dir); - } else { - die "$dir does not exist\n"; - } - } - +sub count_partitions ($) { + my ($self) = @_; my $nparts = 0; - my $xpfx = "$dir/xap" . PublicInbox::Search::SCHEMA_VERSION; + my $xpfx = $self->{xpfx}; # always load existing partitions in case core count changes: + # Also, partition count may change while -watch is running + # due to -compact if (-d $xpfx) { foreach my $part (<$xpfx/*>) { -d $part && $part =~ m!/\d+\z! or next; @@ -51,21 +43,37 @@ sub new { }; } } - $nparts = nproc() if ($nparts == 0); + $nparts; +} + +sub new { + my ($class, $v2ibx, $creat) = @_; + my $dir = $v2ibx->{mainrepo} or die "no mainrepo in inbox\n"; + unless (-d $dir) { + if ($creat) { + require File::Path; + File::Path::mkpath($dir); + } else { + die "$dir does not exist\n"; + } + } $v2ibx = PublicInbox::InboxWritable->new($v2ibx); + + my $xpfx = "$dir/xap" . PublicInbox::Search::SCHEMA_VERSION; my $self = { -inbox => $v2ibx, im => undef, # PublicInbox::Import - partitions => $nparts, parallel => 1, transact_bytes => 0, + xpfx => $xpfx, over => PublicInbox::OverIdxFork->new("$xpfx/over.sqlite3"), lock_path => "$dir/inbox.lock", # limit each repo to 1GB or so rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR), last_commit => [], }; + $self->{partitions} = count_partitions($self) || nproc(); bless $self, $class; } @@ -206,6 +214,12 @@ sub idx_init { $self->lock_acquire; $over->create($self); + # -compact can change partition count while -watch is idle + my $nparts = count_partitions($self); + if ($nparts && $nparts != $self->{partitions}) { + $self->{partitions} = $nparts; + } + # need to create all parts before initializing msgmap FD my $max = $self->{partitions} - 1; @@ -256,6 +270,7 @@ sub remove_internal { my $mark; foreach my $mid (@$mids) { + my %gone; $srch->reopen->each_smsg_by_mid($mid, sub { my ($smsg) = @_; $smsg->load_expand; @@ -267,28 +282,34 @@ sub remove_internal { my $orig = $$msg; my $cur = PublicInbox::MIME->new($msg); if (content_id($cur) eq $cid) { - $mm->num_delete($smsg->num); - # $removed should only be set once assuming - # no bugs in our deduplication code: - $removed = $smsg; - $removed->{mime} = $cur; - my $oid = $smsg->{blob}; - if ($purge) { - $purge->{$oid} = 1; - } else { - ($mark, undef) = - $im->remove(\$orig, $cmt_msg); - } - $orig = undef; - $removed->num; # memoize this for callers - - foreach my $idx (@$parts) { - $idx->remote_remove($oid, $mid); - } - $self->{over}->remove_oid($oid, $mid); + $smsg->{mime} = $cur; + $gone{$smsg->num} = [ $smsg, \$orig ]; } 1; # continue }); + my $n = scalar keys %gone; + next unless $n; + if ($n > 1) { + warn "BUG: multiple articles linked to <$mid>\n", + join(',', sort keys %gone), "\n"; + } + foreach my $num (keys %gone) { + my ($smsg, $orig) = @{$gone{$num}}; + $mm->num_delete($num); + # $removed should only be set once assuming + # no bugs in our deduplication code: + $removed = $smsg; + my $oid = $smsg->{blob}; + if ($purge) { + $purge->{$oid} = 1; + } else { + ($mark, undef) = $im->remove($orig, $cmt_msg); + } + $orig = undef; + foreach my $idx (@$parts) { + $idx->remote_remove($oid, $mid); + } + } $self->barrier; }