lib/PublicInbox/View.pm
lib/PublicInbox/ViewDiff.pm
lib/PublicInbox/ViewVCS.pm
+lib/PublicInbox/WQWorker.pm
lib/PublicInbox/WWW.pm
lib/PublicInbox/WWW.pod
lib/PublicInbox/Watch.pm
use PublicInbox::DS qw(dwaitpid);
use PublicInbox::Spawn;
use PublicInbox::OnDestroy;
+use PublicInbox::WQWorker;
use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
use Errno qw(EMSGSIZE);
my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
# for base class, override in sub classes
sub ipc_atfork_prepare {}
+sub wq_atexit_child {}
+
sub ipc_atfork_child {
my ($self) = @_;
my $io = delete($self->{-ipc_atfork_child_close}) or return;
$pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
}
-sub _recv_and_run {
+sub recv_and_run {
my ($self, $s2, $len, $full_stream) = @_;
my @fds = $recv_cmd->($s2, my $buf, $len);
- my $n = length($buf // '') or return;
+ return if scalar(@fds) && !defined($fds[0]);
+ my $n = length($buf) or return 0;
my $nfd = 0;
for my $fd (@fds) {
if (open(my $cmdfh, '+<&=', $fd)) {
sub wq_worker_loop ($) {
my ($self) = @_;
- my $len = $self->{wq_req_len} // (4096 * 33);
- my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
- 1 while (_recv_and_run($self, $s2, $len));
+ my $wqw = PublicInbox::WQWorker->new($self);
+ PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} });
+ PublicInbox::DS->EventLoop;
+ PublicInbox::DS->Reset;
}
sub do_sock_stream { # via wq_do, for big requests
my ($self, $len) = @_;
- _recv_and_run($self, delete $self->{0}, $len, 1);
+ recv_and_run($self, delete $self->{0}, $len, 1);
}
sub wq_do { # always async
use PublicInbox::Spawn qw(which spawn popen_rd);
use PublicInbox::LeiDedupe;
use PublicInbox::OnDestroy;
+use PublicInbox::Git;
+use PublicInbox::GitAsyncCat;
use Symbol qw(gensym);
use IO::Handle; # ->autoflush
use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
use Errno qw(EEXIST ESPIPE ENOENT);
-use PublicInbox::Git;
+
+# struggles with short-lived repos, Gcf2Client makes little sense with lei;
+# but we may use in-process libgit2 in the future.
+$PublicInbox::GitAsyncCat::GCF2C = 0;
my %kw2char = ( # Maildir characters
draft => 'D',
$self->write_cb($lei);
};
my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
- $git->cat_async($smsg->{blob}, \&git_to_mail, [$wcb, $smsg, $not_done]);
+ git_async_cat($git, $smsg->{blob}, \&git_to_mail,
+ [$wcb, $smsg, $not_done]);
}
-# We rely on OnDestroy to run this before ->DESTROY, since ->DESTROY
-# ordering is unstable at worker exit and may cause segfaults
-sub reap_gits {
+sub wq_atexit_child {
my ($self) = @_;
delete $self->{wcb};
for my $git (delete @$self{grep(/\A$$\0/, keys %$self)}) {
$git->async_wait_all;
}
-}
-
-sub DESTROY { delete $_[0]->{wcb} }
-
-sub ipc_atfork_child { # runs after IPC::wq_worker_loop
- my ($self) = @_;
- $self->SUPER::ipc_atfork_child;
- # reap_gits needs to run before $self->DESTROY,
- # IPC.pm will ensure that.
- PublicInbox::OnDestroy->new($$, \&reap_gits, $self);
+ $SIG{__WARN__} = 'DEFAULT';
+ $SIG{PIPE} = 'DEFAULT';
}
1;
--- /dev/null
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# for PublicInbox::IPC wq_* (work queue) workers
+package PublicInbox::WQWorker;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLEXCLUSIVE EPOLLET);
+use Errno qw(EAGAIN ECONNRESET);
+use IO::Handle (); # blocking
+
+sub new {
+ my (undef, $wq) = @_;
+ my $s2 = $wq->{-wq_s2} // die 'BUG: no -wq_s2';
+ $s2->blocking(0);
+ my $self = bless { sock => $s2, wq => $wq }, __PACKAGE__;
+ $self->SUPER::new($s2, EPOLLEXCLUSIVE|EPOLLIN|EPOLLET);
+ $self;
+}
+
+sub event_step {
+ my ($self) = @_;
+ my $n;
+ do {
+ $n = $self->{wq}->recv_and_run($self->{sock}, 4096 * 33);
+ } while ($n);
+ return if !defined($n) && $! == EAGAIN; # likely
+ warn "wq worker error: $!\n" if !defined($n) && $! != ECONNRESET;
+ $self->{wq}->wq_atexit_child;
+ $self->close; # PublicInbox::DS::close
+}
+
+1;