1 # Copyright (C) all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
4 # non-blocking workqueues, currently used by LeiNoteEvent to track renames
5 package PublicInbox::WQBlocked;
7 use parent qw(PublicInbox::DS);
8 use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
11 use Socket qw(MSG_EOR);
14 my ($cls, $wq, $buf) = @_;
15 my $self = bless { msgq => [$buf], }, $cls;
16 $wq->{wqb} = $self->SUPER::new($wq->{-wq_s1}, EPOLLOUT|EPOLLONESHOT);
21 push(@{$self->{msgq}}, $_[1]) if defined($_[1]);
22 while (defined(my $buf = shift @{$self->{msgq}})) {
23 if (ref($buf) eq 'CODE') {
24 $buf->($self); # could be \&PublicInbox::DS::close
26 my $wq_s1 = $self->{sock};
27 my $n = $PublicInbox::IPC::send_cmd->($wq_s1, [], $buf,
30 Carp::croak("sendmsg: $!") unless $!{EAGAIN};
31 PublicInbox::DS::epwait($wq_s1, EPOLLOUT|EPOLLONESHOT);
32 unshift @{$self->{msgq}}, $buf;
33 last; # wait for ->event_step
38 sub enq_close { flush_send($_[0], $_[0]->can('close')) }
40 sub event_step { # called on EPOLLOUT wakeup
42 eval { flush_send($self) } if $self->{sock};