# Copyright (C) all contributors # License: AGPL-3.0+ # non-blocking workqueues, currently used by LeiNoteEvent to track renames package PublicInbox::WQBlocked; use v5.12; use parent qw(PublicInbox::DS); use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT); use PublicInbox::IPC; use Carp (); use Socket qw(MSG_EOR); sub new { my ($cls, $wq, $buf) = @_; my $self = bless { msgq => [$buf], }, $cls; $wq->{wqb} = $self->SUPER::new($wq->{-wq_s1}, EPOLLOUT|EPOLLONESHOT); } sub flush_send { my ($self) = @_; push(@{$self->{msgq}}, $_[1]) if defined($_[1]); while (defined(my $buf = shift @{$self->{msgq}})) { if (ref($buf) eq 'CODE') { $buf->($self); # could be \&PublicInbox::DS::close } else { my $wq_s1 = $self->{sock}; my $n = $PublicInbox::IPC::send_cmd->($wq_s1, [], $buf, MSG_EOR); next if defined($n); Carp::croak("sendmsg: $!") unless $!{EAGAIN}; PublicInbox::DS::epwait($wq_s1, EPOLLOUT|EPOLLONESHOT); unshift @{$self->{msgq}}, $buf; last; # wait for ->event_step } } } sub enq_close { flush_send($_[0], $_[0]->can('close')) } sub event_step { # called on EPOLLOUT wakeup my ($self) = @_; eval { flush_send($self) } if $self->{sock}; if ($@) { warn $@; $self->close; } } 1;