use PublicInbox::MIME;
use PublicInbox::Git;
use PublicInbox::Import;
-use PublicInbox::MID qw(mids);
+use PublicInbox::MID qw(mids references);
use PublicInbox::ContentId qw(content_id content_digest);
use PublicInbox::Inbox;
use PublicInbox::OverIdx;
# an estimate of the post-packed size to the raw uncompressed size
my $PACKING_FACTOR = 0.4;
-# assume 2 cores if GNU nproc(1) is not available
+# SATA storage lags behind what CPUs are capable of, so relying on
+# nproc(1) can be misleading and having extra Xapian partions is a
+# waste of FDs and space. It can also lead to excessive IO latency
+# and slow things down. Users on NVME or other fast storage can
+# use the NPROC env or switches in our script/public-inbox-* programs
+# to increase Xapian partitions.
+our $NPROC_MAX_DEFAULT = 4;
+
sub nproc_parts ($) {
my ($creat_opt) = @_;
if (ref($creat_opt) eq 'HASH') {
}
}
- my $n = int($ENV{NPROC} || `nproc 2>/dev/null` || 2);
+ my $n = $ENV{NPROC};
+ if (!$n) {
+ chomp($n = `nproc 2>/dev/null`);
+ # assume 2 cores if GNU nproc(1) is not available
+ $n = 2 if !$n;
+ $n = $NPROC_MAX_DEFAULT if $n > $NPROC_MAX_DEFAULT;
+ }
+
# subtract for the main process and git-fast-import
$n -= 1;
$n < 1 ? 1 : $n;
# due to -compact
if (-d $xpfx) {
foreach my $part (<$xpfx/*>) {
- -d $part && $part =~ m!/\d+\z! or next;
+ -d $part && $part =~ m!/[0-9]+\z! or next;
eval {
Search::Xapian::Database->new($part)->close;
$nparts++;
});
}
+# indexes a message, returns true if checkpointing is needed
+sub do_idx ($$$$$$$) {
+ my ($self, $msgref, $mime, $len, $num, $oid, $mid0) = @_;
+ $self->{over}->add_overview($mime, $len, $num, $oid, $mid0);
+ my $npart = $self->{partitions};
+ my $part = $num % $npart;
+ my $idx = idx_part($self, $part);
+ $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime);
+ my $n = $self->{transact_bytes} += $len;
+ $n >= (PublicInbox::SearchIdx::BATCH_BYTES * $npart);
+}
+
sub _add {
my ($self, $mime, $check_cb) = @_;
$self->{last_commit}->[$self->{epoch_max}] = $cmt;
my ($oid, $len, $msgref) = @{$im->{last_object}};
- $self->{over}->add_overview($mime, $len, $num, $oid, $mid0);
- my $nparts = $self->{partitions};
- my $part = $num % $nparts;
- my $idx = $self->idx_part($part);
- $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime);
- my $n = $self->{transact_bytes} += $len;
- if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
+ if (do_idx($self, $msgref, $mime, $len, $num, $oid, $mid0)) {
$self->checkpoint;
}
});
}
-sub purge_oids ($$) {
- my ($self, $purge) = @_; # $purge = { $object_id => 1, ... }
+# returns an array mapping [ epoch => latest_commit ]
+# latest_commit may be undef if nothing was done to that epoch
+# $replace_map = { $object_id => $strref, ... }
+sub _replace_oids ($$$) {
+ my ($self, $mime, $replace_map) = @_;
$self->done;
my $pfx = "$self->{-inbox}->{mainrepo}/git";
- my $purges = [];
+ my $rewrites = []; # epoch => commit
my $max = $self->{epoch_max};
unless (defined($max)) {
defined(my $latest = git_dir_latest($self, \$max)) or return;
$self->{epoch_max} = $max;
}
+
foreach my $i (0..$max) {
my $git_dir = "$pfx/$i.git";
-d $git_dir or next;
my $git = PublicInbox::Git->new($git_dir);
my $im = $self->import_init($git, 0, 1);
- $purges->[$i] = $im->purge_oids($purge);
+ $rewrites->[$i] = $im->replace_oids($mime, $replace_map);
$im->done;
}
- $purges;
+ $rewrites;
}
sub content_ids ($) {
0
}
-sub remove_internal ($$$$) {
- my ($self, $mime, $cmt_msg, $purge) = @_;
+# used for removing or replacing (purging)
+sub rewrite_internal ($$;$$$) {
+ my ($self, $old_mime, $cmt_msg, $new_mime, $sref) = @_;
$self->idx_init;
- my $im = $self->importer unless $purge;
+ my ($im, $need_reindex, $replace_map);
+ if ($sref) {
+ $replace_map = {}; # oid => sref
+ $need_reindex = [] if $new_mime;
+ } else {
+ $im = $self->importer;
+ }
my $over = $self->{over};
- my $cids = content_ids($mime);
+ my $cids = content_ids($old_mime);
my $parts = $self->{idx_parts};
- my $mm = $self->{mm};
my $removed;
- my $mids = mids($mime->header_obj);
+ my $mids = mids($old_mime->header_obj);
# We avoid introducing new blobs into git since the raw content
# can be slightly different, so we do not need the user-supplied
# message now that we have the mids and content_id
- $mime = undef;
+ $old_mime = undef;
my $mark;
foreach my $mid (@$mids) {
- my %gone;
+ my %gone; # num => [ smsg, raw ]
my ($id, $prev);
while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
my $msg = get_blob($self, $smsg);
}
foreach my $num (keys %gone) {
my ($smsg, $orig) = @{$gone{$num}};
- $mm->num_delete($num);
# $removed should only be set once assuming
# no bugs in our deduplication code:
$removed = $smsg;
my $oid = $smsg->{blob};
- if ($purge) {
- $purge->{$oid} = 1;
+ if ($replace_map) {
+ $replace_map->{$oid} = $sref;
} else {
($mark, undef) = $im->remove($orig, $cmt_msg);
}
$orig = undef;
+ if ($need_reindex) { # ->replace
+ push @$need_reindex, $smsg;
+ } else { # ->purge or ->remove
+ $self->{mm}->num_delete($num);
+ }
unindex_oid_remote($self, $oid, $mid);
}
}
my $cmt = $im->get_mark($mark);
$self->{last_commit}->[$self->{epoch_max}] = $cmt;
}
- if ($purge && scalar keys %$purge) {
- return purge_oids($self, $purge);
+ if ($replace_map && scalar keys %$replace_map) {
+ my $rewrites = _replace_oids($self, $new_mime, $replace_map);
+ return { rewrites => $rewrites, need_reindex => $need_reindex };
}
$removed;
}
sub remove {
my ($self, $mime, $cmt_msg) = @_;
$self->{-inbox}->with_umask(sub {
- remove_internal($self, $mime, $cmt_msg, undef);
+ rewrite_internal($self, $mime, $cmt_msg);
});
}
+sub _replace ($$;$$) {
+ my ($self, $old_mime, $new_mime, $sref) = @_;
+ my $rewritten = $self->{-inbox}->with_umask(sub {
+ rewrite_internal($self, $old_mime, undef, $new_mime, $sref);
+ }) or return;
+
+ my $rewrites = $rewritten->{rewrites};
+ # ->done is called if there are rewrites since we gc+prune from git
+ $self->idx_init if @$rewrites;
+
+ for my $i (0..$#$rewrites) {
+ defined(my $cmt = $rewrites->[$i]) or next;
+ $self->{last_commit}->[$i] = $cmt;
+ }
+ $rewritten;
+}
+
# public
sub purge {
my ($self, $mime) = @_;
- my $purges = $self->{-inbox}->with_umask(sub {
- remove_internal($self, $mime, undef, {});
- }) or return;
- $self->idx_init if @$purges; # ->done is called on purges
- for my $i (0..$#$purges) {
- defined(my $cmt = $purges->[$i]) or next;
- $self->{last_commit}->[$i] = $cmt;
+ my $rewritten = _replace($self, $mime, undef, \'') or return;
+ $rewritten->{rewrites}
+}
+
+# returns the git object_id of $fh, does not write the object to FS
+sub git_hash_raw ($$) {
+ my ($self, $raw) = @_;
+ # grab the expected OID we have to reindex:
+ open my $tmp_fh, '+>', undef or die "failed to open tmp: $!";
+ $tmp_fh->autoflush(1);
+ print $tmp_fh $$raw or die "print \$tmp_fh: $!";
+ sysseek($tmp_fh, 0, 0) or die "seek failed: $!";
+
+ my ($r, $w);
+ pipe($r, $w) or die "failed to create pipe: $!";
+ my $rdr = { 0 => fileno($tmp_fh), 1 => fileno($w) };
+ my $git_dir = $self->{-inbox}->git->{git_dir};
+ my $cmd = ['git', "--git-dir=$git_dir", qw(hash-object --stdin)];
+ my $pid = spawn($cmd, undef, $rdr);
+ close $w;
+ local $/ = "\n";
+ chomp(my $oid = <$r>);
+ waitpid($pid, 0) == $pid or die "git hash-object did not finish";
+ die "git hash-object failed: $?" if $?;
+ $oid =~ /\A[a-f0-9]{40}\z/ or die "OID not expected: $oid";
+ $oid;
+}
+
+sub _check_mids_match ($$$) {
+ my ($old_list, $new_list, $hdrs) = @_;
+ my %old_mids = map { $_ => 1 } @$old_list;
+ my %new_mids = map { $_ => 1 } @$new_list;
+ my @old = keys %old_mids;
+ my @new = keys %new_mids;
+ my $err = "$hdrs may not be changed when replacing\n";
+ die $err if scalar(@old) != scalar(@new);
+ delete @new_mids{@old};
+ delete @old_mids{@new};
+ die $err if (scalar(keys %old_mids) || scalar(keys %new_mids));
+}
+
+# Changing Message-IDs or References with ->replace isn't supported.
+# The rules for dealing with messages with multiple or conflicting
+# Message-IDs are pretty complex and rethreading hasn't been fully
+# implemented, yet.
+sub check_mids_match ($$) {
+ my ($old_mime, $new_mime) = @_;
+ my $old = $old_mime->header_obj;
+ my $new = $new_mime->header_obj;
+ _check_mids_match(mids($old), mids($new), 'Message-ID(s)');
+ _check_mids_match(references($old), references($new),
+ 'References/In-Reply-To');
+}
+
+# public
+sub replace ($$$) {
+ my ($self, $old_mime, $new_mime) = @_;
+
+ check_mids_match($old_mime, $new_mime);
+
+ # mutt will always add Content-Length:, Status:, Lines: when editing
+ PublicInbox::Import::drop_unwanted_headers($new_mime);
+
+ my $raw = $new_mime->as_string;
+ my $expect_oid = git_hash_raw($self, \$raw);
+ my $rewritten = _replace($self, $old_mime, $new_mime, \$raw) or return;
+ my $need_reindex = $rewritten->{need_reindex};
+
+ # just in case we have bugs in deduplication code:
+ my $n = scalar(@$need_reindex);
+ if ($n > 1) {
+ my $list = join(', ', map {
+ "$_->{num}: <$_->{mid}>"
+ } @$need_reindex);
+ warn <<"";
+W: rewritten $n messages matching content of original message (expected: 1).
+W: possible bug in public-inbox, NNTP article IDs and Message-IDs follow:
+W: $list
+
}
- $purges;
+
+ # make sure we really got the OID:
+ my ($oid, $type, $len) = $self->{-inbox}->git->check($expect_oid);
+ $oid eq $expect_oid or die "BUG: $expect_oid not found after replace";
+
+ # don't leak FDs to Xapian:
+ $self->{-inbox}->git->cleanup;
+
+ # reindex modified messages:
+ for my $smsg (@$need_reindex) {
+ my $num = $smsg->{num};
+ my $mid0 = $smsg->{mid};
+ do_idx($self, \$raw, $new_mime, $len, $num, $oid, $mid0);
+ }
+ $rewritten->{rewrites};
}
sub last_commit_part ($$;$) {
my $latest;
opendir my $dh, $pfx or die "opendir $pfx: $!\n";
while (defined(my $git_dir = readdir($dh))) {
- $git_dir =~ m!\A(\d+)\.git\z! or next;
+ $git_dir =~ m!\A([0-9]+)\.git\z! or next;
if ($1 > $$max) {
$$max = $1;
$latest = "$pfx/$git_dir";
}
$sync->{mm_tmp}->mid_delete($mid0) or
die "failed to delete <$mid0> for article #$num\n";
-
- $self->{over}->add_overview($mime, $len, $num, $oid, $mid0);
- my $nparts = $self->{partitions};
- my $part = $num % $nparts;
- my $idx = $self->idx_part($part);
- $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime);
- my $n = $self->{transact_bytes} += $len;
$sync->{nr}++;
- if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
+ if (do_idx($self, $msgref, $mime, $len, $num, $oid, $mid0)) {
$git->cleanup;
$sync->{mm_tmp}->atfork_prepare;
$self->done; # release lock
return $tip; # all of it
};
+ # fast equality check to avoid (v)fork+execve overhead
+ if ($cur eq $tip) {
+ $sync->{ranges}->[$i] = undef;
+ return;
+ }
+
my $range = "$cur..$tip";
$pr->("$i.git checking contiguity... ") if $pr;
if (is_ancestor($git, $cur, $tip)) { # common case