]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/Xapcmd.pm
xapcmd: xcpdb supports compaction
[public-inbox.git] / lib / PublicInbox / Xapcmd.pm
1 # Copyright (C) 2018-2019 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3 package PublicInbox::Xapcmd;
4 use strict;
5 use warnings;
6 use PublicInbox::Spawn qw(which spawn);
7 use PublicInbox::Over;
8 use File::Temp qw(tempdir);
9 use File::Path qw(remove_tree);
10
11 # support testing with dev versions of Xapian which installs
12 # commands with a version number suffix (e.g. "xapian-compact-1.5")
13 our $XAPIAN_COMPACT = $ENV{XAPIAN_COMPACT} || 'xapian-compact';
14
15 sub commit_changes ($$$) {
16         my ($im, $old, $new) = @_;
17         my @st = stat($old) or die "failed to stat($old): $!\n";
18
19         my $over = "$old/over.sqlite3";
20         if (-f $over) {
21                 $over = PublicInbox::Over->new($over);
22                 $over->connect->sqlite_backup_to_file("$new/over.sqlite3");
23         }
24         rename($old, "$new/old") or die "rename $old => $new/old: $!\n";
25         chmod($st[2] & 07777, $new) or die "chmod $old: $!\n";
26         rename($new, $old) or die "rename $new => $old: $!\n";
27         $im->lock_release;
28         remove_tree("$old/old") or die "failed to remove $old/old: $!\n";
29 }
30
31 sub xspawn {
32         my ($cmd, $env, $opt) = @_;
33         if (ref($cmd->[0]) eq 'CODE') {
34                 my $cb = shift(@$cmd); # $cb = cpdb()
35                 defined(my $pid = fork) or die "fork: $!";
36                 return $pid if $pid > 0;
37                 eval { $cb->($cmd, $env, $opt) };
38                 die $@ if $@;
39                 exit 0;
40         } else {
41                 spawn($cmd, $env, $opt);
42         }
43 }
44
45 sub runnable_or_die ($) {
46         my ($exe) = @_;
47         which($exe) or die "$exe not found in PATH\n";
48 }
49
50 sub run {
51         my ($ibx, $cmd, $env, $opt) = @_;
52         $opt ||= {};
53         my $dir = $ibx->{mainrepo} or die "no mainrepo in inbox\n";
54         my $exe = $cmd->[0];
55         my $pfx = $exe;
56         runnable_or_die($XAPIAN_COMPACT) if $opt->{compact};
57         if (ref($exe) eq 'CODE') {
58                 $pfx = 'CODE';
59                 require Search::Xapian::WritableDatabase;
60         } else {
61                 runnable_or_die($exe);
62         }
63         $ibx->umask_prepare;
64         my $old = $ibx->search->xdir(1);
65         -d $old or die "$old does not exist\n";
66         my $new = tempdir("$pfx-XXXXXXXX", DIR => $dir);
67         my $v = $ibx->{version} || 1;
68         my @cmds;
69         if ($v == 1) {
70                 push @cmds, [@$cmd, $old, $new];
71         } else {
72                 opendir my $dh, $old or die "Failed to opendir $old: $!\n";
73                 while (defined(my $dn = readdir($dh))) {
74                         if ($dn =~ /\A\d+\z/) {
75                                 push @cmds, [@$cmd, "$old/$dn", "$new/$dn"];
76                         } elsif ($dn eq '.' || $dn eq '..') {
77                         } elsif ($dn =~ /\Aover\.sqlite3/) {
78                         } else {
79                                 warn "W: skipping unknown dir: $old/$dn\n"
80                         }
81                 }
82                 die "No Xapian parts found in $old\n" unless @cmds;
83         }
84         my $im = $ibx->importer(0);
85         my $max = $opt->{jobs} || scalar(@cmds);
86         $ibx->with_umask(sub {
87                 $im->lock_acquire;
88                 my %pids;
89                 while (@cmds) {
90                         while (scalar(keys(%pids)) < $max && scalar(@cmds)) {
91                                 my $x = shift @cmds;
92                                 $pids{xspawn($x, $env, $opt)} = $x;
93                         }
94
95                         while (scalar keys %pids) {
96                                 my $pid = waitpid(-1, 0);
97                                 my $x = delete $pids{$pid};
98                                 die join(' ', @$x)." failed: $?\n" if $?;
99                         }
100                 }
101                 commit_changes($im, $old, $new);
102         });
103 }
104
105 sub cpdb_retryable ($$) {
106         my ($src, $err) = @_;
107         if (ref($err) eq 'Search::Xapian::DatabaseModifiedError') {
108                 warn "$err, reopening and retrying\n";
109                 $src->reopen;
110                 return 1;
111         }
112         die $err if $err;
113         0;
114 }
115
116 sub cpdb {
117         my ($args, $env, $opt) = @_;
118         my ($old, $new) = @$args;
119         my $src = Search::Xapian::Database->new($old);
120         my $tmp = $opt->{compact} ? "$new.compact" : $new;
121
122         # like copydatabase(1), be sure we don't overwrite anything in case
123         # of other bugs:
124         my $creat = Search::Xapian::DB_CREATE();
125         my $dst = Search::Xapian::WritableDatabase->new($tmp, $creat);
126         my ($it, $end);
127
128         do {
129                 eval {
130                         # update the only metadata key for v1:
131                         my $lc = $src->get_metadata('last_commit');
132                         $dst->set_metadata('last_commit', $lc) if $lc;
133
134                         $it = $src->postlist_begin('');
135                         $end = $src->postlist_end('');
136                 };
137         } while (cpdb_retryable($src, $@));
138
139         do {
140                 eval {
141                         while ($it != $end) {
142                                 my $docid = $it->get_docid;
143                                 my $doc = $src->get_document($docid);
144                                 $dst->replace_document($docid, $doc);
145                                 $it->inc;
146                         }
147
148                         # unlike copydatabase(1), we don't copy spelling
149                         # and synonym data (or other user metadata) since
150                         # the Perl APIs don't expose iterators for them
151                         # (and public-inbox does not use those features)
152                 };
153         } while (cpdb_retryable($src, $@));
154
155         return unless $opt->{compact};
156
157         $src = $dst = undef; # flushes and closes
158
159         # this is probably the best place to do xapian-compact
160         # since $dst isn't readable by HTTP or NNTP clients, yet:
161         my $cmd = [ $XAPIAN_COMPACT, '--no-renumber', $tmp, $new ];
162         my $rdr = {};
163         foreach my $fd (0..2) {
164                 defined(my $dst = $opt->{$fd}) or next;
165                 $rdr->{$fd} = $dst;
166         }
167         my $pid = spawn($cmd, $env, $rdr);
168         my $r = waitpid($pid, 0);
169         if ($? || $r != $pid) {
170                 die join(' ', @$cmd)." failed: $? (pid=$pid, reaped=$r)\n";
171         }
172         remove_tree($tmp) or die "failed to remove $tmp: $!\n";
173 }
174
175 1;