# Sereal doesn't have dclone
sub deep_clone { ipc_thaw(ipc_freeze($_[-1])) }
+sub detect_nproc () {
+ # _SC_NPROCESSORS_ONLN = 84 on both Linux glibc and musl
+ return POSIX::sysconf(84) if $^O eq 'linux';
+ return POSIX::sysconf(58) if $^O eq 'freebsd';
+ # TODO: more OSes
+
+ # getconf(1) is POSIX, but *NPROCESSORS* vars are not
+ for (qw(_NPROCESSORS_ONLN NPROCESSORS_ONLN)) {
+ `getconf $_ 2>/dev/null` =~ /^(\d+)$/ and return $1;
+ }
+ for my $nproc (qw(nproc gnproc)) { # GNU coreutils nproc
+ `$nproc 2>/dev/null` =~ /^(\d+)$/ and return $1;
+ }
+
+ # should we bother with `sysctl hw.ncpu`? Those only give
+ # us total processor count, not online processor count.
+ undef
+}
+
1;
'q' => [ 'SEARCH_TERMS...', 'search for messages matching terms', qw(
save-as=s output|mfolder|o=s format|f=s dedupe|d=s thread|t augment|a
sort|s=s reverse|r offset=i remote! local! external! pretty
- include|I=s@ exclude=s@ only=s@
+ include|I=s@ exclude=s@ only=s@ jobs|j=s
mua-cmd|mua=s no-torsocks torsocks=s verbose|v quiet|q
received-after=s received-before=s sent-after=s sent-since=s),
PublicInbox::LeiQuery::curl_opt(), opt_dash('limit|n=i', '[0-9]+') ],
'q only=s@' => [ 'URL_OR_PATHNAME',
'only use specified external(s) for search' ],
+'q jobs=s' => [ '[SEARCH_JOBS][,WRITER_JOBS]',
+ 'control number of search and writer jobs' ],
+
'ls-query format|f=s' => $ls_format,
'ls-external format|f=s' => $ls_format,
my ($self, @argv) = @_;
require PublicInbox::LeiXSearch;
require PublicInbox::LeiOverview;
+ require PublicInbox::V2Writable;
PublicInbox::Config->json; # preload before forking
my $opt = $self->{opt};
# prepare any number of LeiXSearch || LeiSearch || Inbox || URL
unless ($lxs->locals || $lxs->remotes) {
return $self->fail('no local or remote inboxes to search');
}
- my $xj = $lxs->concurrency($opt);
+ my ($xj, $mj) = split(/,/, $opt->{jobs} // '');
+ if (defined($xj) && $xj ne '' && $xj !~ /\A[1-9][0-9]*\z/) {
+ return $self->fail("`$xj' search jobs must be >= 1");
+ }
+ $xj ||= $lxs->concurrency($opt); # allow: "--jobs ,$WRITER_ONLY"
+ my $nproc = $lxs->detect_nproc; # don't memoize, schedtool(1) exists
+ $xj = $nproc if $xj > $nproc;
PublicInbox::LeiOverview->new($self) or return;
$self->atfork_prepare_wq($lxs);
$lxs->wq_workers_start('lei_xsearch', $xj, $self->oldset);
delete $lxs->{-ipc_atfork_child_close};
if (my $l2m = $self->{l2m}) {
- my $mj = 4; # TODO: configurable
+ if (defined($mj) && $mj !~ /\A[1-9][0-9]*\z/) {
+ return $self->fail("`$mj' writer jobs must be >= 1");
+ }
+ $mj //= $nproc;
$self->atfork_prepare_wq($l2m);
$l2m->wq_workers_start('lei2mail', $mj, $self->oldset);
delete $l2m->{-ipc_atfork_child_close};
}
my $MAX_PER_HOST = 4;
-sub MAX_PER_HOST { $MAX_PER_HOST }
sub concurrency {
my ($self, $opt) = @_;
use v5.10.1;
use parent qw(PublicInbox::Lock);
use PublicInbox::SearchIdxShard;
+use PublicInbox::IPC;
use PublicInbox::Eml;
use PublicInbox::Git;
use PublicInbox::Import;
# to increase Xapian shards
our $NPROC_MAX_DEFAULT = 4;
-sub detect_nproc () {
- # _SC_NPROCESSORS_ONLN = 84 on both Linux glibc and musl
- return POSIX::sysconf(84) if $^O eq 'linux';
- return POSIX::sysconf(58) if $^O eq 'freebsd';
- # TODO: more OSes
-
- # getconf(1) is POSIX, but *NPROCESSORS* vars are not
- for (qw(_NPROCESSORS_ONLN NPROCESSORS_ONLN)) {
- `getconf $_ 2>/dev/null` =~ /^(\d+)$/ and return $1;
- }
- for my $nproc (qw(nproc gnproc)) { # GNU coreutils nproc
- `$nproc 2>/dev/null` =~ /^(\d+)$/ and return $1;
- }
-
- # should we bother with `sysctl hw.ncpu`? Those only give
- # us total processor count, not online processor count.
- undef
-}
-
sub nproc_shards ($) {
my ($creat_opt) = @_;
my $n = $creat_opt->{nproc} if ref($creat_opt) eq 'HASH';
$n //= $ENV{NPROC};
if (!$n) {
# assume 2 cores if not detectable or zero
- state $NPROC_DETECTED = detect_nproc() || 2;
+ state $NPROC_DETECTED = PublicInbox::IPC::detect_nproc() || 2;
$n = $NPROC_DETECTED;
$n = $NPROC_MAX_DEFAULT if $n > $NPROC_MAX_DEFAULT;
}