]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/V2Writable.pm
favor readline() and print() as functions
[public-inbox.git] / lib / PublicInbox / V2Writable.pm
index b45d272263b33b72062a054cc23dea7593c558fd..513e9f230117893ec9f9d7e381bfc05d1baf1a82 100644 (file)
@@ -9,17 +9,18 @@ use warnings;
 use base qw(PublicInbox::Lock);
 use 5.010_001;
 use PublicInbox::SearchIdxShard;
-use PublicInbox::MIME;
+use PublicInbox::Eml;
 use PublicInbox::Git;
 use PublicInbox::Import;
 use PublicInbox::MID qw(mids references);
-use PublicInbox::ContentId qw(content_id content_digest);
+use PublicInbox::ContentHash qw(content_hash content_digest);
 use PublicInbox::Inbox;
 use PublicInbox::OverIdx;
 use PublicInbox::Msgmap;
 use PublicInbox::Spawn qw(spawn popen_rd);
 use PublicInbox::SearchIdx;
 use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp);
+use PublicInbox::MultiMidQueue;
 use IO::Handle; # ->autoflush
 use File::Temp qw(tempfile);
 
@@ -119,6 +120,7 @@ sub new {
                last_commit => [], # git repo -> commit
        };
        $self->{shards} = count_shards($self) || nproc_shards($creat);
+       $self->{index_max_size} = $v2ibx->{index_max_size};
        bless $self, $class;
 }
 
