}
sub pkt_op_pair {
- my ($self, $ops) = @_;
+ my ($self) = @_;
require PublicInbox::OnDestroy;
require PublicInbox::PktOp;
my $end = PublicInbox::OnDestroy->new($$, \&_delete_pkt_op, $self);
- @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair($ops);
+ @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair;
$end;
}
($ops ? %$ops : ()),
};
$ops->{''} //= [ \&dclose, $lei ];
- my $end = $lei->pkt_op_pair($ops);
+ my $end = $lei->pkt_op_pair;
$wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
delete $lei->{pkt_op_p};
- my $op = delete $lei->{pkt_op_c};
+ my $op_c = delete $lei->{pkt_op_c};
@$end = ();
$lei->event_step_init;
- # oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op
- $lei->{oneshot} ? $op : undef;
+ ($op_c, $ops);
}
sub _help {
my $lxs = $lei->lxs_prepare or return;
require PublicInbox::SolverGit;
my $self = bless { lxs => $lxs, oid_b => $blob }, __PACKAGE__;
- my $op = $lei->workers_start($self, 'lei_solve', 1,
+ my ($op_c, $ops) = $lei->workers_start($self, 'lei_solve', 1,
{ '' => [ \&sol_done, $lei ] });
$lei->{sol} = $self;
$self->wq_io_do('do_solve_blob', []);
$self->wq_close(1);
- while ($op && $op->{sock}) { $op->event_step }
+ $op_c->op_wait_event($ops);
}
sub ipc_atfork_child {
my $devfd = $lei->path_to_fd($ovv->{dst}) // return;
$lei->{opt}->{augment} = 1 if $devfd < 0;
$self->prepare_inputs($lei, \@inputs) or return;
- my $op = $lei->workers_start($self, 'lei_convert', 1);
+ my ($op_c, $ops) = $lei->workers_start($self, 'lei_convert', 1);
$lei->{cnv} = $self;
$self->wq_io_do('do_convert', []);
$self->wq_close(1);
- while ($op && $op->{sock}) { $op->event_step }
+ $op_c->op_wait_event($ops);
}
sub ipc_atfork_child {
my $ops = { '' => [ \&import_done, $lei ] };
$lei->{auth}->op_merge($ops, $self) if $lei->{auth};
$self->{-wq_nr_workers} = $j // 1; # locked
- my $op = $lei->workers_start($self, 'lei_import', undef, $ops);
+ my ($op_c, undef) = $lei->workers_start($self, 'lei_import', $j, $ops);
$lei->{imp} = $self;
$self->wq_io_do('input_stdin', []) if $self->{0};
net_merge_complete($self) unless $lei->{auth};
- while ($op && $op->{sock}) { $op->event_step }
+ $op_c->op_wait_event($ops);
}
no warnings 'once';
my $ops = { '' => [ \&mark_done, $lei ] };
$lei->{auth}->op_merge($ops, $self) if $lei->{auth};
$self->{vmd_mod} = $vmd_mod;
- my $op = $lei->workers_start($self, 'lei_mark', 1, $ops);
+ my ($op_c, undef) = $lei->workers_start($self, 'lei_mark', 1, $ops);
$lei->{mark} = $self;
$self->wq_io_do('input_stdin', []) if $self->{0};
net_merge_complete($self) unless $lei->{auth};
- while ($op && $op->{sock}) { $op->event_step }
+ $op_c->op_wait_event($ops);
}
sub note_missing {
require PublicInbox::Inbox;
require PublicInbox::Admin;
require PublicInbox::InboxWritable;
- my $op = $lei->workers_start($self, 'lei_mirror', 1, {
+ my ($op, $ops) = $lei->workers_start($self, 'lei_mirror', 1, {
'' => [ \&mirror_done, $lei ]
});
$lei->{mrr} = $self;
$self->wq_io_do('do_mirror', []);
$self->wq_close(1);
- while ($op && $op->{sock}) { $op->event_step }
+ $op->op_wait_event($ops);
}
sub ipc_atfork_child {
} else {
$self->{input} = $input;
}
- my $op = $lei->workers_start($self, 'lei_p2q', 1);
+ my ($op, $ops) = $lei->workers_start($self, 'lei_p2q', 1);
$lei->{p2q} = $self;
$self->wq_io_do('do_p2q', []);
$self->wq_close(1);
- while ($op && $op->{sock}) { $op->event_step }
+ $op->op_wait_event($ops);
}
sub ipc_atfork_child {
'incr_start_query' => [ \&incr_start_query, $self, $l2m ],
};
$lei->{auth}->op_merge($ops, $l2m) if $l2m && $lei->{auth};
- my $end = $lei->pkt_op_pair($ops);
+ my $end = $lei->pkt_op_pair;
$lei->{1}->autoflush(1);
$lei->start_pager if delete $lei->{need_pager};
$lei->{ovv}->ovv_begin($lei);
}
$self->wq_workers_start('lei_xsearch', undef,
$lei->oldset, { lei => $lei });
- my $op = delete $lei->{pkt_op_c};
+ my $op_c = delete $lei->{pkt_op_c};
delete $lei->{pkt_op_p};
@$end = ();
$self->{threads} = $lei->{opt}->{threads};
start_query($self);
}
$lei->event_step_init; # wait for shutdowns
- if ($lei->{oneshot}) {
- while ($op->{sock}) { $op->event_step }
- }
+ $op_c->op_wait_event($ops);
}
sub add_uri {
our @EXPORT_OK = qw(pkt_do);
sub new {
- my ($cls, $r, $ops) = @_;
- my $self = bless { sock => $r, ops => $ops }, $cls;
+ my ($cls, $r) = @_;
+ my $self = bless { sock => $r }, $cls;
if ($PublicInbox::DS::in_loop) { # iff using DS->EventLoop
$r->blocking(0);
$self->SUPER::new($r, EPOLLIN|EPOLLET);
+ } else {
+ $self->{blocking} = 1;
}
$self;
}
# returns a blessed object as the consumer, and a GLOB/IO for the producer
sub pair {
- my ($cls, $ops) = @_;
+ my ($cls) = @_;
my ($c, $p);
socketpair($c, $p, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
- (new($cls, $c, $ops), $p);
+ (new($cls, $c), $p);
}
sub pkt_do { # for the producer to trigger event_step in consumer
sub close {
my ($self) = @_;
my $c = $self->{sock} or return;
- $c->blocking ? delete($self->{sock}) : $self->SUPER::close;
+ $self->{blocking} ? delete($self->{sock}) : $self->SUPER::close;
}
sub event_step {
}
}
+# call this when we're ready to wait on events,
+# returns immediately if non-blocking
+sub op_wait_event {
+ my ($self, $ops) = @_;
+ $self->{ops} = $ops;
+ while ($self->{blocking} && $self->{sock}) { event_step($self) }
+}
+
1;