]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/Xapcmd.pm
xcpdb: use fine-grained locking
[public-inbox.git] / lib / PublicInbox / Xapcmd.pm
1 # Copyright (C) 2018-2019 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3 package PublicInbox::Xapcmd;
4 use strict;
5 use warnings;
6 use PublicInbox::Spawn qw(which spawn);
7 use PublicInbox::Over;
8 use PublicInbox::Search;
9 use File::Temp qw(tempdir);
10 use File::Path qw(remove_tree);
11
12 # support testing with dev versions of Xapian which installs
13 # commands with a version number suffix (e.g. "xapian-compact-1.5")
14 our $XAPIAN_COMPACT = $ENV{XAPIAN_COMPACT} || 'xapian-compact';
15
16 sub commit_changes ($$$$) {
17         my ($ibx, $old, $new, $opt) = @_;
18
19         my $reindex = $opt->{reindex};
20         my $im = $ibx->importer(0);
21         $im->lock_acquire if $reindex;
22
23         my @st = stat($old) or die "failed to stat($old): $!\n";
24
25         my $over = "$old/over.sqlite3";
26         if (-f $over) {
27                 $over = PublicInbox::Over->new($over);
28                 $over->connect->sqlite_backup_to_file("$new/over.sqlite3");
29                 $over = undef;
30         }
31         rename($old, "$new/old") or die "rename $old => $new/old: $!\n";
32         chmod($st[2] & 07777, $new) or die "chmod $old: $!\n";
33         rename($new, $old) or die "rename $new => $old: $!\n";
34         remove_tree("$old/old") or die "failed to remove $old/old: $!\n";
35
36         if ($reindex) {
37                 $opt->{-skip_lock} = 1;
38                 PublicInbox::Admin::index_inbox($ibx, $opt);
39                 # implicit lock_release
40         } else {
41                 $im->lock_release;
42         }
43 }
44
45 sub xspawn {
46         my ($cmd, $env, $opt) = @_;
47         if (ref($cmd->[0]) eq 'CODE') {
48                 my $cb = shift(@$cmd); # $cb = cpdb()
49                 defined(my $pid = fork) or die "fork: $!";
50                 return $pid if $pid > 0;
51                 eval { $cb->($cmd, $env, $opt) };
52                 die $@ if $@;
53                 exit 0;
54         } else {
55                 spawn($cmd, $env, $opt);
56         }
57 }
58
59 sub runnable_or_die ($) {
60         my ($exe) = @_;
61         which($exe) or die "$exe not found in PATH\n";
62 }
63
64 sub prepare_reindex ($$) {
65         my ($ibx, $reindex) = @_;
66         if ($ibx->{version} == 1) {
67                 my $dir = $ibx->search->xdir(1);
68                 my $xdb = Search::Xapian::Database->new($dir);
69                 if (my $lc = $xdb->get_metadata('last_commit')) {
70                         $reindex->{from} = $lc;
71                 }
72         } else { # v2
73                 my $v2w = $ibx->importer(0);
74                 my $max;
75                 $v2w->git_dir_latest(\$max) or return;
76                 my $from = $reindex->{from};
77                 my $mm = $ibx->mm;
78                 my $v = PublicInbox::Search::SCHEMA_VERSION();
79                 foreach my $i (0..$max) {
80                         $from->[$i] = $mm->last_commit_xap($v, $i);
81                 }
82         }
83 }
84
85 sub run {
86         my ($ibx, $cmd, $env, $opt) = @_;
87         $opt ||= {};
88         my $dir = $ibx->{mainrepo} or die "no mainrepo in inbox\n";
89         my $exe = $cmd->[0];
90         my $pfx = $exe;
91         runnable_or_die($XAPIAN_COMPACT) if $opt->{compact};
92
93         my $reindex; # v1:{ from => $x40 }, v2:{ from => [ $x40, $x40, .. ] } }
94         my $from; # per-epoch ranges
95
96         if (ref($exe) eq 'CODE') {
97                 $pfx = 'CODE';
98                 $reindex = $opt->{reindex} = {};
99                 $from = $reindex->{from} = [];
100                 require Search::Xapian::WritableDatabase;
101         } else {
102                 runnable_or_die($exe);
103         }
104         $ibx->umask_prepare;
105         my $old = $ibx->search->xdir(1);
106         -d $old or die "$old does not exist\n";
107         my $new = tempdir("$pfx-XXXXXXXX", DIR => $dir);
108         my $v = $ibx->{version} ||= 1;
109         my @cmds;
110         if ($v == 1) {
111                 push @cmds, [@$cmd, $old, $new];
112         } else {
113                 opendir my $dh, $old or die "Failed to opendir $old: $!\n";
114                 while (defined(my $dn = readdir($dh))) {
115                         if ($dn =~ /\A\d+\z/) {
116                                 push @cmds, [@$cmd, "$old/$dn", "$new/$dn"];
117                         } elsif ($dn eq '.' || $dn eq '..') {
118                         } elsif ($dn =~ /\Aover\.sqlite3/) {
119                         } else {
120                                 warn "W: skipping unknown dir: $old/$dn\n"
121                         }
122                 }
123                 die "No Xapian parts found in $old\n" unless @cmds;
124         }
125         my $im = $ibx->importer(0);
126         my $max = $opt->{jobs} || scalar(@cmds);
127         $ibx->with_umask(sub {
128                 $im->lock_acquire;
129
130                 # fine-grained locking if we prepare for reindex
131                 if ($reindex) {
132                         prepare_reindex($ibx, $reindex);
133                         $im->lock_release;
134                 }
135                 delete($ibx->{$_}) for (qw(mm over search)); # cleanup
136                 my %pids;
137                 while (@cmds) {
138                         while (scalar(keys(%pids)) < $max && scalar(@cmds)) {
139                                 my $x = shift @cmds;
140                                 $pids{xspawn($x, $env, $opt)} = $x;
141                         }
142
143                         while (scalar keys %pids) {
144                                 my $pid = waitpid(-1, 0);
145                                 my $x = delete $pids{$pid};
146                                 die join(' ', @$x)." failed: $?\n" if $?;
147                         }
148                 }
149                 commit_changes($ibx, $old, $new, $opt);
150         });
151 }
152
153 sub cpdb_retryable ($$) {
154         my ($src, $err) = @_;
155         if (ref($err) eq 'Search::Xapian::DatabaseModifiedError') {
156                 warn "$err, reopening and retrying\n";
157                 $src->reopen;
158                 return 1;
159         }
160         die $err if $err;
161         0;
162 }
163
164 sub cpdb {
165         my ($args, $env, $opt) = @_;
166         my ($old, $new) = @$args;
167         my $src = Search::Xapian::Database->new($old);
168         my $tmp = $opt->{compact} ? "$new.compact" : $new;
169
170         # like copydatabase(1), be sure we don't overwrite anything in case
171         # of other bugs:
172         my $creat = Search::Xapian::DB_CREATE();
173         my $dst = Search::Xapian::WritableDatabase->new($tmp, $creat);
174         my ($it, $end);
175
176         do {
177                 eval {
178                         # update the only metadata key for v1:
179                         my $lc = $src->get_metadata('last_commit');
180                         $dst->set_metadata('last_commit', $lc) if $lc;
181
182                         $it = $src->postlist_begin('');
183                         $end = $src->postlist_end('');
184                 };
185         } while (cpdb_retryable($src, $@));
186
187         do {
188                 eval {
189                         while ($it != $end) {
190                                 my $docid = $it->get_docid;
191                                 my $doc = $src->get_document($docid);
192                                 $dst->replace_document($docid, $doc);
193                                 $it->inc;
194                         }
195
196                         # unlike copydatabase(1), we don't copy spelling
197                         # and synonym data (or other user metadata) since
198                         # the Perl APIs don't expose iterators for them
199                         # (and public-inbox does not use those features)
200                 };
201         } while (cpdb_retryable($src, $@));
202
203         return unless $opt->{compact};
204
205         $src = $dst = undef; # flushes and closes
206
207         # this is probably the best place to do xapian-compact
208         # since $dst isn't readable by HTTP or NNTP clients, yet:
209         my $cmd = [ $XAPIAN_COMPACT, '--no-renumber', $tmp, $new ];
210         my $rdr = {};
211         foreach my $fd (0..2) {
212                 defined(my $dst = $opt->{$fd}) or next;
213                 $rdr->{$fd} = $dst;
214         }
215         my $pid = spawn($cmd, $env, $rdr);
216         my $r = waitpid($pid, 0);
217         if ($? || $r != $pid) {
218                 die join(' ', @$cmd)." failed: $? (pid=$pid, reaped=$r)\n";
219         }
220         remove_tree($tmp) or die "failed to remove $tmp: $!\n";
221 }
222
223 1;