package PublicInbox::IPC;
use strict;
use v5.10.1;
+use parent qw(Exporter);
use Carp qw(confess croak);
use PublicInbox::DS qw(dwaitpid);
use PublicInbox::Spawn;
use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF();
+our @EXPORT_OK = qw(ipc_freeze ipc_thaw);
my $WQ_MAX_WORKERS = 4096;
my ($enc, $dec);
# ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+
};
# Choose the serializer pair installed as ipc_freeze/ipc_thaw (renamed from
# freeze/thaw by this change): the Sereal encoder/decoder objects are used
# when both $enc and $dec are set, otherwise Storable is loaded at runtime.
# The Storable subs are now aliased directly via globs instead of being
# imported under their original names.
if ($enc && $dec) { # should be custom ops
- *freeze = sub ($) { sereal_encode_with_object $enc, $_[0] };
- *thaw = sub ($) { sereal_decode_with_object $dec, $_[0], my $ret };
+ *ipc_freeze = sub ($) { sereal_encode_with_object $enc, $_[0] };
+ *ipc_thaw = sub ($) { sereal_decode_with_object $dec, $_[0], my $ret };
} else {
# the eval yields undef if anything inside dies, which triggers the warn
eval { # some distros have Storable as a separate package from Perl
require Storable;
- Storable->import(qw(freeze thaw));
+ *ipc_freeze = \&Storable::freeze;
+ *ipc_thaw = \&Storable::thaw;
$enc = 1;
} // warn("Storable (part of Perl) missing: $@\n");
}
chop($len) eq "\n" or croak "no LF byte in $len";
defined(my $n = read($r, my $buf, $len)) or croak "read error: $!";
$n == $len or croak "short read: $n != $len";
- thaw($buf);
+ ipc_thaw($buf);
}
# Serialize $ref with ipc_freeze and return it framed as
# "<length of payload>\n<payload>" in a single string.
sub _pack_rec ($) {
my ($ref) = @_;
- my $buf = freeze($ref);
+ my $buf = ipc_freeze($ref);
# the decimal length plus LF lets the reader know how much to read next
length($buf) . "\n" . $buf;
}
$n = length($buf);
}
# Sereal dies on truncated data, Storable returns undef
- my $args = thaw($buf) // die "thaw error on buffer of size: $n";
+ my $args = ipc_thaw($buf) // die "thaw error on buffer of size: $n";
undef $buf;
my $sub = shift @$args;
eval { $self->$sub(@$args) };
my ($self, $sub, $ios, @args) = @_;
if (my $s1 = $self->{-wq_s1}) { # run in worker
my $fds = [ map { fileno($_) } @$ios ];
- my $n = $send_cmd->($s1, $fds, freeze([$sub, @args]), MSG_EOR);
+ my $buf = ipc_freeze([$sub, @args]);
+ my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR);
return if defined($n); # likely
croak "sendmsg: $! (check RLIMIT_NOFILE)" if $!{ETOOMANYREFS};
croak "sendmsg: $!" if !$!{EMSGSIZE};
socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
croak "socketpair: $!";
- my $buf = freeze([$sub, @args]);
$n = $send_cmd->($s1, [ fileno($r) ],
- freeze(['do_sock_stream', length($buf)]),
+ ipc_freeze(['do_sock_stream', length($buf)]),
MSG_EOR) // croak "sendmsg: $!";
undef $r;
$n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
}
# Sereal doesn't have dclone
# Returns a deep copy of the last argument by round-tripping it through
# ipc_freeze/ipc_thaw; using $_[-1] means it works whether it is called
# as a plain function or as a method.
-sub deep_clone { thaw(freeze($_[-1])) }
+sub deep_clone { ipc_thaw(ipc_freeze($_[-1])) }
1;
'q' => [ 'SEARCH_TERMS...', 'search for messages matching terms', qw(
save-as=s output|mfolder|o=s format|f=s dedupe|d=s thread|t augment|a
sort|s=s reverse|r offset=i remote! local! external! pretty
- mua-cmd|mua=s no-torsocks torsocks=s verbose|v
+ mua-cmd|mua=s no-torsocks torsocks=s verbose|v quiet|q
received-after=s received-before=s sent-after=s sent-since=s),
PublicInbox::LeiQuery::curl_opt(), opt_dash('limit|n=i', '[0-9]+') ],
use strict;
use v5.10.1;
use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
-use PublicInbox::DS qw(dwaitpid);
-use PublicInbox::PktOp;
+use PublicInbox::DS qw(dwaitpid now);
+use PublicInbox::PktOp qw(pkt_do);
use PublicInbox::Import;
use File::Temp 0.19 (); # 0.19 for ->newdir
use File::Spec ();
-use Socket qw(MSG_EOR);
use PublicInbox::Search qw(xap_terms);
use PublicInbox::Spawn qw(popen_rd spawn which);
use PublicInbox::MID qw(mids);
# Decide whether another mset batch should be fetched for the query options
# in $mo; as a side effect $mo->{offset} may be advanced by the size of the
# batch in $mset.
# NOTE(review): in the added expression, once $size >= $mo->{limit} holds,
# offset + $size cannot be < $mo->{limit} for a non-negative offset, so the
# result looks like it is always false -- confirm the intended upper bound.
# NOTE(review): the `// 10000` fallback for an unset limit is dropped by
# this change; presumably callers always set {limit} -- verify.
sub _mset_more ($$) {
my ($mset, $mo) = @_;
my $size = $mset->size;
- $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
+ $size >= $mo->{limit} && (($mo->{offset} += $size) < $mo->{limit});
}
# $startq will EOF when query_prepare is done augmenting and allow
my $startq = delete $lei->{startq};
my ($srch, $over) = ($ibxish->search, $ibxish->over);
- unless ($srch && $over) {
- my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
- warn "$desc not indexed by Xapian\n";
- return;
- }
+ my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
+ return warn("$desc not indexed by Xapian\n") unless ($srch && $over);
my $mo = { %{$lei->{mset_opt}} };
my $mset;
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish);
do {
$mset = $srch->mset($mo->{qstr}, $mo);
+ pkt_do($lei->{pkt_op}, 'mset_progress', $desc, $mset->size,
+ $mset->get_matches_estimated);
my $ids = $srch->mset_to_artnums($mset, $mo);
my $ctx = { ids => $ids };
my $i = 0;
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self);
do {
$mset = $self->mset($mo->{qstr}, $mo);
+ pkt_do($lei->{pkt_op}, 'mset_progress', 'xsearch',
+ $mset->size, $mset->get_matches_estimated);
for my $mitem ($mset->items) {
my $smsg = smsg_for($self, $mitem) or next;
wait_startq($startq) if $startq;
$smsg->{$_} //= '' for qw(from to cc ds subject references mid);
delete @$smsg{qw(From Subject -ds -ts)};
if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
+ ++$lei->{-nr_remote_eml};
+ if (!$lei->{opt}->{quiet}) {
+ my $now = now();
+ my $next = $lei->{-next_progress} //= ($now + 1);
+ if ($now > $next) {
+ $lei->{-next_progress} = $now + 1;
+ my $nr = $lei->{-nr_remote_eml};
+ $lei->err("# $lei->{-current_url} $nr/?");
+ }
+ }
$each_smsg->($smsg, undef, $eml);
}
my $tor = $opt->{torsocks} //= 'auto';
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
for my $uri (@$uris) {
+ $lei->{-current_url} = $uri->as_string;
+ $lei->{-nr_remote_eml} = 0;
$uri->query_form(@qform);
my $cmd = [ @cmd, $uri->as_string ];
if ($tor eq 'auto' && substr($uri->host, -6) eq '.onion' &&
$lei, $each_smsg);
};
return $lei->fail("E: @$cmd: $@") if $@;
- next unless $?;
+ if ($? == 0) {
+ my $nr = $lei->{-nr_remote_eml};
+ pkt_do($lei->{pkt_op}, 'mset_progress',
+ $lei->{-current_url}, $nr, $nr);
+ next;
+ }
seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n";
my $e = do { local $/; <$cerr> } //
die "read(curl stderr): $!\n";
}
$lei->start_mua;
}
+ $lei->{opt}->{quiet} or
+ $lei->err('# ', $lei->{-mset_total} // 0, " matches");
$lei->dclose;
}
+# Progress handler registered under the 'mset_progress' op; invoked via
+# pkt_op/pkt_do from workers.  $progress is an array ref holding a
+# description, the size of the batch and the estimated total.  Unless
+# --quiet is set, the batch size is added to $lei->{-mset_total} and a
+# "# desc size/estimate" line is emitted through $lei->err.
+sub mset_progress {
+ my ($lei, $progress) = @_;
+ my ($label, $batch_nr, $estimate) = @$progress;
+ $lei->{opt}->{quiet} and return;
+ $lei->{-mset_total} += $batch_nr;
+ $lei->err("# $label $batch_nr/$estimate");
+}
+
# Handler for the '.' op: runs the l2m augment step and then sends '.' over
# $lei->{pkt_op}.  $zpipe and $au_done are accepted but not used in the
# lines visible here.  Dies if no {l2m} is present or if sending the
# trigger does not return 1.
sub do_post_augment {
my ($lei, $zpipe, $au_done) = @_;
my $l2m = $lei->{l2m} or die 'BUG: no {l2m}';
# presumably dropping -wq_s1 keeps do_augment from being dispatched to a
# worker so it runs in this process -- TODO confirm
delete $lei->{l2m}->{-wq_s1};
eval { $lei->{l2m}->do_augment($lei) };
# a failed augment is reported through $lei->fail rather than rethrown
$lei->fail($@) if $@;
- send($lei->{pkt_op}, '.', MSG_EOR) == 1 or
- die "do_post_augment trigger: $!"
+ pkt_do($lei->{pkt_op}, '.') == 1 or die "do_post_augment trigger: $!"
}
sub fail_handler ($;$$) {
'!' => [ \&fail_handler, $lei ],
'.' => [ \&do_post_augment, $lei, $zpipe, $au_done ],
'' => [ \&query_done, $lei ],
+ 'mset_progress' => [ \&mset_progress, $lei ],
};
(my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, $in_loop);
my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);
package PublicInbox::PktOp;
use strict;
use v5.10.1;
-use parent qw(PublicInbox::DS);
+use parent qw(PublicInbox::DS Exporter);
use Errno qw(EAGAIN EINTR);
use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
use Socket qw(AF_UNIX MSG_EOR SOCK_SEQPACKET);
+use PublicInbox::IPC qw(ipc_freeze ipc_thaw);
+our @EXPORT_OK = qw(pkt_do);
sub new {
my ($cls, $r, $ops, $in_loop) = @_;
my $self = bless { sock => $r, ops => $ops, re => [] }, $cls;
- if (ref($ops) eq 'ARRAY') {
- my %ops;
- for my $op (@$ops) {
- if (ref($op->[0])) {
- push @{$self->{re}}, $op;
- } else {
- $ops{$op->[0]} = $op->[1];
- }
- }
- $self->{ops} = \%ops;
- }
if ($in_loop) { # iff using DS->EventLoop
$r->blocking(0);
$self->SUPER::new($r, EPOLLIN|EPOLLET);
(new($cls, $c, $ops, $in_loop), $p);
}
+# For the producer side: send one op to the consumer so its event_step
+# runs the matching handler.  With no extra arguments only the op name is
+# sent; otherwise the arguments are serialized with ipc_freeze and appended
+# after a NUL separator.  Returns whatever send() returns.
+sub pkt_do {
+ my ($sock, $op, @payload) = @_;
+ my $msg = $op;
+ $msg = "$op\0" . ipc_freeze(\@payload) if @payload;
+ send($sock, $msg, MSG_EOR);
+}
+
sub close {
my ($self) = @_;
my $c = $self->{sock} or return;
my $c = $self->{sock};
my $msg;
do {
- my $n = recv($c, $msg, 128, 0);
+ my $n = recv($c, $msg, 4096, 0);
unless (defined $n) {
return if $! == EAGAIN;
next if $! == EINTR;
$self->close;
die "recv: $!";
}
- my $op = $self->{ops}->{$msg};
- unless ($op) {
- for my $re_op (@{$self->{re}}) {
- $msg =~ $re_op->[0] or next;
- $op = $re_op->[1];
- last;
- }
- }
- die "BUG: unknown message: `$msg'" unless $op;
+ my ($cmd, $pargs) = split(/\0/, $msg, 2);
+ my $op = $self->{ops}->{$cmd // $msg};
+ die "BUG: unknown message: `$cmd'" unless $op;
my ($sub, @args) = @$op;
- $sub->(@args);
+ $sub->(@args, $pargs ? ipc_thaw($pargs) : ());
return $self->close if $msg eq ''; # close on EOF
} while (1);
}
}
$lei->('add-external', $url);
my $mid = '20140421094015.GA8962@dcvr.yhbt.net';
- ok($lei->('q', "m:$mid"), "query $url");
+ ok($lei->('q', '-q', "m:$mid"), "query $url");
is($err, '', "no errors on $url");
my $res = $json->decode($out);
is($res->[0]->{'m'}, "<$mid>", "got expected mid from $url");
- ok($lei->('q', "m:$mid", 'd:..20101002'), 'no results, no error');
+ ok($lei->('q', '-q', "m:$mid", 'd:..20101002'), 'no results, no error');
is($err, '', 'no output on 404, matching local FS behavior');
is($out, "[null]\n", 'got null results');
$lei->('forget-external', $url);
my @s = grep(/^Subject:/, $cat->());
is(scalar(@s), 1, "1 result in mbox$sfx");
$lei->('q', '-a', '-o', "mboxcl2:$f", 's:see attachment');
- is($err, '', 'no errors from augment');
+ # $err is one string (presumably captured stderr) that may now hold
+ # '#' progress lines; split it into lines so every line is checked
+ # and an empty buffer counts as zero offending lines
+ is(grep(!/^#/, split(/^/m, $err)), 0, 'no errors from augment');
@s = grep(/^Subject:/, my @wtf = $cat->());
is(scalar(@s), 2, "2 results in mbox$sfx");
$lei->('q', '-a', '-o', "mboxcl2:$f", 's:nonexistent');
- is($err, '', "no errors on no results ($sfx)");
+ # same per-line check as above: only '#' progress lines are tolerated
+ is(grep(!/^#/, split(/^/m, $err)), 0, "no errors on no results ($sfx)");
my @s2 = grep(/^Subject:/, $cat->());
is_deeply(\@s2, \@s,
pipe(my ($r, $w)) or BAIL_OUT $!;
open my $err, '+>', undef or BAIL_OUT $!;
my $opt = { run_mode => 0, 1 => $w, 2 => $err };
- my $cmd = [qw(lei q -t), @$out, 'bytes:1..'];
+ my $cmd = [qw(lei q -q -t), @$out, 'bytes:1..'];
my $tp = start_script($cmd, $env, $opt);
close $w;
sysread($r, my $buf, 1);