- my ($class, $io, $cb, $cleanup) = @_;
- my $self = fields::new($class);
- IO::Handle::blocking($io, 0);
- $self->SUPER::new($io);
- $self->{cb} = $cb;
- $self->{cleanup} = $cleanup;
- $self->watch_read(1);
- $self;
+ my ($class, $io, $cb, $arg, $end_obj) = @_;
+
+ # no $io? call $cb at the top of the next event loop to
+ # avoid recursion:
+ unless (defined($io)) {
+ PublicInbox::DS::requeue($cb ? $cb : $arg);
+ die '$end_obj unsupported w/o $io' if $end_obj;
+ return;
+ }
+ my $self = bless {
+ cb => $cb, # initial read callback
+ arg => $arg, # arg for $cb
+ end_obj => $end_obj, # like END{}, can ->event_step
+ }, $class;
+ my $pp = tied *$io; # ProcessPipe
+ $pp->{fh}->blocking(0) // die "$io->blocking(0): $!";
+ $self->SUPER::new($io, EPOLLIN);