]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/V2Writable.pm
Merge remote-tracking branch 'origin/reshard' into next
[public-inbox.git] / lib / PublicInbox / V2Writable.pm
index c504651730427424fa79d02a02313bcb71ce18d7..3329d79fa77b56690be7cd45bac12b9fa53674c6 100644 (file)
@@ -11,7 +11,7 @@ use PublicInbox::SearchIdxPart;
 use PublicInbox::MIME;
 use PublicInbox::Git;
 use PublicInbox::Import;
-use PublicInbox::MID qw(mids);
+use PublicInbox::MID qw(mids references);
 use PublicInbox::ContentId qw(content_id content_digest);
 use PublicInbox::Inbox;
 use PublicInbox::OverIdx;
@@ -130,6 +130,18 @@ sub add {
        });
 }
 
+# indexes a message, returns true if checkpointing is needed
+sub do_idx ($$$$$$$) {
+       my ($self, $msgref, $mime, $len, $num, $oid, $mid0) = @_;
+       $self->{over}->add_overview($mime, $len, $num, $oid, $mid0);
+       my $npart = $self->{partitions};
+       my $part = $num % $npart;
+       my $idx = idx_part($self, $part);
+       $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime);
+       my $n = $self->{transact_bytes} += $len;
+       $n >= (PublicInbox::SearchIdx::BATCH_BYTES * $npart);
+}
+
 sub _add {
        my ($self, $mime, $check_cb) = @_;
 
@@ -155,13 +167,7 @@ sub _add {
        $self->{last_commit}->[$self->{epoch_max}] = $cmt;
 
        my ($oid, $len, $msgref) = @{$im->{last_object}};
-       $self->{over}->add_overview($mime, $len, $num, $oid, $mid0);
-       my $nparts = $self->{partitions};
-       my $part = $num % $nparts;
-       my $idx = $self->idx_part($part);
-       $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime);
-       my $n = $self->{transact_bytes} += $len;
-       if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
+       if (do_idx($self, $msgref, $mime, $len, $num, $oid, $mid0)) {
                $self->checkpoint;
        }
 
@@ -305,26 +311,30 @@ sub idx_init {
        });
 }
 
