]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/V2Writable.pm
imap+nntp: share COMPRESS implementation
[public-inbox.git] / lib / PublicInbox / V2Writable.pm
index 03590850973e0589000b29884d2f812759d128ae..ed5182ae846029a5af0837060335c7d7d8a45066 100644 (file)
@@ -6,14 +6,15 @@
 package PublicInbox::V2Writable;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::Lock);
+use parent qw(PublicInbox::Lock PublicInbox::IPC);
 use PublicInbox::SearchIdxShard;
 use PublicInbox::IPC;
 use PublicInbox::Eml;
 use PublicInbox::Git;
 use PublicInbox::Import;
+use PublicInbox::MultiGit;
 use PublicInbox::MID qw(mids references);
-use PublicInbox::ContentHash qw(content_hash content_digest);
+use PublicInbox::ContentHash qw(content_hash content_digest git_sha);
 use PublicInbox::InboxWritable;
 use PublicInbox::OverIdx;
 use PublicInbox::Msgmap;
@@ -54,14 +55,14 @@ sub nproc_shards ($) {
 
 sub count_shards ($) {
        my ($self) = @_;
+       # always load existing shards in case core count changes:
+       # Also, shard count may change while -watch is running
        if (my $ibx = $self->{ibx}) {
-               # always load existing shards in case core count changes:
-               # Also, shard count may change while -watch is running
                my $srch = $ibx->search or return 0;
                delete $ibx->{search};
                $srch->{nshard} // 0
        } else { # ExtSearchIdx
-               $self->{nshard} ||= scalar($self->xdb_shards_flat);
+               $self->{nshard} = scalar($self->xdb_shards_flat);
        }
 }
 
@@ -72,16 +73,14 @@ sub new {
        $v2ibx = PublicInbox::InboxWritable->new($v2ibx);
        my $dir = $v2ibx->assert_usable_dir;
        unless (-d $dir) {
-               if ($creat) {
-                       require File::Path;
-                       File::Path::mkpath($dir);
-               } else {
-                       die "$dir does not exist\n";
-               }
+               die "$dir does not exist\n" if !$creat;
+               require File::Path;
+               File::Path::mkpath($dir);
        }
        my $xpfx = "$dir/xap" . PublicInbox::Search::SCHEMA_VERSION;
        my $self = {
                ibx => $v2ibx,
+               mg => PublicInbox::MultiGit->new($dir, 'all.git', 'git'),
                im => undef, #  PublicInbox::Import
                parallel => 1,
                transact_bytes => 0,
@@ -110,7 +109,7 @@ sub init_inbox {
        $self->{mm}->skip_artnum($skip_artnum) if defined $skip_artnum;
        my $max = $self->{ibx}->max_git_epoch;
        $max = $skip_epoch if (defined($skip_epoch) && !defined($max));
-       $self->git_init($max // 0);
+       $self->{mg}->add_epoch($max // 0);
        $self->done;
 }
 
@@ -268,9 +267,7 @@ sub _idx_init { # with_umask callback
 
        # Now that all subprocesses are up, we can open the FDs
        # for SQLite:
-       my $mm = $self->{mm} = PublicInbox::Msgmap->new_file(
-                               "$ibx->{inboxdir}/msgmap.sqlite3",
-                               $ibx->{-no_fsync} ? 2 : 1);
+       my $mm = $self->{mm} = PublicInbox::Msgmap->new_file($ibx, 1);
        $mm->{dbh}->begin_work;
 }
 
@@ -447,23 +444,6 @@ sub purge {
        $rewritten->{rewrites}
 }
 
-# returns the git object_id of $fh, does not write the object to FS
-sub git_hash_raw ($$) {
-       my ($self, $raw) = @_;
-       # grab the expected OID we have to reindex:
-       pipe(my($in, $w)) or die "pipe: $!";
-       my $git_dir = $self->git->{git_dir};
-       my $cmd = ['git', "--git-dir=$git_dir", qw(hash-object --stdin)];
-       my $r = popen_rd($cmd, undef, { 0 => $in });
-       print $w $$raw or die "print \$w: $!";
-       close $w or die "close \$w: $!";
-       local $/ = "\n";
-       chomp(my $oid = <$r>);
-       close $r or die "git hash-object failed: $?";
-       $oid =~ /\A$OID\z/ or die "OID not expected: $oid";
-       $oid;
-}
-
 sub _check_mids_match ($$$) {
        my ($old_list, $new_list, $hdrs) = @_;
        my %old_mids = map { $_ => 1 } @$old_list;
@@ -498,7 +478,7 @@ sub replace ($$$) {
        PublicInbox::Import::drop_unwanted_headers($new_mime);
 
        my $raw = $new_mime->as_string;
-       my $expect_oid = git_hash_raw($self, \$raw);
+       my $expect_oid = git_sha(1, \$raw)->hexdigest;
        my $rewritten = _replace($self, $old_mime, $new_mime, \$raw) or return;
        my $need_reindex = $rewritten->{need_reindex};
 
@@ -567,11 +547,11 @@ sub checkpoint ($;$) {
        }
        my $shards = $self->{idx_shards};
        if ($shards) {
-               my $mm = $self->{mm};
-               my $dbh = $mm->{dbh} if $mm;
+               my $dbh = $self->{mm}->{dbh} if $self->{mm};
 
                # SQLite msgmap data is second in importance
                $dbh->commit if $dbh;
+               eval { $dbh->do('PRAGMA optimize') };
 
                # SQLite overview is third
                $self->{oidx}->commit_lazy;
@@ -641,6 +621,11 @@ sub done {
                eval { $mm->{dbh}->$m };
                $err .= "msgmap $m: $@\n" if $@;
        }
+       if ($self->{oidx} && $self->{oidx}->{dbh} && $err) {
+               eval { $self->{oidx}->rollback_lazy };
+               $err .= "overview rollback: $@\n" if $@;
+       }
+
        my $shards = delete $self->{idx_shards};
        if ($shards) {
                for (@$shards) {
@@ -658,70 +643,6 @@ sub done {
        die $err if $err;
 }
 
-sub write_alternates ($$$) {
-       my ($info_dir, $mode, $out) = @_;
-       my $fh = File::Temp->new(TEMPLATE => 'alt-XXXXXXXX', DIR => $info_dir);
-       my $tmp = $fh->filename;
-       print $fh @$out or die "print $tmp: $!\n";
-       chmod($mode, $fh) or die "fchmod $tmp: $!\n";
-       close $fh or die "close $tmp $!\n";
-       my $alt = "$info_dir/alternates";
-       rename($tmp, $alt) or die "rename $tmp => $alt: $!\n";
-       $fh->unlink_on_destroy(0);
-}
-
-sub fill_alternates ($$) {
-       my ($self, $epoch) = @_;
-
-       my $pfx = "$self->{ibx}->{inboxdir}/git";
-       my $all = "$self->{ibx}->{inboxdir}/all.git";
-       PublicInbox::Import::init_bare($all) unless -d $all;
-       my $info_dir = "$all/objects/info";
-       my $alt = "$info_dir/alternates";
-       my (%alt, $new);
-       my $mode = 0644;
-       if (-e $alt) {
-               open(my $fh, '<', $alt) or die "open < $alt: $!\n";
-               $mode = (stat($fh))[2] & 07777;
-
-               # we assign a sort score to every alternate and favor
-               # the newest (highest numbered) one because loose objects
-               # require scanning epochs and only the latest epoch is
-               # expected to see loose objects
-               my $score;
-               my $other = 0; # in case admin adds non-epoch repos
-               %alt = map {;
-                       if (m!\A\Q../../\E([0-9]+)\.git/objects\z!) {
-                               $score = $1 + 0;
-                       } else {
-                               $score = --$other;
-                       }
-                       $_ => $score;
-               } split(/\n+/, do { local $/; <$fh> });
-       }
-
-       foreach my $i (0..$epoch) {
-               my $dir = "../../git/$i.git/objects";
-               if (!exists($alt{$dir}) && -d "$pfx/$i.git") {
-                       $alt{$dir} = $i;
-                       $new = 1;
-               }
-       }
-       return unless $new;
-       write_alternates($info_dir, $mode,
-               [join("\n", sort { $alt{$b} <=> $alt{$a} } keys %alt), "\n"]);
-}
-
-sub git_init {
-       my ($self, $epoch) = @_;
-       my $git_dir = "$self->{ibx}->{inboxdir}/git/$epoch.git";
-       PublicInbox::Import::init_bare($git_dir);
-       run_die([qw(git config), "--file=$git_dir/config",
-               qw(include.path ../../all.git/config)]);
-       fill_alternates($self, $epoch);
-       $git_dir
-}
-
 sub importer {
        my ($self) = @_;
        my $im = $self->{im};
@@ -733,8 +654,8 @@ sub importer {
                        $im->done;
                        $im = undef;
                        $self->checkpoint;
-                       my $git_dir = $self->git_init(++$self->{epoch_max});
-                       my $git = PublicInbox::Git->new($git_dir);
+                       my $dir = $self->{mg}->add_epoch(++$self->{epoch_max});
+                       my $git = PublicInbox::Git->new($dir);
                        return $self->import_init($git, 0);
                }
        }
@@ -754,8 +675,8 @@ sub importer {
                }
        }
        $self->{epoch_max} = $epoch;
-       $latest = $self->git_init($epoch);
-       $self->import_init(PublicInbox::Git->new($latest), 0);
+       my $dir = $self->{mg}->add_epoch($epoch);
+       $self->import_init(PublicInbox::Git->new($dir), 0);
 }
 
 sub import_init {
@@ -772,11 +693,11 @@ sub import_init {
 sub diff ($$$) {
        my ($mid, $cur, $new) = @_;
 
-       my $ah = File::Temp->new(TEMPLATE => 'email-cur-XXXXXXXX', TMPDIR => 1);
+       my $ah = File::Temp->new(TEMPLATE => 'email-cur-XXXX', TMPDIR => 1);
        print $ah $cur->as_string or die "print: $!";
        $ah->flush or die "flush: $!";
        PublicInbox::Import::drop_unwanted_headers($new);
-       my $bh = File::Temp->new(TEMPLATE => 'email-new-XXXXXXXX', TMPDIR => 1);
+       my $bh = File::Temp->new(TEMPLATE => 'email-new-XXXX', TMPDIR => 1);
        print $bh $new->as_string or die "print: $!";
        $bh->flush or die "flush: $!";
        my $cmd = [ qw(diff -u), $ah->filename, $bh->filename ];
@@ -892,8 +813,8 @@ sub index_oid { # cat_async callback
                        }
                }
        }
+       my $oidx = $self->{oidx};
        if (!defined($num)) { # reuse if reindexing (or duplicates)
-               my $oidx = $self->{oidx};
                for my $mid (@$mids) {
                        ($num, $mid0) = $oidx->num_mid0_for_oid($oid, $mid);
                        last if defined $num;
@@ -901,6 +822,11 @@ sub index_oid { # cat_async callback
        }
        $mid0 //= do { # is this a number we got before?
                $num = $arg->{mm_tmp}->num_for($mids->[0]);
+
+               # don't clobber existing if Message-ID is reused:
+               if (my $x = defined($num) ? $oidx->get_art($num) : undef) {
+                       undef($num) if $x->{blob} ne $oid;
+               }
                defined($num) ? $mids->[0] : undef;
        };
        if (!defined($num)) {
@@ -958,6 +884,11 @@ sub update_last_commit {
                chomp(my $n = $unit->{git}->qx(@cmd));
                return if $n ne '' && $n == 0;
        }
+       # don't rewind if --{since,until,before,after} are in use
+       return if (defined($last) &&
+                       grep(defined, @{$sync->{-opt}}{qw(since until)}) &&
+                       is_ancestor($self->git, $latest_cmt, $last));
+
        last_epoch_commit($self, $unit->{epoch}, $latest_cmt);
 }
 
@@ -1108,7 +1039,7 @@ sub sync_prepare ($$) {
                        my $req = { %$sync, oid => $oid };
                        $self->git->cat_async($oid, $unindex_oid, $req);
                }
-               $self->git->cat_async_wait;
+               $self->git->async_wait_all;
        }
        return 0 if $sync->{quit};
        if (!$regen_max) {
@@ -1190,7 +1121,7 @@ sub unindex_todo ($$$) {
                $self->git->cat_async($1, $unindex_oid, { %$sync, oid => $1 });
        }
        close $fh or die "git log failed: \$?=$?";
-       $self->git->cat_async_wait;
+       $self->git->async_wait_all;
 
        return unless $sync->{-opt}->{prune};
        my $after = scalar keys %$unindexed;
@@ -1296,7 +1227,7 @@ sub index_todo ($$$) {
 
 sub xapian_only {
        my ($self, $opt, $sync, $art_beg) = @_;
-       my $seq = $opt->{sequential_shard};
+       my $seq = $opt->{'sequential-shard'};
        $art_beg //= 0;
        local $self->{parallel} = 0 if $seq;
        $self->idx_init($opt); # acquire lock
@@ -1322,7 +1253,8 @@ sub xapian_only {
                        index_xap_step($self, $sync, $art_beg, 1);
                }
        }
-       $self->git->cat_async_wait;
+       $self->git->async_wait_all;
+       $self->{ibx}->cleanup;
        $self->done;
 }
 
@@ -1345,13 +1277,13 @@ sub index_sync {
        }
 
        my $pr = $opt->{-progress};
-       my $seq = $opt->{sequential_shard};
+       my $seq = $opt->{'sequential-shard'};
        my $art_beg; # the NNTP article number we start xapian_only at
        my $idxlevel = $self->{ibx}->{indexlevel};
        local $self->{ibx}->{indexlevel} = 'basic' if $seq;
 
        $self->idx_init($opt); # acquire lock
-       fill_alternates($self, $epoch_max);
+       $self->{mg}->fill_alternates;
        $self->{oidx}->rethread_prepare($opt);
        my $sync = {
                need_checkpoint => \(my $bool = 0),
@@ -1418,7 +1350,8 @@ sub index_sync {
        }
 
        # reindex does not pick up new changes, so we rerun w/o it:
-       if ($opt->{reindex} && !$sync->{quit}) {
+       if ($opt->{reindex} && !$sync->{quit} &&
+                       !grep(defined, @$opt{qw(since until)})) {
                my %again = %$opt;
                $sync = undef;
                delete @again{qw(rethread reindex -skip_lock)};
@@ -1430,4 +1363,14 @@ W: interrupted, --xapian-only --reindex required upon restart
 EOF
 }
 
+sub ipc_atfork_child {
+       my ($self) = @_;
+       if (my $lei = delete $self->{lei}) {
+               $lei->_lei_atfork_child;
+               my $pkt_op_p = delete $lei->{pkt_op_p};
+               close($pkt_op_p->{op_p});
+       }
+       $self->SUPER::ipc_atfork_child;
+}
+
 1;