]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/Xapcmd.pm
xapcmd: do not reset %SIG until last Xtmpdir is done
[public-inbox.git] / lib / PublicInbox / Xapcmd.pm
1 # Copyright (C) 2018-2019 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3 package PublicInbox::Xapcmd;
4 use strict;
5 use warnings;
6 use PublicInbox::Spawn qw(which spawn);
7 use PublicInbox::Over;
8 use PublicInbox::Search;
9 use File::Temp qw(tempdir);
10 use File::Path qw(remove_tree);
11 use File::Basename qw(dirname);
12
13 # support testing with dev versions of Xapian which installs
14 # commands with a version number suffix (e.g. "xapian-compact-1.5")
15 our $XAPIAN_COMPACT = $ENV{XAPIAN_COMPACT} || 'xapian-compact';
16 our @COMPACT_OPT = qw(jobs|j=i quiet|q blocksize|b=s no-full|n fuller|F);
17
18 sub commit_changes ($$$) {
19         my ($ibx, $tmp, $opt) = @_;
20
21         my $reindex = $opt->{reindex};
22         my $im = $ibx->importer(0);
23         $im->lock_acquire if !$opt->{-coarse_lock};
24
25         $SIG{INT} or die 'BUG: $SIG{INT} not handled';
26
27         while (my ($old, $new) = each %$tmp) {
28                 my @st = stat($old) or die "failed to stat($old): $!\n";
29
30                 my $over = "$old/over.sqlite3";
31                 if (-f $over) { # only for v1, v2 over is untouched
32                         $over = PublicInbox::Over->new($over);
33                         my $tmp_over = "$new/over.sqlite3";
34                         $over->connect->sqlite_backup_to_file($tmp_over);
35                         $over = undef;
36                 }
37                 chmod($st[2] & 07777, $new) or die "chmod $old: $!\n";
38
39                 # Xtmpdir->DESTROY won't remove $new after this:
40                 rename($old, "$new/old") or die "rename $old => $new/old: $!\n";
41                 rename($new, $old) or die "rename $new => $old: $!\n";
42                 my $prev = "$old/old";
43                 remove_tree($prev) or die "failed to remove $prev: $!\n";
44         }
45         $tmp->done;
46         if (!$opt->{-coarse_lock}) {
47                 $opt->{-skip_lock} = 1;
48                 PublicInbox::Admin::index_inbox($ibx, $opt);
49                 # implicit lock_release
50         } else {
51                 $im->lock_release;
52         }
53 }
54
55 sub cb_spawn {
56         my ($cb, $args, $opt) = @_; # $cb = cpdb() or compact()
57         defined(my $pid = fork) or die "fork: $!";
58         return $pid if $pid > 0;
59         $cb->($args, $opt);
60         exit 0;
61 }
62
63 sub runnable_or_die ($) {
64         my ($exe) = @_;
65         which($exe) or die "$exe not found in PATH\n";
66 }
67
68 sub prepare_reindex ($$) {
69         my ($ibx, $reindex) = @_;
70         if ($ibx->{version} == 1) {
71                 my $dir = $ibx->search->xdir(1);
72                 my $xdb = Search::Xapian::Database->new($dir);
73                 if (my $lc = $xdb->get_metadata('last_commit')) {
74                         $reindex->{from} = $lc;
75                 }
76         } else { # v2
77                 my $v2w = $ibx->importer(0);
78                 my $max;
79                 $v2w->git_dir_latest(\$max) or return;
80                 my $from = $reindex->{from};
81                 my $mm = $ibx->mm;
82                 my $v = PublicInbox::Search::SCHEMA_VERSION();
83                 foreach my $i (0..$max) {
84                         $from->[$i] = $mm->last_commit_xap($v, $i);
85                 }
86         }
87 }
88
89 sub progress_prepare ($) {
90         my ($opt) = @_;
91         if ($opt->{quiet}) {
92                 open my $null, '>', '/dev/null' or
93                         die "failed to open /dev/null: $!\n";
94                 $opt->{1} = fileno($null);
95                 $opt->{-dev_null} = $null;
96         } else {
97                 $opt->{-progress} = sub { print STDERR @_ };
98         }
99 }
100
101 sub same_fs_or_die ($$) {
102         my ($x, $y) = @_;
103         return if ((stat($x))[0] == (stat($y))[0]); # 0 - st_dev
104         die "$x and $y reside on different filesystems\n";
105 }
106
107 sub process_queue {
108         my ($queue, $cb, $max, $opt) = @_;
109         if ($max <= 1) {
110                 while (defined(my $args = shift @$queue)) {
111                         $cb->($args, $opt);
112                 }
113                 return;
114         }
115
116         # run in parallel:
117         my %pids;
118         while (@$queue) {
119                 while (scalar(keys(%pids)) < $max && scalar(@$queue)) {
120                         my $args = shift @$queue;
121                         $pids{cb_spawn($cb, $args, $opt)} = $args;
122                 }
123
124                 while (scalar keys %pids) {
125                         my $pid = waitpid(-1, 0);
126                         my $args = delete $pids{$pid};
127                         die join(' ', @$args)." failed: $?\n" if $?;
128                 }
129         }
130 }
131
132 sub run {
133         my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact'
134         my $cb = \&${\"PublicInbox::Xapcmd::$task"};
135         progress_prepare($opt ||= {});
136         my $dir = $ibx->{mainrepo} or die "no mainrepo in inbox\n";
137         runnable_or_die($XAPIAN_COMPACT) if $opt->{compact};
138         my $reindex; # v1:{ from => $x40 }, v2:{ from => [ $x40, $x40, .. ] } }
139         my $from; # per-epoch ranges
140
141         if (!$opt->{-coarse_lock}) {
142                 $reindex = $opt->{reindex} = {};
143                 $from = $reindex->{from} = [];
144                 require Search::Xapian::WritableDatabase;
145         }
146
147         $ibx->umask_prepare;
148         my $old = $ibx->search->xdir(1);
149         -d $old or die "$old does not exist\n";
150
151         my $tmp = PublicInbox::Xtmpdirs->new;
152         my $v = $ibx->{version} ||= 1;
153         my @q;
154
155         # we want temporary directories to be as deep as possible,
156         # so v2 partitions can keep "xap$SCHEMA_VERSION" on a separate FS.
157         if ($v == 1) {
158                 my $old_parent = dirname($old);
159                 same_fs_or_die($old_parent, $old);
160                 $tmp->{$old} = tempdir('xapcmd-XXXXXXXX', DIR => $old_parent);
161                 push @q, [ $old, $tmp->{$old} ];
162         } else {
163                 opendir my $dh, $old or die "Failed to opendir $old: $!\n";
164                 while (defined(my $dn = readdir($dh))) {
165                         if ($dn =~ /\A\d+\z/) {
166                                 my $tmpl = "$dn-XXXXXXXX";
167                                 my $dst = tempdir($tmpl, DIR => $old);
168                                 same_fs_or_die($old, $dst);
169                                 my $cur = "$old/$dn";
170                                 push @q, [ $cur, $dst ];
171                                 $tmp->{$cur} = $dst;
172                         } elsif ($dn eq '.' || $dn eq '..') {
173                         } elsif ($dn =~ /\Aover\.sqlite3/) {
174                         } else {
175                                 warn "W: skipping unknown dir: $old/$dn\n"
176                         }
177                 }
178                 die "No Xapian parts found in $old\n" unless @q;
179         }
180         my $im = $ibx->importer(0);
181         my $max = $opt->{jobs} || scalar(@q);
182         $ibx->with_umask(sub {
183                 $im->lock_acquire;
184
185                 # fine-grained locking if we prepare for reindex
186                 if (!$opt->{-coarse_lock}) {
187                         prepare_reindex($ibx, $reindex);
188                         $im->lock_release;
189                 }
190
191                 delete($ibx->{$_}) for (qw(mm over search)); # cleanup
192                 process_queue(\@q, $cb, $max, $opt);
193                 commit_changes($ibx, $tmp, $opt);
194         });
195 }
196
197 sub cpdb_retryable ($$) {
198         my ($src, $pfx) = @_;
199         if (ref($@) eq 'Search::Xapian::DatabaseModifiedError') {
200                 warn "$pfx Xapian DB modified, reopening and retrying\n";
201                 $src->reopen;
202                 return 1;
203         }
204         if ($@) {
205                 warn "$pfx E: ", ref($@), "\n";
206                 die;
207         }
208         0;
209 }
210
211 sub progress_pfx ($) {
212         my @p = split('/', $_[0]);
213
214         # return "xap15/0" for v2, or "xapian15" for v1:
215         ($p[-1] =~ /\A\d+\z/) ? "$p[-2]/$p[-1]" : $p[-1];
216 }
217
218 # xapian-compact wrapper
219 sub compact ($$) {
220         my ($args, $opt) = @_;
221         my ($src, $dst) = @$args;
222         my ($r, $w);
223         my $pfx = $opt->{-progress_pfx} ||= progress_pfx($src);
224         my $pr = $opt->{-progress};
225         my $rdr = {};
226
227         foreach my $fd (0..2) {
228                 defined(my $dfd = $opt->{$fd}) or next;
229                 $rdr->{$fd} = $dfd;
230         }
231         $rdr->{1} = fileno($w) if $pr && pipe($r, $w);
232
233         # we rely on --no-renumber to keep docids synched to NNTP
234         my $cmd = [ $XAPIAN_COMPACT, '--no-renumber' ];
235         for my $sw (qw(no-full fuller)) {
236                 push @$cmd, "--$sw" if $opt->{$sw};
237         }
238         for my $sw (qw(blocksize)) {
239                 defined(my $v = $opt->{$sw}) or next;
240                 push @$cmd, "--$sw", $v;
241         }
242         $pr->("$pfx `".join(' ', @$cmd)."'\n") if $pr;
243         push @$cmd, $src, $dst;
244         my $pid = spawn($cmd, undef, $rdr);
245         if ($pr) {
246                 close $w or die "close: \$w: $!";
247                 foreach (<$r>) {
248                         s/\r/\r$pfx /g;
249                         $pr->("$pfx $_");
250                 }
251         }
252         my $rp = waitpid($pid, 0);
253         if ($? || $rp != $pid) {
254                 die join(' ', @$cmd)." failed: $? (pid=$pid, reaped=$rp)\n";
255         }
256 }
257
258 # Like copydatabase(1), this is horribly slow; and it doesn't seem due
259 # to the overhead of Perl.
260 sub cpdb ($$) {
261         my ($args, $opt) = @_;
262         my ($old, $new) = @$args;
263         my $src = Search::Xapian::Database->new($old);
264         my ($xtmp, $tmp);
265         if ($opt->{compact}) {
266                 my $newdir = dirname($new);
267                 same_fs_or_die($newdir, $new);
268                 $tmp = tempdir("$new.compact-XXXXXX", DIR => $newdir);
269                 $xtmp = PublicInbox::Xtmpdirs->new;
270                 $xtmp->{$new} = $tmp;
271         } else {
272                 $tmp = $new;
273         }
274
275         # like copydatabase(1), be sure we don't overwrite anything in case
276         # of other bugs:
277         my $creat = Search::Xapian::DB_CREATE();
278         my $dst = Search::Xapian::WritableDatabase->new($tmp, $creat);
279         my ($it, $end);
280         my ($nr, $tot, $fmt); # progress output
281         my $pr = $opt->{-progress};
282         my $pfx = $opt->{-progress_pfx} = progress_pfx($old);
283
284         do {
285                 eval {
286                         # update the only metadata key for v1:
287                         my $lc = $src->get_metadata('last_commit');
288                         $dst->set_metadata('last_commit', $lc) if $lc;
289
290                         $it = $src->postlist_begin('');
291                         $end = $src->postlist_end('');
292                         if ($pr) {
293                                 $nr = 0;
294                                 $tot = $src->get_doccount;
295                                 $fmt = "$pfx % ".length($tot)."u/$tot\n";
296                                 $pr->("$pfx copying $tot documents\n");
297                         }
298                 };
299         } while (cpdb_retryable($src, $pfx));
300
301         do {
302                 eval {
303                         while ($it != $end) {
304                                 my $docid = $it->get_docid;
305                                 my $doc = $src->get_document($docid);
306                                 $dst->replace_document($docid, $doc);
307                                 $it->inc;
308                                 if ($pr && !(++$nr & 1023)) {
309                                         $pr->(sprintf($fmt, $nr));
310                                 }
311                         }
312
313                         # unlike copydatabase(1), we don't copy spelling
314                         # and synonym data (or other user metadata) since
315                         # the Perl APIs don't expose iterators for them
316                         # (and public-inbox does not use those features)
317                 };
318         } while (cpdb_retryable($src, $pfx));
319
320         $pr->(sprintf($fmt, $nr)) if $pr;
321         return unless $xtmp;
322
323         $src = $dst = undef; # flushes and closes
324
325         # this is probably the best place to do xapian-compact
326         # since $dst isn't readable by HTTP or NNTP clients, yet:
327         compact([ $tmp, $new ], $opt);
328         remove_tree($tmp) or die "failed to remove $tmp: $!\n";
329         $xtmp->done;
330 }
331
332 # slightly easier-to-manage manage than END{} blocks
333 package PublicInbox::Xtmpdirs;
334 use strict;
335 use warnings;
336 use File::Path qw(remove_tree);
337 my %owner;
338
339 sub new {
340         # http://www.tldp.org/LDP/abs/html/exitcodes.html
341         $SIG{INT} = sub { exit(130) };
342         $SIG{HUP} = $SIG{PIPE} = $SIG{TERM} = sub { exit(1) };
343         my $self = bless {}, $_[0]; # old partition => new (tmp) partition
344         $owner{"$self"} = $$;
345         $self;
346 }
347
348 sub done {
349         my ($self) = @_;
350         delete $owner{"$self"};
351
352         my %known_pids;
353         $known_pids{$_}++ foreach values %owner;
354         if (!$known_pids{$$}) {
355                 $SIG{INT} = $SIG{HUP} = $SIG{PIPE} = $SIG{TERM} = 'DEFAULT';
356         }
357         %$self = ();
358 }
359
360 sub DESTROY {
361         my ($self) = @_;
362         my $owner_pid = delete $owner{"$self"} or return;
363         return if $owner_pid != $$;
364         foreach my $new (values %$self) {
365                 remove_tree($new) unless -d "$new/old";
366         }
367         done($self);
368 }
369
370 1;