]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/Xapcmd.pm
xcpdb: cleanup error handling and diagnosis
[public-inbox.git] / lib / PublicInbox / Xapcmd.pm
1 # Copyright (C) 2018-2019 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3 package PublicInbox::Xapcmd;
4 use strict;
5 use warnings;
6 use PublicInbox::Spawn qw(which spawn);
7 use PublicInbox::Over;
8 use PublicInbox::Search;
9 use File::Temp qw(tempdir);
10 use File::Path qw(remove_tree);
11
12 # support testing with dev versions of Xapian which installs
13 # commands with a version number suffix (e.g. "xapian-compact-1.5")
14 our $XAPIAN_COMPACT = $ENV{XAPIAN_COMPACT} || 'xapian-compact';
15
16 sub commit_changes ($$$$) {
17         my ($ibx, $old, $new, $opt) = @_;
18
19         my $reindex = $opt->{reindex};
20         my $im = $ibx->importer(0);
21         $im->lock_acquire if $reindex;
22
23         my @st = stat($old) or die "failed to stat($old): $!\n";
24
25         my $over = "$old/over.sqlite3";
26         if (-f $over) {
27                 $over = PublicInbox::Over->new($over);
28                 $over->connect->sqlite_backup_to_file("$new/over.sqlite3");
29                 $over = undef;
30         }
31         rename($old, "$new/old") or die "rename $old => $new/old: $!\n";
32         chmod($st[2] & 07777, $new) or die "chmod $old: $!\n";
33         rename($new, $old) or die "rename $new => $old: $!\n";
34         remove_tree("$old/old") or die "failed to remove $old/old: $!\n";
35
36         if ($reindex) {
37                 $opt->{-skip_lock} = 1;
38                 PublicInbox::Admin::index_inbox($ibx, $opt);
39                 # implicit lock_release
40         } else {
41                 $im->lock_release;
42         }
43 }
44
45 sub xspawn {
46         my ($cmd, $env, $opt) = @_;
47         if (ref($cmd->[0]) eq 'CODE') {
48                 my $cb = shift(@$cmd); # $cb = cpdb()
49                 defined(my $pid = fork) or die "fork: $!";
50                 return $pid if $pid > 0;
51                 eval { $cb->($cmd, $env, $opt) };
52                 die $@ if $@;
53                 exit 0;
54         } else {
55                 spawn($cmd, $env, $opt);
56         }
57 }
58
59 sub runnable_or_die ($) {
60         my ($exe) = @_;
61         which($exe) or die "$exe not found in PATH\n";
62 }
63
64 sub prepare_reindex ($$) {
65         my ($ibx, $reindex) = @_;
66         if ($ibx->{version} == 1) {
67                 my $dir = $ibx->search->xdir(1);
68                 my $xdb = Search::Xapian::Database->new($dir);
69                 if (my $lc = $xdb->get_metadata('last_commit')) {
70                         $reindex->{from} = $lc;
71                 }
72         } else { # v2
73                 my $v2w = $ibx->importer(0);
74                 my $max;
75                 $v2w->git_dir_latest(\$max) or return;
76                 my $from = $reindex->{from};
77                 my $mm = $ibx->mm;
78                 my $v = PublicInbox::Search::SCHEMA_VERSION();
79                 foreach my $i (0..$max) {
80                         $from->[$i] = $mm->last_commit_xap($v, $i);
81                 }
82         }
83 }
84
85 sub progress_prepare ($) {
86         my ($opt) = @_;
87         if ($opt->{quiet}) {
88                 open my $null, '>', '/dev/null' or
89                         die "failed to open /dev/null: $!\n";
90                 $opt->{1} = fileno($null);
91                 $opt->{-dev_null} = $null;
92         } else {
93                 $opt->{-progress} = 1;
94         }
95 }
96
97 sub run {
98         my ($ibx, $cmd, $env, $opt) = @_;
99         progress_prepare($opt ||= {});
100         my $dir = $ibx->{mainrepo} or die "no mainrepo in inbox\n";
101         my $exe = $cmd->[0];
102         my $pfx = $exe;
103         runnable_or_die($XAPIAN_COMPACT) if $opt->{compact};
104
105         my $reindex; # v1:{ from => $x40 }, v2:{ from => [ $x40, $x40, .. ] } }
106         my $from; # per-epoch ranges
107
108         if (ref($exe) eq 'CODE') {
109                 $pfx = 'CODE';
110                 $reindex = $opt->{reindex} = {};
111                 $from = $reindex->{from} = [];
112                 require Search::Xapian::WritableDatabase;
113         } else {
114                 runnable_or_die($exe);
115         }
116         $ibx->umask_prepare;
117         my $old = $ibx->search->xdir(1);
118         -d $old or die "$old does not exist\n";
119         my $new = tempdir("$pfx-XXXXXXXX", DIR => $dir);
120         my $v = $ibx->{version} ||= 1;
121         my @cmds;
122         if ($v == 1) {
123                 push @cmds, [@$cmd, $old, $new];
124         } else {
125                 opendir my $dh, $old or die "Failed to opendir $old: $!\n";
126                 while (defined(my $dn = readdir($dh))) {
127                         if ($dn =~ /\A\d+\z/) {
128                                 push @cmds, [@$cmd, "$old/$dn", "$new/$dn"];
129                         } elsif ($dn eq '.' || $dn eq '..') {
130                         } elsif ($dn =~ /\Aover\.sqlite3/) {
131                         } else {
132                                 warn "W: skipping unknown dir: $old/$dn\n"
133                         }
134                 }
135                 die "No Xapian parts found in $old\n" unless @cmds;
136         }
137         my $im = $ibx->importer(0);
138         my $max = $opt->{jobs} || scalar(@cmds);
139         $ibx->with_umask(sub {
140                 $im->lock_acquire;
141
142                 # fine-grained locking if we prepare for reindex
143                 if ($reindex) {
144                         prepare_reindex($ibx, $reindex);
145                         $im->lock_release;
146                 }
147                 delete($ibx->{$_}) for (qw(mm over search)); # cleanup
148                 my %pids;
149                 while (@cmds) {
150                         while (scalar(keys(%pids)) < $max && scalar(@cmds)) {
151                                 my $x = shift @cmds;
152                                 $pids{xspawn($x, $env, $opt)} = $x;
153                         }
154
155                         while (scalar keys %pids) {
156                                 my $pid = waitpid(-1, 0);
157                                 my $x = delete $pids{$pid};
158                                 die join(' ', @$x)." failed: $?\n" if $?;
159                         }
160                 }
161                 commit_changes($ibx, $old, $new, $opt);
162         });
163 }
164
165 sub cpdb_retryable ($$) {
166         my ($src, $pfx) = @_;
167         if (ref($@) eq 'Search::Xapian::DatabaseModifiedError') {
168                 warn "$pfx Xapian DB modified, reopening and retrying\n";
169                 $src->reopen;
170                 return 1;
171         }
172         if ($@) {
173                 warn "$pfx E: ", ref($@), "\n";
174                 die;
175         }
176         0;
177 }
178
179 # Like copydatabase(1), this is horribly slow; and it doesn't seem due
180 # to the overhead of Perl.
181 sub cpdb {
182         my ($args, $env, $opt) = @_;
183         my ($old, $new) = @$args;
184         my $src = Search::Xapian::Database->new($old);
185         my $tmp = $opt->{compact} ? "$new.compact" : $new;
186
187         # like copydatabase(1), be sure we don't overwrite anything in case
188         # of other bugs:
189         my $creat = Search::Xapian::DB_CREATE();
190         my $dst = Search::Xapian::WritableDatabase->new($tmp, $creat);
191         my ($it, $end);
192         my $pfx = '';
193         my ($nr, $tot, $fmt); # progress output
194
195         do {
196                 eval {
197                         # update the only metadata key for v1:
198                         my $lc = $src->get_metadata('last_commit');
199                         $dst->set_metadata('last_commit', $lc) if $lc;
200
201                         $it = $src->postlist_begin('');
202                         $end = $src->postlist_end('');
203                         $pfx = (split('/', $old))[-1].':';
204                         if ($opt->{-progress}) {
205                                 $nr = 0;
206                                 $tot = $src->get_doccount;
207                                 $fmt = "$pfx % ".length($tot)."u/$tot\n";
208                                 warn "$pfx copying $tot documents\n";
209                         }
210                 };
211         } while (cpdb_retryable($src, $pfx));
212
213         do {
214                 eval {
215                         while ($it != $end) {
216                                 my $docid = $it->get_docid;
217                                 my $doc = $src->get_document($docid);
218                                 $dst->replace_document($docid, $doc);
219                                 $it->inc;
220                                 if ($fmt && !(++$nr & 1023)) {
221                                         warn(sprintf($fmt, $nr));
222                                 }
223                         }
224
225                         # unlike copydatabase(1), we don't copy spelling
226                         # and synonym data (or other user metadata) since
227                         # the Perl APIs don't expose iterators for them
228                         # (and public-inbox does not use those features)
229                 };
230         } while (cpdb_retryable($src, $pfx));
231
232         warn(sprintf($fmt, $nr)) if $fmt;
233         return unless $opt->{compact};
234
235         $src = $dst = undef; # flushes and closes
236         $pfx = undef unless $fmt;
237
238         warn "$pfx compacting...\n" if $pfx;
239         # this is probably the best place to do xapian-compact
240         # since $dst isn't readable by HTTP or NNTP clients, yet:
241         my $cmd = [ $XAPIAN_COMPACT, '--no-renumber', $tmp, $new ];
242         my $rdr = {};
243         foreach my $fd (0..2) {
244                 defined(my $dst = $opt->{$fd}) or next;
245                 $rdr->{$fd} = $dst;
246         }
247
248         my ($r, $w);
249         if ($pfx && pipe($r, $w)) {
250                 $rdr->{1} = fileno($w);
251         }
252         my $pid = spawn($cmd, $env, $rdr);
253         if ($pfx) {
254                 close $w or die "close: \$w: $!";
255                 foreach (<$r>) {
256                         s/\r/\r$pfx /g;
257                         warn "$pfx $_";
258                 }
259         }
260         my $rp = waitpid($pid, 0);
261         if ($? || $rp != $pid) {
262                 die join(' ', @$cmd)." failed: $? (pid=$pid, reaped=$rp)\n";
263         }
264         remove_tree($tmp) or die "failed to remove $tmp: $!\n";
265 }
266
267 1;