]> Sergey Matveev's repositories - public-inbox.git/commitdiff
Merge remote-tracking branch 'origin/reshard' into next
authorEric Wong <e@80x24.org>
Fri, 14 Jun 2019 16:25:39 +0000 (16:25 +0000)
committerEric Wong <e@80x24.org>
Fri, 14 Jun 2019 16:25:39 +0000 (16:25 +0000)
* origin/reshard:
  xcpdb: support resharding v2 repos
  xcpdb: use destination shard as progress prefix
  xapcmd: preserve indexlevel based on the destination
  v2writable: use a smaller default for Xapian partitions

1  2 
MANIFEST
lib/PublicInbox/V2Writable.pm

diff --combined MANIFEST
index ae637f24aa91f1d7e569e4f798348102d8bc462f,a750e09e0661b84ac94cb0e434b6424875a86c3d..3f0a79a671113b6f7722dcd81b67aa451015f7d9
+++ b/MANIFEST
@@@ -13,7 -13,6 +13,7 @@@ Documentation/public-inbox-compact.po
  Documentation/public-inbox-config.pod
  Documentation/public-inbox-convert.pod
  Documentation/public-inbox-daemon.pod
 +Documentation/public-inbox-edit.pod
  Documentation/public-inbox-httpd.pod
  Documentation/public-inbox-index.pod
  Documentation/public-inbox-mda.pod
@@@ -69,7 -68,6 +69,7 @@@ examples/unsubscribe.psg
  examples/varnish-4.vcl
  lib/PublicInbox/Address.pm
  lib/PublicInbox/Admin.pm
 +lib/PublicInbox/AdminEdit.pm
  lib/PublicInbox/AltId.pm
  lib/PublicInbox/Cgit.pm
  lib/PublicInbox/Config.pm
@@@ -151,7 -149,6 +151,7 @@@ sa_config/root/etc/spamassassin/public-
  sa_config/user/.spamassassin/user_prefs
  script/public-inbox-compact
  script/public-inbox-convert
 +script/public-inbox-edit
  script/public-inbox-httpd
  script/public-inbox-index
  script/public-inbox-init
@@@ -187,7 -184,6 +187,7 @@@ t/content_id.
  t/convert-compact.t
  t/data/0001.patch
  t/ds-leak.t
 +t/edit.t
  t/emergency.t
  t/fail-bin/spamc
  t/feed.t
@@@ -240,7 -236,6 +240,7 @@@ t/psgi_text.
  t/psgi_v2.t
  t/purge.t
  t/qspawn.t
 +t/replace.t
  t/reply.t
  t/search-thr-index.t
  t/search.t
@@@ -263,4 -258,4 +263,5 @@@ t/view.
  t/watch_filter_rubylang.t
  t/watch_maildir.t
  t/watch_maildir_v2.t
 +t/www_listing.t
+ t/xcpdb-reshard.t
index 09ed4e7b99ff096d194a9e59ff79a9569b745c0f,c504651730427424fa79d02a02313bcb71ce18d7..3329d79fa77b56690be7cd45bac12b9fa53674c6
@@@ -11,7 -11,7 +11,7 @@@ use PublicInbox::SearchIdxPart
  use PublicInbox::MIME;
  use PublicInbox::Git;
  use PublicInbox::Import;
 -use PublicInbox::MID qw(mids);
 +use PublicInbox::MID qw(mids references);
  use PublicInbox::ContentId qw(content_id content_digest);
  use PublicInbox::Inbox;
  use PublicInbox::OverIdx;
@@@ -23,7 -23,14 +23,14 @@@ use IO::Handle
  # an estimate of the post-packed size to the raw uncompressed size
  my $PACKING_FACTOR = 0.4;
  
- # assume 2 cores if GNU nproc(1) is not available
+ # SATA storage lags behind what CPUs are capable of, so relying on
+ # nproc(1) can be misleading and having extra Xapian partions is a
+ # waste of FDs and space.  It can also lead to excessive IO latency
+ # and slow things down.  Users on NVME or other fast storage can
+ # use the NPROC env or switches in our script/public-inbox-* programs
+ # to increase Xapian partitions.
+ our $NPROC_MAX_DEFAULT = 4;
  sub nproc_parts ($) {
        my ($creat_opt) = @_;
        if (ref($creat_opt) eq 'HASH') {
                }
        }
  
