X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FV2Writable.pm;h=a8117aaa15a109cd90e2513ca42703120ca30ab3;hb=9d6468d9560b8cd26ba2529d7a09a93ec5ad8c00;hp=5b4d9c0d613a76d91f1d3432cf141cc404bc2c47;hpb=678fb3c2ba03a4a284620c039717c0d94dd6106a;p=public-inbox.git diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 5b4d9c0d..a8117aaa 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -26,22 +26,14 @@ sub nproc () { int($ENV{NPROC} || `nproc 2>/dev/null` || 2); } -sub new { - my ($class, $v2ibx, $creat) = @_; - my $dir = $v2ibx->{mainrepo} or die "no mainrepo in inbox\n"; - unless (-d $dir) { - if ($creat) { - require File::Path; - File::Path::mkpath($dir); - } else { - die "$dir does not exist\n"; - } - } - +sub count_partitions ($) { + my ($self) = @_; my $nparts = 0; - my $xpfx = "$dir/xap" . PublicInbox::Search::SCHEMA_VERSION; + my $xpfx = $self->{xpfx}; # always load existing partitions in case core count changes: + # Also, partition count may change while -watch is running + # due to -compact if (-d $xpfx) { foreach my $part (<$xpfx/*>) { -d $part && $part =~ m!/\d+\z! or next; @@ -51,21 +43,37 @@ sub new { }; } } - $nparts = nproc() if ($nparts == 0); + $nparts; +} + +sub new { + my ($class, $v2ibx, $creat) = @_; + my $dir = $v2ibx->{mainrepo} or die "no mainrepo in inbox\n"; + unless (-d $dir) { + if ($creat) { + require File::Path; + File::Path::mkpath($dir); + } else { + die "$dir does not exist\n"; + } + } $v2ibx = PublicInbox::InboxWritable->new($v2ibx); + + my $xpfx = "$dir/xap" . PublicInbox::Search::SCHEMA_VERSION; my $self = { -inbox => $v2ibx, im => undef, # PublicInbox::Import - partitions => $nparts, parallel => 1, transact_bytes => 0, + xpfx => $xpfx, over => PublicInbox::OverIdxFork->new("$xpfx/over.sqlite3"), lock_path => "$dir/inbox.lock", - # limit each repo to 1GB or so + # limit each git repo (epoch) to 1GB or so rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR), - last_commit => [], + last_commit => [], # git repo -> commit }; + $self->{partitions} = count_partitions($self) || nproc(); bless $self, $class; } @@ -73,9 +81,9 @@ sub init_inbox { my ($self, $parallel) = @_; $self->{parallel} = $parallel; $self->idx_init; - my $max_git = -1; - git_dir_latest($self, \$max_git); - $self->git_init($max_git >= 0 ? $max_git : 0); + my $epoch_max = -1; + git_dir_latest($self, \$epoch_max); + $self->git_init($epoch_max >= 0 ? $epoch_max : 0); $self->done; } @@ -107,7 +115,7 @@ sub add { my $nparts = $self->{partitions}; my $part = $num % $nparts; - $self->{last_commit}->[$self->{max_git}] = $cmt; + $self->{last_commit}->[$self->{epoch_max}] = $cmt; my $idx = $self->idx_part($part); $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime); my $n = $self->{transact_bytes} += $len; @@ -206,6 +214,12 @@ sub idx_init { $self->lock_acquire; $over->create($self); + # -compact can change partition count while -watch is idle + my $nparts = count_partitions($self); + if ($nparts && $nparts != $self->{partitions}) { + $self->{partitions} = $nparts; + } + # need to create all parts before initializing msgmap FD my $max = $self->{partitions} - 1; @@ -228,7 +242,7 @@ sub purge_oids { $self->done; my $pfx = "$self->{-inbox}->{mainrepo}/git"; my $purges = []; - foreach my $i (0..$self->{max_git}) { + foreach my $i (0..$self->{epoch_max}) { my $git = PublicInbox::Git->new("$pfx/$i.git"); my $im = $self->import_init($git, 0, 1); $purges->[$i] = $im->purge_oids($purge); @@ -256,6 +270,7 @@ sub remove_internal { my $mark; foreach my $mid (@$mids) { + my %gone; $srch->reopen->each_smsg_by_mid($mid, sub { my ($smsg) = @_; $smsg->load_expand; @@ -267,34 +282,40 @@ sub remove_internal { my $orig = $$msg; my $cur = PublicInbox::MIME->new($msg); if (content_id($cur) eq $cid) { - $mm->num_delete($smsg->num); - # $removed should only be set once assuming - # no bugs in our deduplication code: - $removed = $smsg; - $removed->{mime} = $cur; - my $oid = $smsg->{blob}; - if ($purge) { - $purge->{$oid} = 1; - } else { - ($mark, undef) = - $im->remove(\$orig, $cmt_msg); - } - $orig = undef; - $removed->num; # memoize this for callers - - foreach my $idx (@$parts) { - $idx->remote_remove($oid, $mid); - } - $self->{over}->remove_oid($oid, $mid); + $smsg->{mime} = $cur; + $gone{$smsg->num} = [ $smsg, \$orig ]; } 1; # continue }); + my $n = scalar keys %gone; + next unless $n; + if ($n > 1) { + warn "BUG: multiple articles linked to <$mid>\n", + join(',', sort keys %gone), "\n"; + } + foreach my $num (keys %gone) { + my ($smsg, $orig) = @{$gone{$num}}; + $mm->num_delete($num); + # $removed should only be set once assuming + # no bugs in our deduplication code: + $removed = $smsg; + my $oid = $smsg->{blob}; + if ($purge) { + $purge->{$oid} = 1; + } else { + ($mark, undef) = $im->remove($orig, $cmt_msg); + } + $orig = undef; + foreach my $idx (@$parts) { + $idx->remote_remove($oid, $mid); + } + } $self->barrier; } if (defined $mark) { my $cmt = $im->get_mark($mark); - $self->{last_commit}->[$self->{max_git}] = $cmt; + $self->{last_commit}->[$self->{epoch_max}] = $cmt; } if ($purge && scalar keys %$purge) { return purge_oids($self, $purge); @@ -318,15 +339,20 @@ sub purge { $purges; } +sub last_commit_part ($$;$) { + my ($self, $i, $cmt) = @_; + my $v = PublicInbox::Search::SCHEMA_VERSION(); + $self->{mm}->last_commit_xap($v, $i, $cmt); +} + sub set_last_commits ($) { my ($self) = @_; - defined(my $max_git = $self->{max_git}) or return; - my $mm = $self->{mm}; + defined(my $epoch_max = $self->{epoch_max}) or return; my $last_commit = $self->{last_commit}; - foreach my $i (0..$max_git) { + foreach my $i (0..$epoch_max) { defined(my $cmt = $last_commit->[$i]) or next; $last_commit->[$i] = undef; - $mm->last_commit_n($i, $cmt); + last_commit_part($self, $i, $cmt); } } @@ -371,7 +397,7 @@ sub checkpoint { # issue a write barrier to ensure all data is visible to other processes # and read-only ops. Order of data importance is: git > SQLite > Xapian sub barrier { - my ($self, $fsync) = @_; + my ($self) = @_; if (my $im = $self->{im}) { $im->barrier; @@ -390,7 +416,6 @@ sub barrier { $_->remote_barrier foreach @$parts; $over->barrier_wait; # wait for each Xapian partition - $over->commit_fsync if $fsync; # last_commit is special, don't commit these until # remote partitions are done: @@ -404,9 +429,9 @@ sub barrier { } sub git_init { - my ($self, $new) = @_; + my ($self, $epoch) = @_; my $pfx = "$self->{-inbox}->{mainrepo}/git"; - my $git_dir = "$pfx/$new.git"; + my $git_dir = "$pfx/$epoch.git"; my @cmd = (qw(git init --bare -q), $git_dir); PublicInbox::Import::run_die(\@cmd); @@ -424,7 +449,7 @@ sub git_init { PublicInbox::Import::run_die(\@cmd); my $alt = "$all/objects/info/alternates"; - my $new_obj_dir = "../../git/$new.git/objects"; + my $new_obj_dir = "../../git/$epoch.git/objects"; my %alts; if (-e $alt) { open(my $fh, '<', $alt) or die "open < $alt: $!\n"; @@ -465,26 +490,26 @@ sub importer { $im->done; $self->barrier(1); $im = undef; - my $git_dir = $self->git_init(++$self->{max_git}); + my $git_dir = $self->git_init(++$self->{epoch_max}); my $git = PublicInbox::Git->new($git_dir); return $self->import_init($git, 0); } } - my $new = 0; + my $epoch = 0; my $max; my $latest = git_dir_latest($self, \$max); if (defined $latest) { my $git = PublicInbox::Git->new($latest); my $packed_bytes = $git->packed_bytes; if ($packed_bytes >= $self->{rotate_bytes}) { - $new = $max + 1; + $epoch = $max + 1; } else { - $self->{max_git} = $max; + $self->{epoch_max} = $max; return $self->import_init($git, $packed_bytes); } } - $self->{max_git} = $new; - $latest = $self->git_init($new); + $self->{epoch_max} = $epoch; + $latest = $self->git_init($epoch); $self->import_init(PublicInbox::Git->new($latest), 0); } @@ -652,22 +677,22 @@ sub reindex_oid { # only update last_commit for $i on reindex iff newer than current sub update_last_commit { my ($self, $git, $i, $cmt) = @_; - my $last = $self->{mm}->last_commit_n($i); + my $last = last_commit_part($self, $i); if (defined $last && is_ancestor($git, $last, $cmt)) { my @cmd = (qw(rev-list --count), "$last..$cmt"); chomp(my $n = $git->qx(@cmd)); return if $n ne '' && $n == 0; } - $self->{mm}->last_commit_n($i, $cmt); + last_commit_part($self, $i, $cmt); } sub git_dir_n ($$) { "$_[0]->{-inbox}->{mainrepo}/git/$_[1].git" } sub last_commits { - my ($self, $max_git) = @_; + my ($self, $epoch_max) = @_; my $heads = []; - for (my $i = $max_git; $i >= 0; $i--) { - $heads->[$i] = $self->{mm}->last_commit_n($i); + for (my $i = $epoch_max; $i >= 0; $i--) { + $heads->[$i] = last_commit_part($self, $i); } $heads; } @@ -684,10 +709,10 @@ sub is_ancestor ($$$) { } sub index_prepare { - my ($self, $opts, $max_git, $ranges) = @_; + my ($self, $opts, $epoch_max, $ranges) = @_; my $regen_max = 0; my $head = $self->{-inbox}->{ref_head} || 'refs/heads/master'; - for (my $i = $max_git; $i >= 0; $i--) { + for (my $i = $epoch_max; $i >= 0; $i--) { die "already indexing!\n" if $self->{index_pipe}; my $git_dir = git_dir_n($self, $i); -d $git_dir or next; # missing parts are fine @@ -796,15 +821,15 @@ sub index_sync { my ($self, $opts) = @_; $opts ||= {}; my $ibx = $self->{-inbox}; - my $max_git; - my $latest = git_dir_latest($self, \$max_git); + my $epoch_max; + my $latest = git_dir_latest($self, \$epoch_max); return unless defined $latest; $self->idx_init; # acquire lock my $mm_tmp = $self->{mm}->tmp_clone; - my $ranges = $opts->{reindex} ? [] : $self->last_commits($max_git); + my $ranges = $opts->{reindex} ? [] : $self->last_commits($epoch_max); my ($min, $max) = $mm_tmp->minmax; - my $regen = $self->index_prepare($opts, $max_git, $ranges); + my $regen = $self->index_prepare($opts, $epoch_max, $ranges); $$regen += $max if $max; my $D = {}; my @cmd = qw(log --raw -r --pretty=tformat:%h @@ -812,7 +837,7 @@ sub index_sync { # work backwards through history my $last_commit = []; - for (my $i = $max_git; $i >= 0; $i--) { + for (my $i = $epoch_max; $i >= 0; $i--) { my $git_dir = git_dir_n($self, $i); die "already reindexing!\n" if delete $self->{reindex_pipe}; -d $git_dir or next; # missing parts are fine