X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FV2Writable.pm;h=b51c85250a9b6be8794c30ea51879f1a2f5ffca1;hb=1e63e0409004f1edb352f53729e2d4aed4825a22;hp=0119ea7634b96166e7cda8eba96ba30612d9ac96;hpb=c6d0a81e6ca7a5da9f9ff193f0992705aa9f9682;p=public-inbox.git diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 0119ea76..b51c8525 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -147,10 +147,8 @@ sub init_inbox { # returns undef on duplicate or spam # mimics Import::add and wraps it for v2 sub add { - my ($self, $mime, $check_cb) = @_; - $self->{-inbox}->with_umask(sub { - _add($self, $mime, $check_cb) - }); + my ($self, $eml, $check_cb) = @_; + $self->{-inbox}->with_umask(\&_add, $self, $eml, $check_cb); } # indexes a message, returns true if checkpointing is needed @@ -276,6 +274,28 @@ sub idx_shard { $self->{idx_shards}->[$shard_i]; } +sub _idx_init { # with_umask callback + my ($self, $opt) = @_; + $self->lock_acquire unless $opt && $opt->{-skip_lock}; + $self->{over}->create; + + # xcpdb can change shard count while -watch is idle + my $nshards = count_shards($self); + $self->{shards} = $nshards if $nshards && $nshards != $self->{shards}; + + # need to create all shards before initializing msgmap FD + # idx_shards must be visible to all forked processes + my $max = $self->{shards} - 1; + my $idx = $self->{idx_shards} = []; + push @$idx, PublicInbox::SearchIdxShard->new($self, $_) for (0..$max); + + # Now that all subprocesses are up, we can open the FDs + # for SQLite: + my $mm = $self->{mm} = PublicInbox::Msgmap->new_file( + "$self->{-inbox}->{inboxdir}/msgmap.sqlite3", 1); + $mm->{dbh}->begin_work; +} + # idempotent sub idx_init { my ($self, $opt) = @_; @@ -285,13 +305,10 @@ sub idx_init { # do not leak read-only FDs to child processes, we only have these # FDs for duplicate detection so they should not be # frequently activated. + # delete @$ibx{qw(git mm search)}; delete $ibx->{$_} foreach (qw(git mm search)); - my $indexlevel = $ibx->{indexlevel}; - if ($indexlevel && $indexlevel eq 'basic') { - $self->{parallel} = 0; - } - + $self->{parallel} = 0 if ($ibx->{indexlevel}//'') eq 'basic'; if ($self->{parallel}) { pipe(my ($r, $w)) or die "pipe failed: $!"; # pipe for barrier notifications doesn't need to be big, @@ -301,33 +318,8 @@ sub idx_init { $w->autoflush(1); } - my $over = $self->{over}; $ibx->umask_prepare; - $ibx->with_umask(sub { - $self->lock_acquire unless ($opt && $opt->{-skip_lock}); - $over->create; - - # xcpdb can change shard count while -watch is idle - my $nshards = count_shards($self); - if ($nshards && $nshards != $self->{shards}) { - $self->{shards} = $nshards; - } - - # need to create all shards before initializing msgmap FD - my $max = $self->{shards} - 1; - - # idx_shards must be visible to all forked processes - my $idx = $self->{idx_shards} = []; - for my $i (0..$max) { - push @$idx, PublicInbox::SearchIdxShard->new($self, $i); - } - - # Now that all subprocesses are up, we can open the FDs - # for SQLite: - my $mm = $self->{mm} = PublicInbox::Msgmap->new_file( - "$self->{-inbox}->{inboxdir}/msgmap.sqlite3", 1); - $mm->{dbh}->begin_work; - }); + $ibx->with_umask(\&_idx_init, $self, $opt); } # returns an array mapping [ epoch => latest_commit ] @@ -379,24 +371,24 @@ sub content_matches ($$) { # used for removing or replacing (purging) sub rewrite_internal ($$;$$$) { - my ($self, $old_mime, $cmt_msg, $new_mime, $sref) = @_; + my ($self, $old_eml, $cmt_msg, $new_eml, $sref) = @_; $self->idx_init; my ($im, $need_reindex, $replace_map); if ($sref) { $replace_map = {}; # oid => sref - $need_reindex = [] if $new_mime; + $need_reindex = [] if $new_eml; } else { $im = $self->importer; } my $over = $self->{over}; - my $chashes = content_hashes($old_mime); - my @removed; - my $mids = mids($old_mime->header_obj); + my $chashes = content_hashes($old_eml); + my $removed = []; + my $mids = mids($old_eml->header_obj); # We avoid introducing new blobs into git since the raw content # can be slightly different, so we do not need the user-supplied # message now that we have the mids and content_hash - $old_mime = undef; + $old_eml = undef; my $mark; foreach my $mid (@$mids) { @@ -422,15 +414,15 @@ sub rewrite_internal ($$;$$$) { } foreach my $num (keys %gone) { my ($smsg, $mime, $orig) = @{$gone{$num}}; - # @removed should only be set once assuming + # $removed should only be set once assuming # no bugs in our deduplication code: - @removed = (undef, $mime, $smsg); + $removed = [ undef, $mime, $smsg ]; my $oid = $smsg->{blob}; if ($replace_map) { $replace_map->{$oid} = $sref; } else { ($mark, undef) = $im->remove($orig, $cmt_msg); - $removed[0] = $mark; + $removed->[0] = $mark; } $orig = undef; if ($need_reindex) { # ->replace @@ -447,28 +439,26 @@ sub rewrite_internal ($$;$$$) { $self->{last_commit}->[$self->{epoch_max}] = $cmt; } if ($replace_map && scalar keys %$replace_map) { - my $rewrites = _replace_oids($self, $new_mime, $replace_map); + my $rewrites = _replace_oids($self, $new_eml, $replace_map); return { rewrites => $rewrites, need_reindex => $need_reindex }; } - defined($mark) ? @removed : undef; + defined($mark) ? $removed : undef; } # public (see PublicInbox::Import->remove), but note the 3rd element # (retval[2]) is not part of the stable API shared with Import->remove sub remove { - my ($self, $mime, $cmt_msg) = @_; - my @ret; - $self->{-inbox}->with_umask(sub { - @ret = rewrite_internal($self, $mime, $cmt_msg); - }); - defined($ret[0]) ? @ret : undef; + my ($self, $eml, $cmt_msg) = @_; + my $r = $self->{-inbox}->with_umask(\&rewrite_internal, + $self, $eml, $cmt_msg); + defined($r) && defined($r->[0]) ? @$r: undef; } sub _replace ($$;$$) { - my ($self, $old_mime, $new_mime, $sref) = @_; - my $rewritten = $self->{-inbox}->with_umask(sub { - rewrite_internal($self, $old_mime, undef, $new_mime, $sref); - }) or return; + my ($self, $old_eml, $new_eml, $sref) = @_; + my $arg = [ $self, $old_eml, undef, $new_eml, $sref ]; + my $rewritten = $self->{-inbox}->with_umask(\&rewrite_internal, + $self, $old_eml, undef, $new_eml, $sref) or return; my $rewrites = $rewritten->{rewrites}; # ->done is called if there are rewrites since we gc+prune from git