]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/V2Writable.pm
v2: mirrors don't clobber msgs w/ reused Message-IDs
[public-inbox.git] / lib / PublicInbox / V2Writable.pm
index 971b007b02d62e6c62ebe9ca6b536432e874e18e..ed5182ae846029a5af0837060335c7d7d8a45066 100644 (file)
@@ -267,9 +267,7 @@ sub _idx_init { # with_umask callback
 
        # Now that all subprocesses are up, we can open the FDs
        # for SQLite:
-       my $mm = $self->{mm} = PublicInbox::Msgmap->new_file(
-                               "$ibx->{inboxdir}/msgmap.sqlite3",
-                               $ibx->{-no_fsync} ? 2 : 1);
+       my $mm = $self->{mm} = PublicInbox::Msgmap->new_file($ibx, 1);
        $mm->{dbh}->begin_work;
 }
 
@@ -549,11 +547,11 @@ sub checkpoint ($;$) {
        }
        my $shards = $self->{idx_shards};
        if ($shards) {
-               my $mm = $self->{mm};
-               my $dbh = $mm->{dbh} if $mm;
+               my $dbh = $self->{mm}->{dbh} if $self->{mm};
 
                # SQLite msgmap data is second in importance
                $dbh->commit if $dbh;
+               eval { $dbh->do('PRAGMA optimize') };
 
                # SQLite overview is third
                $self->{oidx}->commit_lazy;
@@ -623,6 +621,11 @@ sub done {
                eval { $mm->{dbh}->$m };
                $err .= "msgmap $m: $@\n" if $@;
        }
+       if ($self->{oidx} && $self->{oidx}->{dbh} && $err) {
+               eval { $self->{oidx}->rollback_lazy };
+               $err .= "overview rollback: $@\n" if $@;
+       }
+
        my $shards = delete $self->{idx_shards};
        if ($shards) {
                for (@$shards) {
@@ -810,8 +813,8 @@ sub index_oid { # cat_async callback
                        }
                }
        }
+       my $oidx = $self->{oidx};
        if (!defined($num)) { # reuse if reindexing (or duplicates)
-               my $oidx = $self->{oidx};
                for my $mid (@$mids) {
                        ($num, $mid0) = $oidx->num_mid0_for_oid($oid, $mid);
                        last if defined $num;
@@ -819,6 +822,11 @@ sub index_oid { # cat_async callback
        }
        $mid0 //= do { # is this a number we got before?
                $num = $arg->{mm_tmp}->num_for($mids->[0]);
+
+               # don't clobber existing if Message-ID is reused:
+               if (my $x = defined($num) ? $oidx->get_art($num) : undef) {
+                       undef($num) if $x->{blob} ne $oid;
+               }
                defined($num) ? $mids->[0] : undef;
        };
        if (!defined($num)) {
@@ -876,6 +884,11 @@ sub update_last_commit {
                chomp(my $n = $unit->{git}->qx(@cmd));
                return if $n ne '' && $n == 0;
        }
+       # don't rewind if --{since,until,before,after} are in use
+       return if (defined($last) &&
+                       grep(defined, @{$sync->{-opt}}{qw(since until)}) &&
+                       is_ancestor($self->git, $latest_cmt, $last));
+
        last_epoch_commit($self, $unit->{epoch}, $latest_cmt);
 }
 
@@ -1026,7 +1039,7 @@ sub sync_prepare ($$) {
                        my $req = { %$sync, oid => $oid };
                        $self->git->cat_async($oid, $unindex_oid, $req);
                }
-               $self->git->cat_async_wait;
+               $self->git->async_wait_all;
        }
        return 0 if $sync->{quit};
        if (!$regen_max) {
@@ -1108,7 +1121,7 @@ sub unindex_todo ($$$) {
                $self->git->cat_async($1, $unindex_oid, { %$sync, oid => $1 });
        }
        close $fh or die "git log failed: \$?=$?";
-       $self->git->cat_async_wait;
+       $self->git->async_wait_all;
 
        return unless $sync->{-opt}->{prune};
        my $after = scalar keys %$unindexed;
@@ -1240,7 +1253,7 @@ sub xapian_only {
                        index_xap_step($self, $sync, $art_beg, 1);
                }
        }
-       $self->git->cat_async_wait;
+       $self->git->async_wait_all;
        $self->{ibx}->cleanup;
        $self->done;
 }
@@ -1337,7 +1350,8 @@ sub index_sync {
        }
 
        # reindex does not pick up new changes, so we rerun w/o it:
-       if ($opt->{reindex} && !$sync->{quit}) {
+       if ($opt->{reindex} && !$sync->{quit} &&
+                       !grep(defined, @$opt{qw(since until)})) {
                my %again = %$opt;
                $sync = undef;
                delete @again{qw(rethread reindex -skip_lock)};