X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLeiToMail.pm;h=ed609081b0a4c5dde199a0b5f47ce39989c1a652;hb=702864ea3393f4c7a91336b277083bf9fd4b0860;hp=9f7171fb04bbc89110fa7d6eedde534c6419e426;hpb=f3d0e746c6a35c8600b91af99958a52cbc114a4b;p=public-inbox.git diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 9f7171fb..ed609081 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -215,14 +215,14 @@ sub update_kw_maybe ($$$$) { my $c = $lse->kw_changed($eml, $kw, my $docids = []); my $vmd = { kw => $kw }; if (scalar @$docids) { # already in lei/store - $lei->{sto}->ipc_do('set_eml_vmd', undef, $vmd, $docids) if $c; + $lei->{sto}->wq_do('set_eml_vmd', undef, $vmd, $docids) if $c; } elsif (my $xoids = $lei->{ale}->xoids_for($eml)) { # it's in an external, only set kw, here - $lei->{sto}->ipc_do('set_xvmd', $xoids, $eml, $vmd); + $lei->{sto}->wq_do('set_xvmd', $xoids, $eml, $vmd); } else { # never-before-seen, import the whole thing # XXX this is critical in protecting against accidental # data loss without --augment - $lei->{sto}->ipc_do('set_eml', $eml, $vmd); + $lei->{sto}->wq_do('set_eml', $eml, $vmd); } } @@ -296,7 +296,7 @@ sub _maildir_write_cb ($$) { $lse->xsmsg_vmd($smsg) if $lse; my $n = _buf2maildir($dst, $bref // \($eml->as_string), $smsg, $dir); - $sto->ipc_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto; + $sto->wq_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto; ++$lei->{-nr_write}; } } @@ -326,7 +326,7 @@ sub _imap_write_cb ($$) { } # imap_append returns UID if IMAP server has UIDPLUS extension ($sto && $uid =~ /\A[0-9]+\z/) and - $sto->ipc_do('set_sync_info', + $sto->wq_do('set_sync_info', $smsg->{blob}, $$uri, $uid + 0); ++$lei->{-nr_write}; } @@ -360,7 +360,7 @@ sub _v2_write_cb ($$) { my ($bref, $smsg, $eml) = @_; $eml //= PublicInbox::Eml->new($bref); return if $dedupe && $dedupe->is_dup($eml, $smsg); - $lei->{v2w}->ipc_do('add', $eml); # V2Writable->add + $lei->{v2w}->wq_do('add', $eml); # V2Writable->add ++$lei->{-nr_write}; } } @@ -658,9 +658,10 @@ sub _pre_augment_v2 { } PublicInbox::InboxWritable->new($ibx, @creat); $ibx->init_inbox if @creat; - my $v2w = $lei->{v2w} = $ibx->importer; - $v2w->ipc_lock_init("$dir/ipc.lock"); - $v2w->ipc_worker_spawn("lei/v2w $dir", $lei->oldset, { lei => $lei }); + my $v2w = $ibx->importer; + $v2w->{-wq_no_bcast} = 1; + $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei}); + $lei->{v2w} = $v2w; return if !$lei->{opt}->{shared}; my $d = "$lei->{ale}->{git}->{git_dir}/objects"; my $al = "$dir/git/0.git/objects/info/alternates"; @@ -688,8 +689,10 @@ sub do_augment { # slow, runs in wq worker # fast (spawn compressor or mkdir), runs in same process as pre_augment sub post_augment { my ($self, $lei, @args) = @_; + $self->{-au_noted}++ and $lei->qerr("# writing to $self->{dst} ..."); + my $wait = $lei->{opt}->{'import-before'} ? - $lei->{sto}->ipc_do('checkpoint', 1) : 0; + $lei->{sto}->wq_do('checkpoint', 1) : 0; # _post_augment_mbox my $m = $self->can("_post_augment_$self->{base_type}") or return; $m->($self, $lei, @args); @@ -774,6 +777,7 @@ sub write_mail { # via ->wq_io_do sub wq_atexit_child { my ($self) = @_; + local $PublicInbox::DS::in_loop = 0; # waitpid synchronously my $lei = $self->{lei}; delete $self->{wcb}; $lei->{ale}->git->async_wait_all; @@ -782,9 +786,28 @@ sub wq_atexit_child { $lei->{pkt_op_p}->pkt_do('l2m_progress', $nr); } +# runs on a 1s timer in lei-daemon +sub augment_inprogress { + my ($err, $opt, $dst, $au_noted) = @_; + $$au_noted++ and return; + print $err '# '.($opt->{'import-before'} ? + "importing non-external contents of $dst" : ( + ($opt->{dedupe} // 'content') ne 'none') ? + "scanning old contents of $dst for dedupe" : + "removing old contents of $dst")." ...\n"; +} + # called in top-level lei-daemon when LeiAuth is done sub net_merge_all_done { - my ($self) = @_; + my ($self, $lei) = @_; + if ($PublicInbox::DS::in_loop && + $self->can("_do_augment_$self->{base_type}") && + !$lei->{opt}->{quiet}) { + $self->{-au_noted} = 0; + PublicInbox::DS::add_timer(1, \&augment_inprogress, + $lei->{2}, $lei->{opt}, + $self->{dst}, \$self->{-au_noted}); + } $self->wq_broadcast('do_post_auth'); $self->wq_close(1); }