]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/Xapcmd.pm
xcpdb: remove temporary directories on aborts
[public-inbox.git] / lib / PublicInbox / Xapcmd.pm
1 # Copyright (C) 2018-2019 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3 package PublicInbox::Xapcmd;
4 use strict;
5 use warnings;
6 use PublicInbox::Spawn qw(which spawn);
7 use PublicInbox::Over;
8 use PublicInbox::Search;
9 use File::Temp qw(tempdir);
10 use File::Path qw(remove_tree);
11 use File::Basename qw(dirname);
12
13 # support testing with dev versions of Xapian which installs
14 # commands with a version number suffix (e.g. "xapian-compact-1.5")
15 our $XAPIAN_COMPACT = $ENV{XAPIAN_COMPACT} || 'xapian-compact';
16
17 sub commit_changes ($$$) {
18         my ($ibx, $tmp, $opt) = @_;
19
20         my $reindex = $opt->{reindex};
21         my $im = $ibx->importer(0);
22         $im->lock_acquire if $reindex;
23
24         while (my ($old, $new) = each %$tmp) {
25                 my @st = stat($old) or die "failed to stat($old): $!\n";
26
27                 my $over = "$old/over.sqlite3";
28                 if (-f $over) { # only for v1, v2 over is untouched
29                         $over = PublicInbox::Over->new($over);
30                         my $tmp_over = "$new/over.sqlite3";
31                         $over->connect->sqlite_backup_to_file($tmp_over);
32                         $over = undef;
33                 }
34                 chmod($st[2] & 07777, $new) or die "chmod $old: $!\n";
35
36                 # Xtmpdir->DESTROY won't remove $new after this:
37                 rename($old, "$new/old") or die "rename $old => $new/old: $!\n";
38                 rename($new, $old) or die "rename $new => $old: $!\n";
39                 my $prev = "$old/old";
40                 remove_tree($prev) or die "failed to remove $prev: $!\n";
41         }
42         $tmp->done;
43         if ($reindex) {
44                 $opt->{-skip_lock} = 1;
45                 PublicInbox::Admin::index_inbox($ibx, $opt);
46                 # implicit lock_release
47         } else {
48                 $im->lock_release;
49         }
50 }
51
52 sub xspawn {
53         my ($cmd, $env, $opt) = @_;
54         if (ref($cmd->[0]) eq 'CODE') {
55                 my $cb = shift(@$cmd); # $cb = cpdb()
56                 defined(my $pid = fork) or die "fork: $!";
57                 return $pid if $pid > 0;
58                 eval { $cb->($cmd, $env, $opt) };
59                 die $@ if $@;
60                 exit 0;
61         } else {
62                 spawn($cmd, $env, $opt);
63         }
64 }
65
66 sub runnable_or_die ($) {
67         my ($exe) = @_;
68         which($exe) or die "$exe not found in PATH\n";
69 }
70
71 sub prepare_reindex ($$) {
72         my ($ibx, $reindex) = @_;
73         if ($ibx->{version} == 1) {
74                 my $dir = $ibx->search->xdir(1);
75                 my $xdb = Search::Xapian::Database->new($dir);
76                 if (my $lc = $xdb->get_metadata('last_commit')) {
77                         $reindex->{from} = $lc;
78                 }
79         } else { # v2
80                 my $v2w = $ibx->importer(0);
81                 my $max;
82                 $v2w->git_dir_latest(\$max) or return;
83                 my $from = $reindex->{from};
84                 my $mm = $ibx->mm;
85                 my $v = PublicInbox::Search::SCHEMA_VERSION();
86                 foreach my $i (0..$max) {
87                         $from->[$i] = $mm->last_commit_xap($v, $i);
88                 }
89         }
90 }
91
92 sub progress_prepare ($) {
93         my ($opt) = @_;
94         if ($opt->{quiet}) {
95                 open my $null, '>', '/dev/null' or
96                         die "failed to open /dev/null: $!\n";
97                 $opt->{1} = fileno($null);
98                 $opt->{-dev_null} = $null;
99         } else {
100                 $opt->{-progress} = sub { print STDERR @_ };
101         }
102 }
103
104 sub same_fs_or_die ($$) {
105         my ($x, $y) = @_;
106         return if ((stat($x))[0] == (stat($y))[0]); # 0 - st_dev
107         die "$x and $y reside on different filesystems\n";
108 }
109
110 sub run {
111         my ($ibx, $cmd, $env, $opt) = @_;
112         progress_prepare($opt ||= {});
113         my $dir = $ibx->{mainrepo} or die "no mainrepo in inbox\n";
114         my $exe = $cmd->[0];
115         runnable_or_die($XAPIAN_COMPACT) if $opt->{compact};
116
117         my $reindex; # v1:{ from => $x40 }, v2:{ from => [ $x40, $x40, .. ] } }
118         my $from; # per-epoch ranges
119
120         if (ref($exe) eq 'CODE') {
121                 $reindex = $opt->{reindex} = {};
122                 $from = $reindex->{from} = [];
123                 require Search::Xapian::WritableDatabase;
124         } else {
125                 runnable_or_die($exe);
126         }
127         $ibx->umask_prepare;
128         my $old = $ibx->search->xdir(1);
129         -d $old or die "$old does not exist\n";
130
131         my $tmp = PublicInbox::Xtmpdirs->new;
132         my $v = $ibx->{version} ||= 1;
133         my @cmds;
134
135         # we want temporary directories to be as deep as possible,
136         # so v2 partitions can keep "xap$SCHEMA_VERSION" on a separate FS.
137         if ($v == 1) {
138                 my $old_parent = dirname($old);
139                 same_fs_or_die($old_parent, $old);
140                 $tmp->{$old} = tempdir('xapcmd-XXXXXXXX', DIR => $old_parent);
141                 push @cmds, [ @$cmd, $old, $tmp->{$old} ];
142         } else {
143                 opendir my $dh, $old or die "Failed to opendir $old: $!\n";
144                 while (defined(my $dn = readdir($dh))) {
145                         if ($dn =~ /\A\d+\z/) {
146                                 my $tmpl = "$dn-XXXXXXXX";
147                                 my $dst = tempdir($tmpl, DIR => $old);
148                                 same_fs_or_die($old, $dst);
149                                 my $cur = "$old/$dn";
150                                 push @cmds, [@$cmd, $cur, $dst ];
151                                 $tmp->{$cur} = $dst;
152                         } elsif ($dn eq '.' || $dn eq '..') {
153                         } elsif ($dn =~ /\Aover\.sqlite3/) {
154                         } else {
155                                 warn "W: skipping unknown dir: $old/$dn\n"
156                         }
157                 }
158                 die "No Xapian parts found in $old\n" unless @cmds;
159         }
160         my $im = $ibx->importer(0);
161         my $max = $opt->{jobs} || scalar(@cmds);
162         $ibx->with_umask(sub {
163                 $im->lock_acquire;
164
165                 # fine-grained locking if we prepare for reindex
166                 if ($reindex) {
167                         prepare_reindex($ibx, $reindex);
168                         $im->lock_release;
169                 }
170                 delete($ibx->{$_}) for (qw(mm over search)); # cleanup
171                 my %pids;
172                 while (@cmds) {
173                         while (scalar(keys(%pids)) < $max && scalar(@cmds)) {
174                                 my $x = shift @cmds;
175                                 $pids{xspawn($x, $env, $opt)} = $x;
176                         }
177
178                         while (scalar keys %pids) {
179                                 my $pid = waitpid(-1, 0);
180                                 my $x = delete $pids{$pid};
181                                 die join(' ', @$x)." failed: $?\n" if $?;
182                         }
183                 }
184                 commit_changes($ibx, $tmp, $opt);
185         });
186 }
187
188 sub cpdb_retryable ($$) {
189         my ($src, $pfx) = @_;
190         if (ref($@) eq 'Search::Xapian::DatabaseModifiedError') {
191                 warn "$pfx Xapian DB modified, reopening and retrying\n";
192                 $src->reopen;
193                 return 1;
194         }
195         if ($@) {
196                 warn "$pfx E: ", ref($@), "\n";
197                 die;
198         }
199         0;
200 }
201
202 # Like copydatabase(1), this is horribly slow; and it doesn't seem due
203 # to the overhead of Perl.
204 sub cpdb {
205         my ($args, $env, $opt) = @_;
206         my ($old, $new) = @$args;
207         my $src = Search::Xapian::Database->new($old);
208         my $tmp = $opt->{compact} ? "$new.compact" : $new;
209
210         # like copydatabase(1), be sure we don't overwrite anything in case
211         # of other bugs:
212         my $creat = Search::Xapian::DB_CREATE();
213         my $dst = Search::Xapian::WritableDatabase->new($tmp, $creat);
214         my ($it, $end);
215         my $pfx = '';
216         my ($nr, $tot, $fmt); # progress output
217         my $pr = $opt->{-progress};
218
219         do {
220                 eval {
221                         # update the only metadata key for v1:
222                         my $lc = $src->get_metadata('last_commit');
223                         $dst->set_metadata('last_commit', $lc) if $lc;
224
225                         $it = $src->postlist_begin('');
226                         $end = $src->postlist_end('');
227                         if ($pr) {
228                                 $nr = 0;
229                                 $tot = $src->get_doccount;
230                                 my @p = split('/', $old);
231                                 $pfx = "$p[-2]/$p[-1]:";
232                                 $fmt = "$pfx % ".length($tot)."u/$tot\n";
233                                 $pr->("$pfx copying $tot documents\n");
234                         }
235                 };
236         } while (cpdb_retryable($src, $pfx));
237
238         do {
239                 eval {
240                         while ($it != $end) {
241                                 my $docid = $it->get_docid;
242                                 my $doc = $src->get_document($docid);
243                                 $dst->replace_document($docid, $doc);
244                                 $it->inc;
245                                 if ($pr && !(++$nr & 1023)) {
246                                         $pr->(sprintf($fmt, $nr));
247                                 }
248                         }
249
250                         # unlike copydatabase(1), we don't copy spelling
251                         # and synonym data (or other user metadata) since
252                         # the Perl APIs don't expose iterators for them
253                         # (and public-inbox does not use those features)
254                 };
255         } while (cpdb_retryable($src, $pfx));
256
257         $pr->(sprintf($fmt, $nr)) if $pr;
258         return unless $opt->{compact};
259
260         $src = $dst = undef; # flushes and closes
261
262         $pr->("$pfx compacting...\n") if $pr;
263         # this is probably the best place to do xapian-compact
264         # since $dst isn't readable by HTTP or NNTP clients, yet:
265         my $cmd = [ $XAPIAN_COMPACT, '--no-renumber', $tmp, $new ];
266         my $rdr = {};
267         foreach my $fd (0..2) {
268                 defined(my $dst = $opt->{$fd}) or next;
269                 $rdr->{$fd} = $dst;
270         }
271
272         my ($r, $w);
273         if ($pr && pipe($r, $w)) {
274                 $rdr->{1} = fileno($w);
275         }
276         my $pid = spawn($cmd, $env, $rdr);
277         if ($pr) {
278                 close $w or die "close: \$w: $!";
279                 foreach (<$r>) {
280                         s/\r/\r$pfx /g;
281                         $pr->("$pfx $_");
282                 }
283         }
284         my $rp = waitpid($pid, 0);
285         if ($? || $rp != $pid) {
286                 die join(' ', @$cmd)." failed: $? (pid=$pid, reaped=$rp)\n";
287         }
288         remove_tree($tmp) or die "failed to remove $tmp: $!\n";
289 }
290
291 # slightly easier-to-manage manage than END{} blocks
292 package PublicInbox::Xtmpdirs;
293 use strict;
294 use warnings;
295 use File::Path qw(remove_tree);
296 my %owner;
297
298 sub new {
299         # http://www.tldp.org/LDP/abs/html/exitcodes.html
300         $SIG{INT} = sub { exit(130) };
301         $SIG{HUP} = $SIG{PIPE} = $SIG{TERM} = sub { exit(1) };
302         my $self = bless {}, $_[0]; # old partition => new (tmp) partition
303         $owner{"$self"} = $$;
304         $self;
305 }
306
307 sub done {
308         my ($self) = @_;
309         delete $owner{"$self"};
310         $SIG{INT} = $SIG{HUP} = $SIG{PIPE} = $SIG{TERM} = 'DEFAULT';
311         %$self = ();
312 }
313
314 sub DESTROY {
315         my ($self) = @_;
316         my $owner_pid = delete $owner{"$self"} or return;
317         return if $owner_pid != $$;
318         foreach my $new (values %$self) {
319                 remove_tree($new) unless -d "$new/old";
320         }
321         $SIG{INT} = $SIG{HUP} = $SIG{PIPE} = $SIG{TERM} = 'DEFAULT';
322 }
323
324 1;