-       my $n = int($ENV{NPROC} || `nproc 2>/dev/null` || 2);
+       my $n = $ENV{NPROC};
+       if (!$n) {
+               chomp($n = `nproc 2>/dev/null`);
+               # assume 2 cores if GNU nproc(1) is not available
+               $n = 2 if !$n;
+               $n = $NPROC_MAX_DEFAULT if $NPROC_MAX_DEFAULT > 4;
+       }
        # subtract for the main process and git-fast-import
        $n -= 1;
        $n < 1 ? 1 : $n;
@@@ -116,18 -130,6 +130,18 @@@ sub add 
        });
  }
  
 +# indexes a message, returns true if checkpointing is needed
 +sub do_idx ($$$$$$$) {
 +      my ($self, $msgref, $mime, $len, $num, $oid, $mid0) = @_;
 +      $self->{over}->add_overview($mime, $len, $num, $oid, $mid0);
 +      my $npart = $self->{partitions};
 +      my $part = $num % $npart;
 +      my $idx = idx_part($self, $part);
 +      $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime);
 +      my $n = $self->{transact_bytes} += $len;
 +      $n >= (PublicInbox::SearchIdx::BATCH_BYTES * $npart);
 +}
 +
  sub _add {
        my ($self, $mime, $check_cb) = @_;
  
        $self->{last_commit}->[$self->{epoch_max}] = $cmt;
  
        my ($oid, $len, $msgref) = @{$im->{last_object}};
 -      $self->{over}->add_overview($mime, $len, $num, $oid, $mid0);
 -      my $nparts = $self->{partitions};
 -      my $part = $num % $nparts;
 -      my $idx = $self->idx_part($part);
 -      $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime);
 -      my $n = $self->{transact_bytes} += $len;
 -      if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
 +      if (do_idx($self, $msgref, $mime, $len, $num, $oid, $mid0)) {
                $self->checkpoint;
        }
  