@@ -157,7 +159,7 @@ sub do_idx ($$$$) {
        my $idx = idx_shard($self, $smsg->{num} % $self->{shards});
        $idx->index_raw($msgref, $mime, $smsg);
        my $n = $self->{transact_bytes} += $smsg->{bytes};
-       $n >= (PublicInbox::SearchIdx::BATCH_BYTES * $self->{shards});
+       $n >= ($PublicInbox::SearchIdx::BATCH_BYTES * $self->{shards});
 }
 
 sub _add {
@@ -351,22 +353,23 @@ sub _replace_oids ($$$) {
        $rewrites;
 }
 
-sub content_ids ($) {
+sub content_hashes ($) {
        my ($mime) = @_;
-       my @cids = ( content_id($mime) );
+       my @chashes = ( content_hash($mime) );
 
+       # We still support Email::MIME, here, and
        # Email::MIME->as_string doesn't always round-trip, so we may
-       # use a second content_id
-       my $rt = content_id(PublicInbox::MIME->new(\($mime->as_string)));
-       push @cids, $rt if $cids[0] ne $rt;
-       \@cids;
+       # use a second content_hash
+       my $rt = content_hash(PublicInbox::Eml->new(\($mime->as_string)));
+       push @chashes, $rt if $chashes[0] ne $rt;
+       \@chashes;
 }
 
 sub content_matches ($$) {
-       my ($cids, $existing) = @_;
-       my $cid = content_id($existing);
-       foreach (@$cids) {
-               return 1 if $_ eq $cid
+       my ($chashes, $existing) = @_;
+       my $chash = content_hash($existing);
+       foreach (@$chashes) {
+               return 1 if $_ eq $chash
        }
        0
 }
@@ -383,13 +386,13 @@ sub rewrite_internal ($$;$$$) {
                $im = $self->importer;
        }
        my $over = $self->{over};
-       my $cids = content_ids($old_mime);
+       my $chashes = content_hashes($old_mime);
        my @removed;
        my $mids = mids($old_mime->header_obj);
 
        # We avoid introducing new blobs into git since the raw content
        # can be slightly different, so we do not need the user-supplied
-       # message now that we have the mids and content_id
+       # message now that we have the mids and content_hash
        $old_mime = undef;
        my $mark;
 
@@ -403,8 +406,8 @@ sub rewrite_internal ($$;$$$) {
                                next; # continue
                        }
                        my $orig = $$msg;
-                       my $cur = PublicInbox::MIME->new($msg);
-                       if (content_matches($cids, $cur)) {
+                       my $cur = PublicInbox::Eml->new($msg);
+                       if (content_matches($chashes, $cur)) {
                                $gone{$smsg->{num}} = [ $smsg, $cur, \$orig ];
                        }
                }
@@ -603,7 +606,7 @@ sub barrier_wait {
        my $bnote = $self->{bnote} or return;
        my $r = $bnote->[0];
        while (scalar keys %$barrier) {
-               defined(my $l = $r->getline) or die "EOF on barrier_wait: $!";
+               defined(my $l = readline($r)) or die "EOF on barrier_wait: $!";
                $l =~ /\Abarrier (\d+)/ or die "bad line on barrier_wait: $l";
                delete $barrier->{$1} or die "bad shard[$1] on barrier wait";
        }
@@ -729,9 +732,8 @@ sub fill_alternates ($$) {
 sub git_init {
        my ($self, $epoch) = @_;
        my $git_dir = "$self->{-inbox}->{inboxdir}/git/$epoch.git";
-       my @cmd = (qw(git init --bare -q), $git_dir);
-       PublicInbox::Import::run_die(\@cmd);
-       @cmd = (qw/git config/, "--file=$git_dir/config",
+       PublicInbox::Import::init_bare($git_dir);
+       my @cmd = (qw/git config/, "--file=$git_dir/config",
                        'include.path', '../../all.git/config');
        PublicInbox::Import::run_die(\@cmd);
        fill_alternates($self, $epoch);
@@ -833,7 +835,7 @@ sub get_blob ($$) {
 sub content_exists ($$$) {
        my ($self, $mime, $mid) = @_;
        my $over = $self->{over};
-       my $cids = content_ids($mime);
+       my $chashes = content_hashes($mime);
        my ($id, $prev);
        while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
                my $msg = get_blob($self, $smsg);
@@ -841,8 +843,8 @@ sub content_exists ($$$) {
                        warn "broken smsg for $mid\n";
                        next;
                }
-               my $cur = PublicInbox::MIME->new($msg);
-               return 1 if content_matches($cids, $cur);
+               my $cur = PublicInbox::Eml->new($msg);
+               return 1 if content_matches($chashes, $cur);
 
                # XXX DEBUG_DIFF is experimental and may be removed
                diff($mid, $cur, $mime) if $ENV{DEBUG_DIFF};
@@ -867,12 +869,13 @@ sub atfork_child {
 
 sub mark_deleted ($$$$) {
        my ($self, $sync, $git, $oid) = @_;
+       return if PublicInbox::SearchIdx::too_big($self, $git, $oid);
        my $msgref = $git->cat_file($oid);
-       my $mime = PublicInbox::MIME->new($$msgref);
+       my $mime = PublicInbox::Eml->new($$msgref);
        my $mids = mids($mime->header_obj);
-       my $cid = content_id($mime);
+       my $chash = content_hash($mime);
        foreach my $mid (@$mids) {
-               $sync->{D}->{"$mid\0$cid"} = $oid;
+               $sync->{D}->{"$mid\0$chash"} = $oid;
        }
 }
 
@@ -899,13 +902,13 @@ sub reindex_oid_m ($$$$;$) {
        $self->{current_info} = "multi_mid $oid";
        my ($num, $mid0, $len);
        my $msgref = $git->cat_file($oid, \$len);
-       my $mime = PublicInbox::MIME->new($$msgref);
+       my $mime = PublicInbox::Eml->new($$msgref);
        my $mids = mids($mime->header_obj);
-       my $cid = content_id($mime);
+       my $chash = content_hash($mime);
        die "BUG: reindex_oid_m called for <=1 mids" if scalar(@$mids) <= 1;
 
        for my $mid (reverse @$mids) {
-               delete($sync->{D}->{"$mid\0$cid"}) and
+               delete($sync->{D}->{"$mid\0$chash"}) and
                        die "BUG: reindex_oid should handle <$mid> delete";
        }
        my $over = $self->{over};
@@ -979,38 +982,27 @@ sub check_unindexed ($$$) {
        }
 }
 
-# reuse Msgmap to store num => oid mapping (rather than num => mid)
-sub multi_mid_q_new () {
-       my ($fh, $fn) = tempfile('multi_mid-XXXXXXX', EXLOCK => 0, TMPDIR => 1);
-       my $multi_mid = PublicInbox::Msgmap->new_file($fn, 1);
-       $multi_mid->{dbh}->do('PRAGMA synchronous = OFF');
-       # for Msgmap->DESTROY:
-       $multi_mid->{tmp_name} = $fn;
-       $multi_mid->{pid} = $$;
-       close $fh or die "failed to close $fn: $!";
-       $multi_mid
-}
-
-sub multi_mid_q_push ($$) {
-       my ($sync, $oid) = @_;
-       my $multi_mid = $sync->{multi_mid} //= multi_mid_q_new();
+sub multi_mid_q_push ($$$) {
+       my ($self, $sync, $oid) = @_;
+       my $multi_mid = $sync->{multi_mid} //= PublicInbox::MultiMidQueue->new;
        if ($sync->{reindex}) { # no regen on reindex
-               $multi_mid->mid_insert($oid);
+               $multi_mid->push_oid($oid, $self);
        } else {
                my $num = $sync->{regen}--;
                die "BUG: ran out of article numbers" if $num <= 0;
-               $multi_mid->mid_set($num, $oid);
+               $multi_mid->set_oid($num, $oid, $self);
        }
 }
 
 sub reindex_oid ($$$$) {
        my ($self, $sync, $git, $oid) = @_;
+       return if PublicInbox::SearchIdx::too_big($self, $git, $oid);
        my ($num, $mid0, $len);
        my $msgref = $git->cat_file($oid, \$len);
        return if $len == 0; # purged
-       my $mime = PublicInbox::MIME->new($$msgref);
+       my $mime = PublicInbox::Eml->new($$msgref);
        my $mids = mids($mime->header_obj);
-       my $cid = content_id($mime);
+       my $chash = content_hash($mime);
 
        if (scalar(@$mids) == 0) {
                warn "E: $oid has no Message-ID, skipping\n";
@@ -1019,7 +1011,7 @@ sub reindex_oid ($$$$) {
                my $mid = $mids->[0];
 
                # was the file previously marked as deleted?, skip if so
-               if (delete($sync->{D}->{"$mid\0$cid"})) {
+               if (delete($sync->{D}->{"$mid\0$chash"})) {
                        if (!$sync->{reindex}) {
                                $num = $sync->{regen}--;
                                $self->{mm}->num_highwater($num);
@@ -1044,14 +1036,14 @@ sub reindex_oid ($$$$) {
        } else { # multiple MIDs are a weird case:
                my $del = 0;
                for (@$mids) {
-                       $del += delete($sync->{D}->{"$_\0$cid"}) // 0;
+                       $del += delete($sync->{D}->{"$_\0$chash"}) // 0;
                }
                if ($del) {
                        unindex_oid_remote($self, $oid, $_) for @$mids;
                        # do not delete from {mm_tmp}, since another
                        # single-MID message may use it.
                } else { # handle them at the end:
-                       multi_mid_q_push($sync, $oid);
+                       multi_mid_q_push($self, $sync, $oid);
                }
                return;
        }
@@ -1202,7 +1194,7 @@ sub unindex_oid ($$$;$) {
        my ($self, $git, $oid, $unindexed) = @_;
        my $mm = $self->{mm};
        my $msgref = $git->cat_file($oid);
-       my $mime = PublicInbox::MIME->new($msgref);
+       my $mime = PublicInbox::Eml->new($msgref);
        my $mids = mids($mime->header_obj);
        $mime = $msgref = undef;
        my $over = $self->{over};
@@ -1317,7 +1309,7 @@ sub index_sync {
        return unless defined $latest;
        $self->idx_init($opt); # acquire lock
        my $sync = {
-               D => {}, # "$mid\0$cid" => $oid
+               D => {}, # "$mid\0$chash" => $oid
                unindex_range => {}, # EPOCH => oid_old..oid_new
                reindex => $opt->{reindex},
                -opt => $opt
@@ -1352,19 +1344,21 @@ sub index_sync {
        }
        if (my $multi_mid = delete $sync->{multi_mid}) {
                $git //= $self->{-inbox}->git;
-               my ($min, $max) = $multi_mid->minmax;
+               my $min = $multi_mid->{min};
+               my $max = $multi_mid->{max};
                if ($sync->{reindex}) {
                        # we may need to create new Message-IDs if mirrors
                        # were initially indexed with old versions
                        for (my $i = $max; $i >= $min; $i--) {
-                               my $oid = $multi_mid->mid_for($i);
+                               my $oid;
+                               $oid = $multi_mid->get_oid($i, $self) or next;
                                next unless defined $oid;
                                reindex_oid_m($self, $sync, $git, $oid);
                        }
                } else { # regen on initial index
                        for my $num ($min..$max) {
-                               my $oid = $multi_mid->mid_for($num);
-                               next unless defined $oid;
+                               my $oid;
+                               $oid = $multi_mid->get_oid($num, $self) or next;
                                reindex_oid_m($self, $sync, $git, $oid, $num);
                        }
                }