-sub purge_oids ($$) {
-       my ($self, $purge) = @_; # $purge = { $object_id => 1, ... }
+# returns an array mapping [ epoch => latest_commit ]
+# latest_commit may be undef if nothing was done to that epoch
+# $replace_map = { $object_id => $strref, ... }
+sub _replace_oids ($$$) {
+       my ($self, $mime, $replace_map) = @_;
        $self->done;
        my $pfx = "$self->{-inbox}->{mainrepo}/git";
-       my $purges = [];
+       my $rewrites = []; # epoch => commit
        my $max = $self->{epoch_max};
 
        unless (defined($max)) {
                defined(my $latest = git_dir_latest($self, \$max)) or return;
                $self->{epoch_max} = $max;
        }
+
        foreach my $i (0..$max) {
                my $git_dir = "$pfx/$i.git";
                -d $git_dir or next;
                my $git = PublicInbox::Git->new($git_dir);
                my $im = $self->import_init($git, 0, 1);
-               $purges->[$i] = $im->purge_oids($purge);
+               $rewrites->[$i] = $im->replace_oids($mime, $replace_map);
                $im->done;
        }
-       $purges;
+       $rewrites;
 }
 
 sub content_ids ($) {
@@ -347,25 +357,31 @@ sub content_matches ($$) {
        0
 }
 
-sub remove_internal ($$$$) {
-       my ($self, $mime, $cmt_msg, $purge) = @_;
+# used for removing or replacing (purging)
+sub rewrite_internal ($$;$$$) {
+       my ($self, $old_mime, $cmt_msg, $new_mime, $sref) = @_;
        $self->idx_init;
-       my $im = $self->importer unless $purge;
+       my ($im, $need_reindex, $replace_map);
+       if ($sref) {
+               $replace_map = {}; # oid => sref
+               $need_reindex = [] if $new_mime;
+       } else {
+               $im = $self->importer;
+       }
        my $over = $self->{over};
-       my $cids = content_ids($mime);
+       my $cids = content_ids($old_mime);
        my $parts = $self->{idx_parts};
-       my $mm = $self->{mm};
        my $removed;
-       my $mids = mids($mime->header_obj);
+       my $mids = mids($old_mime->header_obj);
 
        # We avoid introducing new blobs into git since the raw content
        # can be slightly different, so we do not need the user-supplied
        # message now that we have the mids and content_id
-       $mime = undef;
+       $old_mime = undef;
        my $mark;
 
        foreach my $mid (@$mids) {
-               my %gone;
+               my %gone; # num => [ smsg, raw ]
                my ($id, $prev);
                while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
                        my $msg = get_blob($self, $smsg);
@@ -388,17 +404,21 @@ sub remove_internal ($$$$) {
                }
                foreach my $num (keys %gone) {
                        my ($smsg, $orig) = @{$gone{$num}};
-                       $mm->num_delete($num);
                        # $removed should only be set once assuming
                        # no bugs in our deduplication code:
                        $removed = $smsg;
                        my $oid = $smsg->{blob};
-                       if ($purge) {
-                               $purge->{$oid} = 1;
+                       if ($replace_map) {
+                               $replace_map->{$oid} = $sref;
                        } else {
                                ($mark, undef) = $im->remove($orig, $cmt_msg);
                        }
                        $orig = undef;
+                       if ($need_reindex) { # ->replace
+                               push @$need_reindex, $smsg;
+                       } else { # ->purge or ->remove
+                               $self->{mm}->num_delete($num);
+                       }
                        unindex_oid_remote($self, $oid, $mid);
                }
        }
@@ -407,8 +427,9 @@ sub remove_internal ($$$$) {
                my $cmt = $im->get_mark($mark);
                $self->{last_commit}->[$self->{epoch_max}] = $cmt;
        }
-       if ($purge && scalar keys %$purge) {
-               return purge_oids($self, $purge);
+       if ($replace_map && scalar keys %$replace_map) {
+               my $rewrites = _replace_oids($self, $new_mime, $replace_map);
+               return { rewrites => $rewrites, need_reindex => $need_reindex };
        }
        $removed;
 }
@@ -417,22 +438,125 @@ sub remove_internal ($$$$) {
 sub remove {
        my ($self, $mime, $cmt_msg) = @_;
        $self->{-inbox}->with_umask(sub {
-               remove_internal($self, $mime, $cmt_msg, undef);
+               rewrite_internal($self, $mime, $cmt_msg);
        });
 }
 
+sub _replace ($$;$$) {
+       my ($self, $old_mime, $new_mime, $sref) = @_;
+       my $rewritten = $self->{-inbox}->with_umask(sub {
+               rewrite_internal($self, $old_mime, undef, $new_mime, $sref);
+       }) or return;
+
+       my $rewrites = $rewritten->{rewrites};
+       # ->done is called if there are rewrites since we gc+prune from git
+       $self->idx_init if @$rewrites;
+
+       for my $i (0..$#$rewrites) {
+               defined(my $cmt = $rewrites->[$i]) or next;
+               $self->{last_commit}->[$i] = $cmt;
+       }
+       $rewritten;
+}
+
 # public
 sub purge {
        my ($self, $mime) = @_;
-       my $purges = $self->{-inbox}->with_umask(sub {
-               remove_internal($self, $mime, undef, {});
-       }) or return;
-       $self->idx_init if @$purges; # ->done is called on purges
-       for my $i (0..$#$purges) {
-               defined(my $cmt = $purges->[$i]) or next;
-               $self->{last_commit}->[$i] = $cmt;
+       my $rewritten = _replace($self, $mime, undef, \'') or return;
+       $rewritten->{rewrites}
+}
+
+# returns the git object_id of $fh, does not write the object to FS
+sub git_hash_raw ($$) {
+       my ($self, $raw) = @_;
+       # grab the expected OID we have to reindex:
+       open my $tmp_fh, '+>', undef or die "failed to open tmp: $!";
+       $tmp_fh->autoflush(1);
+       print $tmp_fh $$raw or die "print \$tmp_fh: $!";
+       sysseek($tmp_fh, 0, 0) or die "seek failed: $!";
+
+       my ($r, $w);
+       pipe($r, $w) or die "failed to create pipe: $!";
+       my $rdr = { 0 => fileno($tmp_fh), 1 => fileno($w) };
+       my $git_dir = $self->{-inbox}->git->{git_dir};
+       my $cmd = ['git', "--git-dir=$git_dir", qw(hash-object --stdin)];
+       my $pid = spawn($cmd, undef, $rdr);
+       close $w;
+       local $/ = "\n";
+       chomp(my $oid = <$r>);
+       waitpid($pid, 0) == $pid or die "git hash-object did not finish";
+       die "git hash-object failed: $?" if $?;
+       $oid =~ /\A[a-f0-9]{40}\z/ or die "OID not expected: $oid";
+       $oid;
+}
+
+sub _check_mids_match ($$$) {
+       my ($old_list, $new_list, $hdrs) = @_;
+       my %old_mids = map { $_ => 1 } @$old_list;
+       my %new_mids = map { $_ => 1 } @$new_list;
+       my @old = keys %old_mids;
+       my @new = keys %new_mids;
+       my $err = "$hdrs may not be changed when replacing\n";
+       die $err if scalar(@old) != scalar(@new);
+       delete @new_mids{@old};
+       delete @old_mids{@new};
+       die $err if (scalar(keys %old_mids) || scalar(keys %new_mids));
+}
+
+# Changing Message-IDs or References with ->replace isn't supported.
+# The rules for dealing with messages with multiple or conflicting
+# Message-IDs are pretty complex and rethreading hasn't been fully
+# implemented, yet.
+sub check_mids_match ($$) {
+       my ($old_mime, $new_mime) = @_;
+       my $old = $old_mime->header_obj;
+       my $new = $new_mime->header_obj;
+       _check_mids_match(mids($old), mids($new), 'Message-ID(s)');
+       _check_mids_match(references($old), references($new),
+                       'References/In-Reply-To');
+}
+
+# public
+sub replace ($$$) {
+       my ($self, $old_mime, $new_mime) = @_;
+
+       check_mids_match($old_mime, $new_mime);
+
+       # mutt will always add Content-Length:, Status:, Lines: when editing
+       PublicInbox::Import::drop_unwanted_headers($new_mime);
+
+       my $raw = $new_mime->as_string;
+       my $expect_oid = git_hash_raw($self, \$raw);
+       my $rewritten = _replace($self, $old_mime, $new_mime, \$raw) or return;
+       my $need_reindex = $rewritten->{need_reindex};
+
+       # just in case we have bugs in deduplication code:
+       my $n = scalar(@$need_reindex);
+       if ($n > 1) {
+               my $list = join(', ', map {
+                                       "$_->{num}: <$_->{mid}>"
+                               } @$need_reindex);
+               warn <<"";
+W: rewritten $n messages matching content of original message (expected: 1).
+W: possible bug in public-inbox, NNTP article IDs and Message-IDs follow:
+W: $list
+
+       }
+
+       # make sure we really got the OID:
+       my ($oid, $type, $len) = $self->{-inbox}->git->check($expect_oid);
+       $oid eq $expect_oid or die "BUG: $expect_oid not found after replace";
+
+       # don't leak FDs to Xapian:
+       $self->{-inbox}->git->cleanup;
+
+       # reindex modified messages:
+       for my $smsg (@$need_reindex) {
+               my $num = $smsg->{num};
+               my $mid0 = $smsg->{mid};
+               do_idx($self, \$raw, $new_mime, $len, $num, $oid, $mid0);
        }
-       $purges;
+       $rewritten->{rewrites};
 }
 
 sub last_commit_part ($$;$) {
@@ -786,15 +910,8 @@ sub reindex_oid ($$$$) {
        }
        $sync->{mm_tmp}->mid_delete($mid0) or
                die "failed to delete <$mid0> for article #$num\n";
-
-       $self->{over}->add_overview($mime, $len, $num, $oid, $mid0);
-       my $nparts = $self->{partitions};
-       my $part = $num % $nparts;
-       my $idx = $self->idx_part($part);
-       $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime);
-       my $n = $self->{transact_bytes} += $len;
        $sync->{nr}++;
-       if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
+       if (do_idx($self, $msgref, $mime, $len, $num, $oid, $mid0)) {
                $git->cleanup;
                $sync->{mm_tmp}->atfork_prepare;
                $self->done; # release lock