X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLeiPmdir.pm;h=d4aa0212bea7dfb4ce3766cf13eb20c040eb0ecc;hb=7e4bd71efcf3bf61216ec1a7577e33be5f95b8a9;hp=5efb012e4e8c69ad146f52d59a862999e29da83e;hpb=10b523eb017162240b1ac3647f8dcbbf2be348a7;p=public-inbox.git diff --git a/lib/PublicInbox/LeiPmdir.pm b/lib/PublicInbox/LeiPmdir.pm index 5efb012e..d4aa0212 100644 --- a/lib/PublicInbox/LeiPmdir.pm +++ b/lib/PublicInbox/LeiPmdir.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2021 all contributors +# Copyright (C) all contributors # License: AGPL-3.0+ # WQ worker for dealing with parallel Maildir reads; @@ -13,55 +13,42 @@ use parent qw(PublicInbox::IPC); sub new { my ($cls, $lei, $ipt) = @_; my $self = bless { -wq_ident => 'lei Maildir worker' }, $cls; - my $jobs = $lei->{opt}->{jobs}; + my $jobs = $lei->{opt}->{jobs} // ''; $jobs =~ /\A[0-9]+,([0-9]+)\z/ and $jobs = $1; - my $nproc = $jobs // do { - # untested with >=4 CPUs, though I suspect I/O latency + my $nproc = $jobs || do { + # barely tested with >=4 CPUs, though I suspect I/O latency # of SATA SSD storage will make >=4 processes unnecessary, # here. NVMe users may wish to use '-j' my $n = $self->detect_nproc; - $n = 4 if $n > 4; + $n = $n > 4 ? 4 : $n; }; my ($op_c, $ops) = $lei->workers_start($self, $nproc, undef, { ipt => $ipt }); # LeiInput subclass $op_c->{ops} = $ops; # for PktOp->event_step + $self->{lei_sock} = $lei->{sock}; # keep client for pmd_done_wait $lei->{pmd} = $self; } sub ipc_atfork_child { my ($self) = @_; - my $lei = $self->{lei}; - $lei->_lei_atfork_child; my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}'; - $ipt->{lei} = $lei; - $ipt->{sto} = $lei->{sto} // die 'BUG: no lei->{sto}'; - $ipt->{lse} = $ipt->{sto}->search; - $ipt->{over} = $ipt->{lse}->over; - $ipt->{-lms_ro} //= $ipt->{lse}->lms; # may be undef or '0' - $self->SUPER::ipc_atfork_child; + my $lei = $ipt->{lei} = $self->{lei}; + delete @$lei{qw(auth net)}; # no network access in this worker + $ipt->ipc_atfork_child; # calls _lei_atfork_child; } sub each_mdir_fn { # maildir_each_file callback - my ($f, $self, @args) = @_; - $self->wq_io_do('mdir_iter', [], $f, @args); + my ($f, $fl, $self, @args) = @_; + $self->wq_io_do('mdir_iter', [], $f, $fl, @args); } sub mdir_iter { # via wq_io_do - my ($self, $f, @args) = @_; - $self->{ipt}->pmdir_cb($f, @args); -} - -sub pmd_done_wait { - my ($arg, $pid) = @_; - my ($self, $lei) = @$arg; - my $wait = $lei->{sto}->ipc_do('done'); - $lei->can('wq_done_wait')->($arg, $pid); + my ($self, $f, $fl, @args) = @_; + $self->{ipt}->pmdir_cb($f, $fl, @args); } sub _lei_wq_eof { # EOF callback for main lei daemon - my ($lei) = @_; - my $pmd = delete $lei->{pmd} or return $lei->fail; - $pmd->wq_wait_old(\&pmd_done_wait, $lei); + $_[0]->wq_eof('pmd'); } 1;