]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/SearchIdxShard.pm
searchidxshard: use PublicInbox::IPC to kill lots of code
[public-inbox.git] / lib / PublicInbox / SearchIdxShard.pm
1 # Copyright (C) 2018-2021 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # Internal interface for a single Xapian shard in V2 inboxes.
5 # See L<public-inbox-v2-format(5)> for more info on how we shard Xapian
6 package PublicInbox::SearchIdxShard;
7 use strict;
8 use v5.10.1;
9 use parent qw(PublicInbox::SearchIdx PublicInbox::IPC);
10 use PublicInbox::OnDestroy;
11
12 sub new {
13         my ($class, $v2w, $shard) = @_; # v2w may be ExtSearchIdx
14         my $ibx = $v2w->{ibx};
15         my $self = $ibx ? $class->SUPER::new($ibx, 1, $shard)
16                         : $class->eidx_shard_new($v2w, $shard);
17         # create the DB before forking:
18         $self->idx_acquire;
19         $self->set_metadata_once;
20         $self->idx_release;
21         if ($v2w->{parallel}) {
22                 local $self->{-v2w_afc} = $v2w;
23                 $self->ipc_worker_spawn("shard[$shard]");
24         }
25         $self;
26 }
27
28 sub _worker_done {
29         my ($self) = @_;
30         if ($self->need_xapian) {
31                 die "$$ $0 xdb not released\n" if $self->{xdb};
32         }
33         die "$$ $0 still in transaction\n" if $self->{txn};
34 }
35
36 sub ipc_atfork_child { # called automatically before ipc_worker_loop
37         my ($self) = @_;
38         my $v2w = delete $self->{-v2w_afc} or die 'BUG: {-v2w_afc} missing';
39         $v2w->atfork_child; # calls shard_atfork_child on our siblings
40         $v2w->{current_info} = "[$self->{shard}]"; # for $SIG{__WARN__}
41         $self->begin_txn_lazy;
42         # caller must capture this:
43         PublicInbox::OnDestroy->new($$, \&_worker_done, $self);
44 }
45
46 sub index_raw {
47         my ($self, $msgref, $eml, $smsg, $eidx_key) = @_;
48         if ($eml) {
49                 undef($$msgref) if $msgref;
50         } else { # --xapian-only + --sequential-shard:
51                 $eml = PublicInbox::Eml->new($msgref);
52         }
53         $smsg->{eidx_key} = $eidx_key if defined $eidx_key;
54         $self->ipc_do('add_message', $eml, $smsg);
55 }
56
57 sub shard_add_eidx_info {
58         my ($self, $docid, $eidx_key, $eml) = @_;
59         $self->ipc_do('add_eidx_info', $docid, $eidx_key, $eml);
60 }
61
62 sub shard_remove_eidx_info {
63         my ($self, $docid, $eidx_key, $eml) = @_;
64         $self->ipc_do('remove_eidx_info', $docid, $eidx_key, $eml);
65 }
66
67 # needed when there's multiple IPC workers and the parent forking
68 # causes newer siblings to inherit older siblings sockets
69 sub shard_atfork_child {
70         my ($self) = @_;
71         my $pid = delete($self->{-ipc_worker_pid}) or
72                         die "BUG: $$ no -ipc_worker_pid";
73         my $s1 = delete($self->{-ipc_sock}) or die "BUG: $$ no -ipc_sock";
74         $pid == $$ and die "BUG: $$ shard_atfork_child called on itself";
75         close($s1) or die "close -ipc_sock: $!";
76 }
77
78 # wait for return to determine when ipc_do('commit_txn_lazy') is done
79 sub echo {
80         shift;
81         "@_";
82 }
83
84 sub idx_close {
85         my ($self) = @_;
86         die "transaction in progress $self\n" if $self->{txn};
87         $self->idx_release if $self->{xdb};
88 }
89
90 sub shard_close {
91         my ($self) = @_;
92         $self->ipc_do('idx_close');
93         $self->ipc_worker_stop;
94 }
95
96 sub shard_remove {
97         my ($self, $num) = @_;
98         $self->ipc_do('remove_by_docid', $num);
99 }
100
101 sub shard_set_keywords {
102         my ($self, $docid, @kw) = @_;
103         $self->ipc_do('set_keywords', $docid, @kw);
104 }
105
106 sub shard_remove_keywords {
107         my ($self, $docid, @kw) = @_;
108         $self->ipc_do('remove_keywords', $docid, @kw);
109 }
110
111 sub shard_add_keywords {
112         my ($self, $docid, @kw) = @_;
113         $self->ipc_do('add_keywords', $docid, @kw);
114 }
115
116 sub shard_over_check {
117         my ($self, $over) = @_;
118         if ($self->{-ipc_sock} && $over->{dbh}) {
119                 # can't send DB handles over IPC
120                 $over = ref($over)->new($over->{dbh}->sqlite_db_filename);
121         }
122         $self->ipc_do('over_check', $over);
123 }
124
125 1;