We will have a ->wq_do that doesn't pass FDs for I/O.
# base class for remote IPC calls and workqueues, requires Storable or Sereal
# - ipc_do and ipc_worker_* is for a single worker/producer and uses pipes
-# - wq_do and wq_worker* is for a single producer and multiple workers,
+# - wq_io_do and wq_worker* is for a single producer and multiple workers,
# using SOCK_SEQPACKET for work distribution
# use ipc_do when you need work done on a certain process
-# use wq_do when your work can be done on any idle worker
+# use wq_io_do when your work can be done on any idle worker
package PublicInbox::IPC;
use strict;
use v5.10.1;
PublicInbox::DS->Reset;
}
-sub do_sock_stream { # via wq_do, for big requests
+sub do_sock_stream { # via wq_io_do, for big requests
my ($self, $len) = @_;
recv_and_run($self, delete $self->{0}, $len, 1);
}
-sub wq_do { # always async
+sub wq_io_do { # always async
my ($self, $sub, $ios, @args) = @_;
if (my $s1 = $self->{-wq_s1}) { # run in worker
my $fds = [ map { fileno($_) } @$ios ];
} else {
@$self{0..$#$ios} = @$ios;
eval { $self->$sub(@args) };
- warn "wq_do: $@" if $@;
+ warn "wq_io_do: $@" if $@;
delete @$self{0..$#$ios}; # don't close
}
}
my ($self) = @_;
return unless wq_workers($self);
my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2';
- $self->wq_do('wq_exit', [ $s2, $s2, $s2 ]);
+ $self->wq_io_do('wq_exit', [ $s2, $s2, $s2 ]);
# caller must call wq_worker_decr_wait in main loop
}
$self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei});
my $op = delete $lei->{pkt_op_c};
delete $lei->{pkt_op_p};
- $self->wq_do('import_stdin', []) if $self->{0};
+ $self->wq_io_do('import_stdin', []) if $self->{0};
for my $x (@argv) {
- $self->wq_do('import_path_url', [], $x);
+ $self->wq_io_do('import_path_url', [], $x);
}
$self->wq_close(1);
$lei->event_step_init; # wait for shutdowns
die "TODO: non-HTTP/HTTPS clone of $self->{src} not supported, yet";
}
-sub do_mirror { # via wq_do
+sub do_mirror { # via wq_io_do
my ($self) = @_;
my $lei = $self->{lei};
eval {
$self->wq_workers_start('lei_mirror', 1, $lei->oldset, {lei => $lei});
my $op = delete $lei->{pkt_op_c};
delete $lei->{pkt_op_p};
- $self->wq_do('do_mirror', []);
+ $self->wq_io_do('do_mirror', []);
$self->wq_close(1);
$lei->event_step_init; # wait for shutdowns
if ($lei->{oneshot}) {
sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
-# we open this in the parent process before ->wq_do handoff
+# we open this in the parent process before ->wq_io_do handoff
sub ovv_out_lk_init ($) {
my ($self) = @_;
my $tmp = File::Temp->new("lei-ovv.dst.$$.lock-XXXXXX",
sub {
my ($smsg, $mitem) = @_;
$smsg->{pct} = get_pct($mitem) if $mitem;
- $l2m->wq_do('write_mail', [], $git_dir, $smsg);
+ $l2m->wq_io_do('write_mail', [], $git_dir, $smsg);
}
} elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
}
}
-sub write_mail { # via ->wq_do
+sub write_mail { # via ->wq_io_do
my ($self, $git_dir, $smsg) = @_;
my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
git_async_cat($git, $smsg->{blob}, \&git_to_mail,
}
if ($lei->{opt}->{thread}) {
for my $ibxish (locals($self)) {
- $self->wq_do('query_thread_mset', [], $ibxish);
+ $self->wq_io_do('query_thread_mset', [], $ibxish);
}
} elsif (locals($self)) {
- $self->wq_do('query_mset', []);
+ $self->wq_io_do('query_mset', []);
}
my $i = 0;
my $q = [];
push @{$q->[$i++ % $MAX_PER_HOST]}, $uri;
}
for my $uris (@$q) {
- $self->wq_do('query_remote_mboxrd', [], $uris);
+ $self->wq_io_do('query_remote_mboxrd', [], $uris);
}
}
$self->SUPER::ipc_atfork_child;
}
-sub query_prepare { # called by wq_do
+sub query_prepare { # called by wq_io_do
my ($self) = @_;
local $0 = "$0 query_prepare";
my $lei = $self->{lei};
delete $lei->{pkt_op_p};
$l2m->wq_close(1) if $l2m;
$lei->event_step_init; # wait for shutdowns
- $self->wq_do('query_prepare', []) if $l2m;
+ $self->wq_io_do('query_prepare', []) if $l2m;
start_query($self, $lei);
$self->wq_close(1); # lei_xsearch workers stop when done
if ($lei->{oneshot}) {
close $agpl or BAIL_OUT "close: $!";
for my $t ('local', 'worker', 'worker again') {
- $ipc->wq_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world');
+ $ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world');
my $i = 0;
for my $fh ($ra, $rb, $rc) {
my $buf = readline($fh);
like($buf, qr/\Ai=$i \d+ hello world\z/, "got expected ($t)");
$i++;
}
- $ipc->wq_do('test_die', [ $wa, $wb, $wc ]);
- $ipc->wq_do('test_sha', [ $wa, $wb ], 'hello world');
+ $ipc->wq_io_do('test_die', [ $wa, $wb, $wc ]);
+ $ipc->wq_io_do('test_sha', [ $wa, $wb ], 'hello world');
is(readline($rb), sha1_hex('hello world')."\n", "SHA small ($t)");
{
my $bigger = $big x 10;
- $ipc->wq_do('test_sha', [ $wa, $wb ], $bigger);
+ $ipc->wq_io_do('test_sha', [ $wa, $wb ], $bigger);
my $exp = sha1_hex($bigger)."\n";
undef $bigger;
is(readline($rb), $exp, "SHA big ($t)");
push(@ppids, $ppid);
}
-# wq_do works across fork (siblings can feed)
+# wq_io_do works across fork (siblings can feed)
SKIP: {
skip 'Socket::MsgHdr or Inline::C missing', 3 if !$ppids[0];
is_deeply(\@ppids, [$$, undef, undef],
my $pid = fork // BAIL_OUT $!;
if ($pid == 0) {
use POSIX qw(_exit);
- $ipc->wq_do('test_write_each_fd', [ $wa, $wb, $wc ], $$);
+ $ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], $$);
_exit(0);
} else {
my $i = 0;
seek($warn, 0, SEEK_SET) or BAIL_OUT;
my @warn = <$warn>;
is(scalar(@warn), 3, 'warned 3 times');
- like($warn[0], qr/ wq_do: /, '1st warned from wq_do');
+ like($warn[0], qr/ wq_io_do: /, '1st warned from wq_io_do');
like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
is($warn[2], $warn[1], 'worker did not die');
my $ios = [];
my $t = timeit(1, sub {
for my $i (1..$nr) {
- $ipc->wq_do('test_set_maybe', $ios, $skv, $i);
- $ipc->wq_do('test_set_maybe', $ios, $skv, $i);
+ $ipc->wq_io_do('test_set_maybe', $ios, $skv, $i);
+ $ipc->wq_io_do('test_set_maybe', $ios, $skv, $i);
}
});
diag "$nr sets done ".timestr($t);
for my $w ($ipc->wq_workers) {
- $ipc->wq_do('test_skv_done', $ios);
+ $ipc->wq_io_do('test_skv_done', $ios);
}
diag "done requested";