my @WQ_KEYS = qw(lxs l2m imp mrr cnv p2q tag sol); # internal workers
+sub _drop_wq {
+ my ($self) = @_;
+ for my $wq (grep(defined, delete(@$self{@WQ_KEYS}))) { $wq->DESTROY }
+}
+
# pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
sub x_it ($$) {
my ($self, $code) = @_;
send($s, "x_it $code", MSG_EOR);
} elsif ($self->{oneshot}) {
# don't want to end up using $? from child processes
- for my $f (@WQ_KEYS) {
- my $wq = delete $self->{$f} or next;
- $wq->DESTROY;
- }
+ _drop_wq($self);
# cleanup anything that has tempfiles or open file handles
%PATH2CFG = ();
delete @$self{qw(ovv dedupe sto cfg)};
sub fail_handler ($;$$) {
my ($lei, $code, $io) = @_;
- for my $f (@WQ_KEYS) {
- my $wq = delete $lei->{$f} or next;
- $wq->wq_wait_old(undef, $lei) if $wq->wq_kill_old; # lei-daemon
- }
close($io) if $io; # needed to avoid warnings on SIGPIPE
+ _drop_wq($lei);
x_it($lei, $code // (1 << 8));
}
sub dclose {
my ($self) = @_;
delete $self->{-progress};
- for my $f (@WQ_KEYS) {
- my $wq = delete $self->{$f} or next;
- if ($wq->wq_kill) {
- $wq->wq_close(0, undef, $self);
- } elsif ($wq->wq_kill_old) {
- $wq->wq_wait_old(undef, $self);
- }
- }
+ _drop_wq($self);
close(delete $self->{1}) if $self->{1}; # may reap_compress
$self->close if $self->{-event_init_done}; # PublicInbox::DS::close
}
return if $wq->{-wq_worker_nr} != 0;
my $lei = $wq->{lei};
my $net = $lei->{net};
- my $mics = $net->imap_common_init($lei);
- my $nn = $net->nntp_common_init($lei);
- pkt_do($lei->{pkt_op_p}, 'net_merge', $net) or
- die "pkt_do net_merge: $!";
- $net->{mics_cached} = $mics if $mics;
- $net->{nn_cached} = $nn if $nn;
+ eval {
+ my $mics = $net->imap_common_init($lei);
+ my $nn = $net->nntp_common_init($lei);
+ pkt_do($lei->{pkt_op_p}, 'net_merge', $net) or
+ die "pkt_do net_merge: $!";
+ $net->{mics_cached} = $mics if $mics;
+ $net->{nn_cached} = $nn if $nn;
+ };
+ $lei->fail($@) if $@;
}
sub net_merge_done1 { # bump merge-count in top-level lei-daemon
$mics->{$sec} //= mic_for($self, "$sec/", $mic_args, $lei);
next unless $self->isa('PublicInbox::NetWriter');
my $dst = $uri->mailbox // next;
- my $mic = $mics->{$sec};
+ my $mic = $mics->{$sec} // die "Unable to continue\n";
next if $mic->exists($dst); # already exists
$mic->create($dst) or die "CREATE $dst failed <$uri>: $@";
}