]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/Xapcmd.pm
ca74ea0c4c09f9ed2df7013679e3ab0d185dac31
[public-inbox.git] / lib / PublicInbox / Xapcmd.pm
1 # Copyright (C) 2018-2019 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3 package PublicInbox::Xapcmd;
4 use strict;
5 use warnings;
6 use PublicInbox::Spawn qw(which spawn);
7 use PublicInbox::Over;
8 use File::Temp qw(tempdir);
9 use File::Path qw(remove_tree);
10
11 sub commit_changes ($$$) {
12         my ($im, $old, $new) = @_;
13         my @st = stat($old) or die "failed to stat($old): $!\n";
14
15         my $over = "$old/over.sqlite3";
16         if (-f $over) {
17                 $over = PublicInbox::Over->new($over);
18                 $over->connect->sqlite_backup_to_file("$new/over.sqlite3");
19         }
20         rename($old, "$new/old") or die "rename $old => $new/old: $!\n";
21         chmod($st[2] & 07777, $new) or die "chmod $old: $!\n";
22         rename($new, $old) or die "rename $new => $old: $!\n";
23         $im->lock_release;
24         remove_tree("$old/old") or die "failed to remove $old/old: $!\n";
25 }
26
27 sub xspawn {
28         my ($cmd, $env, $opt) = @_;
29         if (ref($cmd->[0]) eq 'CODE') {
30                 my $cb = shift(@$cmd); # $cb = cpdb()
31                 defined(my $pid = fork) or die "fork: $!";
32                 return $pid if $pid > 0;
33                 eval { $cb->($cmd, $env, $opt) };
34                 die $@ if $@;
35                 exit 0;
36         } else {
37                 spawn($cmd, $env, $opt);
38         }
39 }
40
41 sub run {
42         my ($ibx, $cmd, $env, $opt) = @_;
43         $opt ||= {};
44         my $dir = $ibx->{mainrepo} or die "no mainrepo in inbox\n";
45         my $exe = $cmd->[0];
46         my $pfx = $exe;
47         if (ref($exe) eq 'CODE') {
48                 $pfx = 'CODE';
49                 require Search::Xapian::WritableDatabase;
50         } else {
51                 which($exe) or die "$exe not found in PATH\n";
52         }
53         $ibx->umask_prepare;
54         my $old = $ibx->search->xdir(1);
55         -d $old or die "$old does not exist\n";
56         my $new = tempdir("$pfx-XXXXXXXX", DIR => $dir);
57         my $v = $ibx->{version} || 1;
58         my @cmds;
59         if ($v == 1) {
60                 push @cmds, [@$cmd, $old, $new];
61         } else {
62                 opendir my $dh, $old or die "Failed to opendir $old: $!\n";
63                 while (defined(my $dn = readdir($dh))) {
64                         if ($dn =~ /\A\d+\z/) {
65                                 push @cmds, [@$cmd, "$old/$dn", "$new/$dn"];
66                         } elsif ($dn eq '.' || $dn eq '..') {
67                         } elsif ($dn =~ /\Aover\.sqlite3/) {
68                         } else {
69                                 warn "W: skipping unknown dir: $old/$dn\n"
70                         }
71                 }
72                 die "No Xapian parts found in $old\n" unless @cmds;
73         }
74         my $im = $ibx->importer(0);
75         my $max = $opt->{jobs} || scalar(@cmds);
76         $ibx->with_umask(sub {
77                 $im->lock_acquire;
78                 my %pids;
79                 while (@cmds) {
80                         while (scalar(keys(%pids)) < $max && scalar(@cmds)) {
81                                 my $x = shift @cmds;
82                                 $pids{xspawn($x, $env, $opt)} = $x;
83                         }
84
85                         while (scalar keys %pids) {
86                                 my $pid = waitpid(-1, 0);
87                                 my $x = delete $pids{$pid};
88                                 die join(' ', @$x)." failed: $?\n" if $?;
89                         }
90                 }
91                 commit_changes($im, $old, $new);
92         });
93 }
94
95 sub cpdb_retryable ($$) {
96         my ($src, $err) = @_;
97         if (ref($err) eq 'Search::Xapian::DatabaseModifiedError') {
98                 warn "$err, reopening and retrying\n";
99                 $src->reopen;
100                 return 1;
101         }
102         die $err if $err;
103         0;
104 }
105
106 sub cpdb {
107         my ($args, $env, $opt) = @_;
108         my ($old, $new) = @$args;
109         my $src = Search::Xapian::Database->new($old);
110
111         # like copydatabase(1), be sure we don't overwrite anything in case
112         # of other bugs:
113         my $creat = Search::Xapian::DB_CREATE();
114         my $dst = Search::Xapian::WritableDatabase->new($new, $creat);
115         my ($it, $end);
116
117         do {
118                 eval {
119                         # update the only metadata key for v1:
120                         my $lc = $src->get_metadata('last_commit');
121                         $dst->set_metadata('last_commit', $lc) if $lc;
122
123                         $it = $src->postlist_begin('');
124                         $end = $src->postlist_end('');
125                 };
126         } while (cpdb_retryable($src, $@));
127
128         do {
129                 eval {
130                         while ($it != $end) {
131                                 my $docid = $it->get_docid;
132                                 my $doc = $src->get_document($docid);
133                                 $dst->replace_document($docid, $doc);
134                                 $it->inc;
135                         }
136
137                         # unlike copydatabase(1), we don't copy spelling
138                         # and synonym data (or other user metadata) since
139                         # the Perl APIs don't expose iterators for them
140                         # (and public-inbox does not use those features)
141                 };
142         } while (cpdb_retryable($src, $@));
143 }
144
145 1;