# Copyright (C) all contributors
# License: AGPL-3.0+
# non-blocking workqueues, currently used by LeiNoteEvent to track renames
package PublicInbox::WQBlocked;
use v5.12;
use parent qw(PublicInbox::DS);
use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
use PublicInbox::IPC;
use Carp ();
use Socket qw(MSG_EOR);
# Constructor: builds a blocked-queue object for the workqueue $wq.
#   $cls - class name to bless into
#   $wq  - hash-based object; its {-wq_s1} entry is handed to the parent
#          constructor and its {wqb} entry receives the result
#   $buf - first message to queue (optional)
# The parent constructor (PublicInbox::DS, per `use parent') is called with
# EPOLLOUT|EPOLLONESHOT; presumably this is what schedules ->event_step once
# the socket is writable -- confirm against PublicInbox::DS.
# Returns whatever the parent constructor returns (also kept in $wq->{wqb}).
sub new {
	my ($cls, $wq, $buf) = @_;
	# flush_send stops at the first undef queue entry and refuses to
	# enqueue undef itself; apply the same guard here so a missing $buf
	# cannot leave a sentinel in front of later messages
	my $self = bless { msgq => [ defined($buf) ? $buf : () ] }, $cls;
	$wq->{wqb} = $self->SUPER::new($wq->{-wq_s1}, EPOLLOUT|EPOLLONESHOT);
}
# Appends the optional second argument to $self->{msgq} (when defined) and
# then drains the queue in order:
#   - CODE refs are invoked with $self (this may be a close callback)
#   - anything else is passed to $PublicInbox::IPC::send_cmd together with
#     $self->{sock}, an empty array ref and MSG_EOR
# An undef result from send_cmd is treated as failure: with EAGAIN the
# message goes back to the front of the queue and EPOLLOUT|EPOLLONESHOT is
# requested via PublicInbox::DS::epwait; any other errno croaks.
sub flush_send {
	my ($self, $enq) = @_;
	push @{$self->{msgq}}, $enq if defined $enq;
	while (defined(my $msg = shift @{$self->{msgq}})) {
		if (ref($msg) eq 'CODE') {
			$msg->($self); # could be \&PublicInbox::DS::close
			next;
		}
		my $sock = $self->{sock};
		my $sent = $PublicInbox::IPC::send_cmd->($sock, [], $msg,
							MSG_EOR);
		next if defined $sent;
		$!{EAGAIN} or Carp::croak("sendmsg: $!");
		# not writable right now: ask for another wakeup and keep
		# the unsent message first in line
		PublicInbox::DS::epwait($sock, EPOLLOUT|EPOLLONESHOT);
		unshift @{$self->{msgq}}, $msg;
		last; # wait for ->event_step
	}
}
# Queues the object's own `close' method (looked up via ->can) behind any
# pending messages and runs flush_send, so the close callback is invoked
# only after everything queued before it has been processed.
sub enq_close {
	my ($self) = @_;
	my $close_cb = $self->can('close');
	flush_send($self, $close_cb);
}
# Called on EPOLLOUT wakeup: retries sending whatever is still queued.
# Does nothing when $self->{sock} is no longer set.  If flush_send dies,
# the error is reported via warn and $self is closed.
sub event_step {
	my ($self) = @_;
	# Return before looking at $@: when the eval below is skipped, $@ may
	# still hold an error from an earlier, unrelated eval and must not be
	# mistaken for a failure of this object.
	return unless $self->{sock};
	eval { flush_send($self) };
	return unless $@;
	warn $@;
	$self->close;
}
1;