]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei q: support --jobs [SEARCHERS],[WRITERS]
authorEric Wong <e@80x24.org>
Tue, 2 Feb 2021 11:47:02 +0000 (11:47 +0000)
committerEric Wong <e@80x24.org>
Wed, 3 Feb 2021 07:32:45 +0000 (07:32 +0000)
This comma-delimited parameter allows controlling the number or
lei_xsearch and lei2mail worker processes.  With the change
to make IPC wq_* work use the event loop, it's now safe to
run fewer worker processes for searching with no risk of
deadlocks.

MAX_PER_HOST isn't configurable yet for remote hosts,
and maybe it shouldn't be due to potential for abuse.

lib/PublicInbox/IPC.pm
lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiQuery.pm
lib/PublicInbox/LeiXSearch.pm
lib/PublicInbox/V2Writable.pm

index 50de1bed22d71ba4ac72cc3a6f781368907881c3..3873649b6bb5abcaad05309285eea05cc87c8f32 100644 (file)
@@ -466,4 +466,23 @@ sub DESTROY {
 # Sereal doesn't have dclone
 sub deep_clone { ipc_thaw(ipc_freeze($_[-1])) }
 
+sub detect_nproc () {
+       # _SC_NPROCESSORS_ONLN = 84 on both Linux glibc and musl
+       return POSIX::sysconf(84) if $^O eq 'linux';
+       return POSIX::sysconf(58) if $^O eq 'freebsd';
+       # TODO: more OSes
+
+       # getconf(1) is POSIX, but *NPROCESSORS* vars are not
+       for (qw(_NPROCESSORS_ONLN NPROCESSORS_ONLN)) {
+               `getconf $_ 2>/dev/null` =~ /^(\d+)$/ and return $1;
+       }
+       for my $nproc (qw(nproc gnproc)) { # GNU coreutils nproc
+               `$nproc 2>/dev/null` =~ /^(\d+)$/ and return $1;
+       }
+
+       # should we bother with `sysctl hw.ncpu`?  Those only give
+       # us total processor count, not online processor count.
+       undef
+}
+
 1;
index 2c512c5eb545084f902aa221e8af6808cd6c9340..9afc90cfaa1067d7a0b852debf8a0bf6575da9ed 100644 (file)
@@ -104,7 +104,7 @@ our %CMD = ( # sorted in order of importance/use:
 'q' => [ 'SEARCH_TERMS...', 'search for messages matching terms', qw(
        save-as=s output|mfolder|o=s format|f=s dedupe|d=s thread|t augment|a
        sort|s=s reverse|r offset=i remote! local! external! pretty
-       include|I=s@ exclude=s@ only=s@
+       include|I=s@ exclude=s@ only=s@ jobs|j=s
        mua-cmd|mua=s no-torsocks torsocks=s verbose|v quiet|q
        received-after=s received-before=s sent-after=s sent-since=s),
        PublicInbox::LeiQuery::curl_opt(), opt_dash('limit|n=i', '[0-9]+') ],
@@ -236,6 +236,9 @@ my %OPTDESC = (
 'q     only=s@' => [ 'URL_OR_PATHNAME',
                'only use specified external(s) for search' ],
 
+'q     jobs=s' => [ '[SEARCH_JOBS][,WRITER_JOBS]',
+               'control number of search and writer jobs' ],
+
 'ls-query      format|f=s' => $ls_format,
 'ls-external   format|f=s' => $ls_format,
 
index ca214ca19230c149b1b5b49833cd6283bf8e5e59..72a67c24f18dfc89d2a2dee32eb0d86afafece81 100644 (file)
@@ -17,6 +17,7 @@ sub lei_q {
        my ($self, @argv) = @_;
        require PublicInbox::LeiXSearch;
        require PublicInbox::LeiOverview;
+       require PublicInbox::V2Writable;
        PublicInbox::Config->json; # preload before forking
        my $opt = $self->{opt};
        # prepare any number of LeiXSearch || LeiSearch || Inbox || URL
@@ -53,13 +54,22 @@ sub lei_q {
        unless ($lxs->locals || $lxs->remotes) {
                return $self->fail('no local or remote inboxes to search');
        }
-       my $xj = $lxs->concurrency($opt);
+       my ($xj, $mj) = split(/,/, $opt->{jobs} // '');
+       if (defined($xj) && $xj ne '' && $xj !~ /\A[1-9][0-9]*\z/) {
+               return $self->fail("`$xj' search jobs must be >= 1");
+       }
+       $xj ||= $lxs->concurrency($opt); # allow: "--jobs ,$WRITER_ONLY"
+       my $nproc = $lxs->detect_nproc; # don't memoize, schedtool(1) exists
+       $xj = $nproc if $xj > $nproc;
        PublicInbox::LeiOverview->new($self) or return;
        $self->atfork_prepare_wq($lxs);
        $lxs->wq_workers_start('lei_xsearch', $xj, $self->oldset);
        delete $lxs->{-ipc_atfork_child_close};
        if (my $l2m = $self->{l2m}) {
-               my $mj = 4; # TODO: configurable
+               if (defined($mj) && $mj !~ /\A[1-9][0-9]*\z/) {
+                       return $self->fail("`$mj' writer jobs must be >= 1");
+               }
+               $mj //= $nproc;
                $self->atfork_prepare_wq($l2m);
                $l2m->wq_workers_start('lei2mail', $mj, $self->oldset);
                delete $l2m->{-ipc_atfork_child_close};
index 57a18075b28420536869c1c59d80be107c058ec0..37bd233e90106221ca17a94e24e2f4c46f335180 100644 (file)
@@ -350,7 +350,6 @@ sub do_post_augment {
 }
 
 my $MAX_PER_HOST = 4;
-sub MAX_PER_HOST { $MAX_PER_HOST }
 
 sub concurrency {
        my ($self, $opt) = @_;
index 35b7fe3049cf0943834141068e1f2815495592f5..cbd4f003c6581d7121ead99be8b122e05ed25a0c 100644 (file)
@@ -8,6 +8,7 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::Lock);
 use PublicInbox::SearchIdxShard;
+use PublicInbox::IPC;
 use PublicInbox::Eml;
 use PublicInbox::Git;
 use PublicInbox::Import;
@@ -35,32 +36,13 @@ our $PACKING_FACTOR = 0.4;
 # to increase Xapian shards
 our $NPROC_MAX_DEFAULT = 4;
 
-sub detect_nproc () {
-       # _SC_NPROCESSORS_ONLN = 84 on both Linux glibc and musl
-       return POSIX::sysconf(84) if $^O eq 'linux';
-       return POSIX::sysconf(58) if $^O eq 'freebsd';
-       # TODO: more OSes
-
-       # getconf(1) is POSIX, but *NPROCESSORS* vars are not
-       for (qw(_NPROCESSORS_ONLN NPROCESSORS_ONLN)) {
-               `getconf $_ 2>/dev/null` =~ /^(\d+)$/ and return $1;
-       }
-       for my $nproc (qw(nproc gnproc)) { # GNU coreutils nproc
-               `$nproc 2>/dev/null` =~ /^(\d+)$/ and return $1;
-       }
-
-       # should we bother with `sysctl hw.ncpu`?  Those only give
-       # us total processor count, not online processor count.
-       undef
-}
-
 sub nproc_shards ($) {
        my ($creat_opt) = @_;
        my $n = $creat_opt->{nproc} if ref($creat_opt) eq 'HASH';
        $n //= $ENV{NPROC};
        if (!$n) {
                # assume 2 cores if not detectable or zero
-               state $NPROC_DETECTED = detect_nproc() || 2;
+               state $NPROC_DETECTED = PublicInbox::IPC::detect_nproc() || 2;
                $n = $NPROC_DETECTED;
                $n = $NPROC_MAX_DEFAULT if $n > $NPROC_MAX_DEFAULT;
        }