X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FPktOp.pm;h=12839e71a8a11d8eabb067d8b0d9dd3c98fac960;hb=7349713101700e488231ad9ffece8ee42de0928c;hp=d5b95a73d7f57671e862b606d3032adaa9181502;hpb=a9ba3d10482daea78739f3da6c8d7739ac8af3fc;p=public-inbox.git diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm index d5b95a73..12839e71 100644 --- a/lib/PublicInbox/PktOp.pm +++ b/lib/PublicInbox/PktOp.pm @@ -9,25 +9,16 @@ package PublicInbox::PktOp; use strict; use v5.10.1; -use parent qw(PublicInbox::DS); +use parent qw(PublicInbox::DS Exporter); use Errno qw(EAGAIN EINTR); use PublicInbox::Syscall qw(EPOLLIN EPOLLET); use Socket qw(AF_UNIX MSG_EOR SOCK_SEQPACKET); +use PublicInbox::IPC qw(ipc_freeze ipc_thaw); +our @EXPORT_OK = qw(pkt_do); sub new { my ($cls, $r, $ops, $in_loop) = @_; my $self = bless { sock => $r, ops => $ops, re => [] }, $cls; - if (ref($ops) eq 'ARRAY') { - my %ops; - for my $op (@$ops) { - if (ref($op->[0])) { - push @{$self->{re}}, $op; - } else { - $ops{$op->[0]} = $op->[1]; - } - } - $self->{ops} = \%ops; - } if ($in_loop) { # iff using DS->EventLoop $r->blocking(0); $self->SUPER::new($r, EPOLLIN|EPOLLET); @@ -43,6 +34,11 @@ sub pair { (new($cls, $c, $ops, $in_loop), $p); } +sub pkt_do { # for the producer to trigger event_step in consumer + my ($producer, $cmd, @args) = @_; + send($producer, @args ? "$cmd\0".ipc_freeze(\@args) : $cmd, MSG_EOR); +} + sub close { my ($self) = @_; my $c = $self->{sock} or return; @@ -54,24 +50,18 @@ sub event_step { my $c = $self->{sock}; my $msg; do { - my $n = recv($c, $msg, 128, 0); + my $n = recv($c, $msg, 4096, 0); unless (defined $n) { return if $! == EAGAIN; next if $! == EINTR; $self->close; die "recv: $!"; } - my $op = $self->{ops}->{$msg}; - unless ($op) { - for my $re_op (@{$self->{re}}) { - $msg =~ $re_op->[0] or next; - $op = $re_op->[1]; - last; - } - } - die "BUG: unknown message: `$msg'" unless $op; + my ($cmd, $pargs) = split(/\0/, $msg, 2); + my $op = $self->{ops}->{$cmd // $msg}; + die "BUG: unknown message: `$cmd'" unless $op; my ($sub, @args) = @$op; - $sub->(@args); + $sub->(@args, $pargs ? ipc_thaw($pargs) : ()); return $self->close if $msg eq ''; # close on EOF } while (1); }