my $c = $lse->kw_changed($eml, $kw, my $docids = []);
my $vmd = { kw => $kw };
if (scalar @$docids) { # already in lei/store
- $lei->{sto}->ipc_do('set_eml_vmd', undef, $vmd, $docids) if $c;
+ $lei->{sto}->wq_do('set_eml_vmd', undef, $vmd, $docids) if $c;
} elsif (my $xoids = $lei->{ale}->xoids_for($eml)) {
# it's in an external, only set kw, here
- $lei->{sto}->ipc_do('set_xvmd', $xoids, $eml, $vmd);
+ $lei->{sto}->wq_do('set_xvmd', $xoids, $eml, $vmd);
} else { # never-before-seen, import the whole thing
# XXX this is critical in protecting against accidental
# data loss without --augment
- $lei->{sto}->ipc_do('set_eml', $eml, $vmd);
+ $lei->{sto}->wq_do('set_eml', $eml, $vmd);
}
}
$lse->xsmsg_vmd($smsg) if $lse;
my $n = _buf2maildir($dst, $bref // \($eml->as_string),
$smsg, $dir);
- $sto->ipc_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto;
+ $sto->wq_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto;
++$lei->{-nr_write};
}
}
}
# imap_append returns UID if IMAP server has UIDPLUS extension
($sto && $uid =~ /\A[0-9]+\z/) and
- $sto->ipc_do('set_sync_info',
+ $sto->wq_do('set_sync_info',
$smsg->{blob}, $$uri, $uid + 0);
++$lei->{-nr_write};
}
my ($bref, $smsg, $eml) = @_;
$eml //= PublicInbox::Eml->new($bref);
return if $dedupe && $dedupe->is_dup($eml, $smsg);
- $lei->{v2w}->ipc_do('add', $eml); # V2Writable->add
+ $lei->{v2w}->wq_do('add', $eml); # V2Writable->add
++$lei->{-nr_write};
}
}
}
PublicInbox::InboxWritable->new($ibx, @creat);
$ibx->init_inbox if @creat;
- my $v2w = $lei->{v2w} = $ibx->importer;
- $v2w->ipc_lock_init("$dir/ipc.lock");
- $v2w->ipc_worker_spawn("lei/v2w $dir", $lei->oldset, { lei => $lei });
+ my $v2w = $ibx->importer;
+ $v2w->{-wq_no_bcast} = 1;
+ $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
+ $lei->{v2w} = $v2w;
return if !$lei->{opt}->{shared};
my $d = "$lei->{ale}->{git}->{git_dir}/objects";
my $al = "$dir/git/0.git/objects/info/alternates";
sub post_augment {
my ($self, $lei, @args) = @_;
my $wait = $lei->{opt}->{'import-before'} ?
- $lei->{sto}->ipc_do('checkpoint', 1) : 0;
+ $lei->{sto}->wq_do('checkpoint', 1) : 0;
# _post_augment_mbox
my $m = $self->can("_post_augment_$self->{base_type}") or return;
$m->($self, $lei, @args);
sub wq_atexit_child {
my ($self) = @_;
+ local $PublicInbox::DS::in_loop = 0; # waitpid synchronously
my $lei = $self->{lei};
delete $self->{wcb};
$lei->{ale}->git->async_wait_all;