]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/WQBlocked.pm
imap+nntp: share COMPRESS implementation
[public-inbox.git] / lib / PublicInbox / WQBlocked.pm
1 # Copyright (C) all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # non-blocking workqueues, currently used by LeiNoteEvent to track renames
5 package PublicInbox::WQBlocked;
6 use v5.12;
7 use parent qw(PublicInbox::DS);
8 use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
9 use PublicInbox::IPC;
10 use Carp ();
11 use Socket qw(MSG_EOR);
12
# Constructor: ($cls, $wq, $buf)
# Builds an object whose {msgq} holds $buf as its only pending entry,
# passes it to the parent class constructor together with the
# {-wq_s1} socket of $wq and the EPOLLOUT|EPOLLONESHOT event mask,
# and stores the result in $wq->{wqb}.  The stored value is returned.
sub new {
        my ($cls, $wq, $buf) = @_;
        my %fields = (msgq => [ $buf ]);
        my $self = bless \%fields, $cls;
        my $ev = EPOLLOUT | EPOLLONESHOT;
        return $wq->{wqb} = $self->SUPER::new($wq->{-wq_s1}, $ev);
}
18
# flush_send($self [, $new])
# Appends $new to {msgq} when it is defined, then drains the queue in
# order.  A CODE ref entry is invoked with $self; any other entry is
# handed to $PublicInbox::IPC::send_cmd on {sock} with MSG_EOR.
# When the send fails with EAGAIN, the entry goes back to the front of
# the queue and draining stops after requesting another
# EPOLLOUT|EPOLLONESHOT notification.  Any other send failure croaks
# with "sendmsg: $!".
sub flush_send {
        my ($self, $new) = @_;
        push @{$self->{msgq}}, $new if defined $new;
        while (defined(my $msg = shift @{$self->{msgq}})) {
                if (ref($msg) eq 'CODE') {
                        $msg->($self); # could be \&PublicInbox::DS::close
                        next;
                }
                my $sock = $self->{sock};
                my $sent = $PublicInbox::IPC::send_cmd->($sock, [], $msg,
                                                        MSG_EOR);
                next if defined $sent;
                $!{EAGAIN} or Carp::croak("sendmsg: $!");
                PublicInbox::DS::epwait($sock, EPOLLOUT|EPOLLONESHOT);
                # not sent: keep it first in line for the next attempt
                unshift @{$self->{msgq}}, $msg;
                last; # wait for ->event_step
        }
}
37
# enq_close($self)
# Queues this object's `close' method (looked up via ->can) as a
# callback through flush_send, so it runs in order with the entries
# already queued.
sub enq_close {
        my ($self) = @_;
        my $close_cb = $self->can('close');
        flush_send($self, $close_cb);
}
39
# Called on EPOLLOUT wakeup: retry flushing the queued messages.
# Does nothing when {sock} is no longer set.  If flush_send dies, the
# error is reported via warn and ->close is invoked on this object.
sub event_step {
        my ($self) = @_;
        # Return before the eval rather than guarding it with a
        # statement modifier: when the eval is skipped, $@ still holds
        # whatever an earlier, unrelated eval left behind, and testing
        # it would trigger a bogus warn + close.
        return unless $self->{sock};
        eval { flush_send($self) };
        if ($@) {
                warn $@;
                $self->close;
        }
}
48
49 1;