]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/PktOp.pm
lei q: emit progress and counting via PktOp
[public-inbox.git] / lib / PublicInbox / PktOp.pm
index d5b95a73d7f57671e862b606d3032adaa9181502..12839e71a8a11d8eabb067d8b0d9dd3c98fac960 100644 (file)
@@ -9,25 +9,16 @@
 package PublicInbox::PktOp;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::DS);
+use parent qw(PublicInbox::DS Exporter);
 use Errno qw(EAGAIN EINTR);
 use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
 use Socket qw(AF_UNIX MSG_EOR SOCK_SEQPACKET);
+use PublicInbox::IPC qw(ipc_freeze ipc_thaw);
+our @EXPORT_OK = qw(pkt_do);
 
 sub new {
        my ($cls, $r, $ops, $in_loop) = @_;
        my $self = bless { sock => $r, ops => $ops, re => [] }, $cls;
-       if (ref($ops) eq 'ARRAY') {
-               my %ops;
-               for my $op (@$ops) {
-                       if (ref($op->[0])) {
-                               push @{$self->{re}}, $op;
-                       } else {
-                               $ops{$op->[0]} = $op->[1];
-                       }
-               }
-               $self->{ops} = \%ops;
-       }
        if ($in_loop) { # iff using DS->EventLoop
                $r->blocking(0);
                $self->SUPER::new($r, EPOLLIN|EPOLLET);
@@ -43,6 +34,11 @@ sub pair {
        (new($cls, $c, $ops, $in_loop), $p);
 }
 
+sub pkt_do { # for the producer to trigger event_step in consumer
+       my ($producer, $cmd, @args) = @_;
+       send($producer, @args ? "$cmd\0".ipc_freeze(\@args) : $cmd, MSG_EOR);
+}
+
 sub close {
        my ($self) = @_;
        my $c = $self->{sock} or return;
@@ -54,24 +50,18 @@ sub event_step {
        my $c = $self->{sock};
        my $msg;
        do {
-               my $n = recv($c, $msg, 128, 0);
+               my $n = recv($c, $msg, 4096, 0);
                unless (defined $n) {
                        return if $! == EAGAIN;
                        next if $! == EINTR;
                        $self->close;
                        die "recv: $!";
                }
-               my $op = $self->{ops}->{$msg};
-               unless ($op) {
-                       for my $re_op (@{$self->{re}}) {
-                               $msg =~ $re_op->[0] or next;
-                               $op = $re_op->[1];
-                               last;
-                       }
-               }
-               die "BUG: unknown message: `$msg'" unless $op;
+               my ($cmd, $pargs) = split(/\0/, $msg, 2);
+               my $op = $self->{ops}->{$cmd // $msg};
+               die "BUG: unknown message: `$cmd'" unless $op;
                my ($sub, @args) = @$op;
-               $sub->(@args);
+               $sub->(@args, $pargs ? ipc_thaw($pargs) : ());
                return $self->close if $msg eq ''; # close on EOF
        } while (1);
 }