@@@ -297,30 -305,26 +311,30 @@@ sub idx_init 
        });
  }
  
 -sub purge_oids ($$) {
 -      my ($self, $purge) = @_; # $purge = { $object_id => 1, ... }
 +# returns an array mapping [ epoch => latest_commit ]
 +# latest_commit may be undef if nothing was done to that epoch
 +# $replace_map = { $object_id => $strref, ... }
 +sub _replace_oids ($$$) {
 +      my ($self, $mime, $replace_map) = @_;
        $self->done;
        my $pfx = "$self->{-inbox}->{mainrepo}/git";
 -      my $purges = [];
 +      my $rewrites = []; # epoch => commit
        my $max = $self->{epoch_max};
  
        unless (defined($max)) {
                defined(my $latest = git_dir_latest($self, \$max)) or return;
                $self->{epoch_max} = $max;
        }
 +
        foreach my $i (0..$max) {
                my $git_dir = "$pfx/$i.git";
                -d $git_dir or next;
                my $git = PublicInbox::Git->new($git_dir);
                my $im = $self->import_init($git, 0, 1);
 -              $purges->[$i] = $im->purge_oids($purge);
 +              $rewrites->[$i] = $im->replace_oids($mime, $replace_map);
                $im->done;
        }
 -      $purges;
 +      $rewrites;
  }
  
  sub content_ids ($) {
@@@ -343,31 -347,25 +357,31 @@@ sub content_matches ($$) 
        0
  }
  
 -sub remove_internal ($$$$) {
 -      my ($self, $mime, $cmt_msg, $purge) = @_;
 +# used for removing or replacing (purging)
 +sub rewrite_internal ($$;$$$) {
 +      my ($self, $old_mime, $cmt_msg, $new_mime, $sref) = @_;
        $self->idx_init;
 -      my $im = $self->importer unless $purge;
 +      my ($im, $need_reindex, $replace_map);
 +      if ($sref) {
 +              $replace_map = {}; # oid => sref
 +              $need_reindex = [] if $new_mime;
 +      } else {
 +              $im = $self->importer;
 +      }
        my $over = $self->{over};
 -      my $cids = content_ids($mime);
 +      my $cids = content_ids($old_mime);
        my $parts = $self->{idx_parts};
 -      my $mm = $self->{mm};
        my $removed;
 -      my $mids = mids($mime->header_obj);
 +      my $mids = mids($old_mime->header_obj);
  
        # We avoid introducing new blobs into git since the raw content
        # can be slightly different, so we do not need the user-supplied
        # message now that we have the mids and content_id
 -      $mime = undef;
 +      $old_mime = undef;
        my $mark;
  
        foreach my $mid (@$mids) {
 -              my %gone;
 +              my %gone; # num => [ smsg, raw ]
                my ($id, $prev);
                while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
                        my $msg = get_blob($self, $smsg);
                }
                foreach my $num (keys %gone) {
                        my ($smsg, $orig) = @{$gone{$num}};
 -                      $mm->num_delete($num);
                        # $removed should only be set once assuming
                        # no bugs in our deduplication code:
                        $removed = $smsg;
                        my $oid = $smsg->{blob};
 -                      if ($purge) {
 -                              $purge->{$oid} = 1;
 +                      if ($replace_map) {
 +                              $replace_map->{$oid} = $sref;
                        } else {
                                ($mark, undef) = $im->remove($orig, $cmt_msg);
                        }
                        $orig = undef;
 +                      if ($need_reindex) { # ->replace
 +                              push @$need_reindex, $smsg;
 +                      } else { # ->purge or ->remove
 +                              $self->{mm}->num_delete($num);
 +                      }
                        unindex_oid_remote($self, $oid, $mid);
                }
        }
                my $cmt = $im->get_mark($mark);
                $self->{last_commit}->[$self->{epoch_max}] = $cmt;
        }
 -      if ($purge && scalar keys %$purge) {
 -              return purge_oids($self, $purge);
 +      if ($replace_map && scalar keys %$replace_map) {
 +              my $rewrites = _replace_oids($self, $new_mime, $replace_map);
 +              return { rewrites => $rewrites, need_reindex => $need_reindex };
        }
        $removed;
  }
  sub remove {
        my ($self, $mime, $cmt_msg) = @_;
        $self->{-inbox}->with_umask(sub {
 -              remove_internal($self, $mime, $cmt_msg, undef);
 +              rewrite_internal($self, $mime, $cmt_msg);
        });
  }
  
 +sub _replace ($$;$$) {
 +      my ($self, $old_mime, $new_mime, $sref) = @_;
 +      my $rewritten = $self->{-inbox}->with_umask(sub {
 +              rewrite_internal($self, $old_mime, undef, $new_mime, $sref);
 +      }) or return;
 +
 +      my $rewrites = $rewritten->{rewrites};
 +      # ->done is called if there are rewrites since we gc+prune from git
 +      $self->idx_init if @$rewrites;
 +
 +      for my $i (0..$#$rewrites) {
 +              defined(my $cmt = $rewrites->[$i]) or next;
 +              $self->{last_commit}->[$i] = $cmt;
 +      }
 +      $rewritten;
 +}
 +
  # public
  sub purge {
        my ($self, $mime) = @_;
 -      my $purges = $self->{-inbox}->with_umask(sub {
 -              remove_internal($self, $mime, undef, {});
 -      }) or return;
 -      $self->idx_init if @$purges; # ->done is called on purges
 -      for my $i (0..$#$purges) {
 -              defined(my $cmt = $purges->[$i]) or next;
 -              $self->{last_commit}->[$i] = $cmt;
 +      my $rewritten = _replace($self, $mime, undef, \'') or return;
 +      $rewritten->{rewrites}
 +}
 +
 +# returns the git object_id of $fh, does not write the object to FS
 +sub git_hash_raw ($$) {
 +      my ($self, $raw) = @_;
 +      # grab the expected OID we have to reindex:
 +      open my $tmp_fh, '+>', undef or die "failed to open tmp: $!";
 +      $tmp_fh->autoflush(1);
 +      print $tmp_fh $$raw or die "print \$tmp_fh: $!";
 +      sysseek($tmp_fh, 0, 0) or die "seek failed: $!";
 +
 +      my ($r, $w);
 +      pipe($r, $w) or die "failed to create pipe: $!";
 +      my $rdr = { 0 => fileno($tmp_fh), 1 => fileno($w) };
 +      my $git_dir = $self->{-inbox}->git->{git_dir};
 +      my $cmd = ['git', "--git-dir=$git_dir", qw(hash-object --stdin)];
 +      my $pid = spawn($cmd, undef, $rdr);
 +      close $w;
 +      local $/ = "\n";
 +      chomp(my $oid = <$r>);
 +      waitpid($pid, 0) == $pid or die "git hash-object did not finish";
 +      die "git hash-object failed: $?" if $?;
 +      $oid =~ /\A[a-f0-9]{40}\z/ or die "OID not expected: $oid";
 +      $oid;
 +}
 +
 +sub _check_mids_match ($$$) {
 +      my ($old_list, $new_list, $hdrs) = @_;
 +      my %old_mids = map { $_ => 1 } @$old_list;
 +      my %new_mids = map { $_ => 1 } @$new_list;
 +      my @old = keys %old_mids;
 +      my @new = keys %new_mids;
 +      my $err = "$hdrs may not be changed when replacing\n";
 +      die $err if scalar(@old) != scalar(@new);
 +      delete @new_mids{@old};
 +      delete @old_mids{@new};
 +      die $err if (scalar(keys %old_mids) || scalar(keys %new_mids));
 +}
 +
 +# Changing Message-IDs or References with ->replace isn't supported.
 +# The rules for dealing with messages with multiple or conflicting
 +# Message-IDs are pretty complex and rethreading hasn't been fully
 +# implemented, yet.
 +sub check_mids_match ($$) {
 +      my ($old_mime, $new_mime) = @_;
 +      my $old = $old_mime->header_obj;
 +      my $new = $new_mime->header_obj;
 +      _check_mids_match(mids($old), mids($new), 'Message-ID(s)');
 +      _check_mids_match(references($old), references($new),
 +                      'References/In-Reply-To');
 +}
 +
 +# public
 +sub replace ($$$) {
 +      my ($self, $old_mime, $new_mime) = @_;
 +
 +      check_mids_match($old_mime, $new_mime);
 +
 +      # mutt will always add Content-Length:, Status:, Lines: when editing
 +      PublicInbox::Import::drop_unwanted_headers($new_mime);
 +
 +      my $raw = $new_mime->as_string;
 +      my $expect_oid = git_hash_raw($self, \$raw);
 +      my $rewritten = _replace($self, $old_mime, $new_mime, \$raw) or return;
 +      my $need_reindex = $rewritten->{need_reindex};
 +
 +      # just in case we have bugs in deduplication code:
 +      my $n = scalar(@$need_reindex);
 +      if ($n > 1) {
 +              my $list = join(', ', map {
 +                                      "$_->{num}: <$_->{mid}>"
 +                              } @$need_reindex);
 +              warn <<"";
 +W: rewritten $n messages matching content of original message (expected: 1).
 +W: possible bug in public-inbox, NNTP article IDs and Message-IDs follow:
 +W: $list
 +
        }
 -      $purges;
 +
 +      # make sure we really got the OID:
 +      my ($oid, $type, $len) = $self->{-inbox}->git->check($expect_oid);
 +      $oid eq $expect_oid or die "BUG: $expect_oid not found after replace";
 +
 +      # don't leak FDs to Xapian:
 +      $self->{-inbox}->git->cleanup;
 +
 +      # reindex modified messages:
 +      for my $smsg (@$need_reindex) {
 +              my $num = $smsg->{num};
 +              my $mid0 = $smsg->{mid};
 +              do_idx($self, \$raw, $new_mime, $len, $num, $oid, $mid0);
 +      }
 +      $rewritten->{rewrites};
  }
  
  sub last_commit_part ($$;$) {
@@@ -896,8 -786,15 +910,8 @@@ sub reindex_oid ($$$$) 
        }
        $sync->{mm_tmp}->mid_delete($mid0) or
                die "failed to delete <$mid0> for article #$num\n";
 -
 -      $self->{over}->add_overview($mime, $len, $num, $oid, $mid0);
 -      my $nparts = $self->{partitions};
 -      my $part = $num % $nparts;
 -      my $idx = $self->idx_part($part);
 -      $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime);
 -      my $n = $self->{transact_bytes} += $len;
        $sync->{nr}++;
 -      if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
 +      if (do_idx($self, $msgref, $mime, $len, $num, $oid, $mid0)) {
                $git->cleanup;
                $sync->{mm_tmp}->atfork_prepare;
                $self->done; # release lock