X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLeiToMail.pm;h=9c748deaed161db7b9e2b02674a521fcc6ce5e0a;hb=4b5a1b5787edee2a3b6cc10a3ccc5721f1414268;hp=9f7171fb04bbc89110fa7d6eedde534c6419e426;hpb=f3d0e746c6a35c8600b91af99958a52cbc114a4b;p=public-inbox.git diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 9f7171fb..9c748dea 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -157,7 +157,7 @@ sub _post_augment_mbox { # open a compressor process from top-level process my $zsfx = $self->{zsfx} or return; my $cmd = PublicInbox::MboxReader::zsfx2cmd($zsfx, undef, $lei); my ($r, $w) = @{delete $lei->{zpipe}}; - my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} }; + my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2}, pgid => 0 }; my $pid = spawn($cmd, undef, $rdr); my $pp = gensym; my $dup = bless { "pid.$pid" => $cmd }, ref($lei); @@ -215,14 +215,14 @@ sub update_kw_maybe ($$$$) { my $c = $lse->kw_changed($eml, $kw, my $docids = []); my $vmd = { kw => $kw }; if (scalar @$docids) { # already in lei/store - $lei->{sto}->ipc_do('set_eml_vmd', undef, $vmd, $docids) if $c; + $lei->{sto}->wq_do('set_eml_vmd', undef, $vmd, $docids) if $c; } elsif (my $xoids = $lei->{ale}->xoids_for($eml)) { # it's in an external, only set kw, here - $lei->{sto}->ipc_do('set_xvmd', $xoids, $eml, $vmd); + $lei->{sto}->wq_do('set_xvmd', $xoids, $eml, $vmd); } else { # never-before-seen, import the whole thing # XXX this is critical in protecting against accidental # data loss without --augment - $lei->{sto}->ipc_do('set_eml', $eml, $vmd); + $lei->{sto}->wq_do('set_eml', $eml, $vmd); } } @@ -296,7 +296,7 @@ sub _maildir_write_cb ($$) { $lse->xsmsg_vmd($smsg) if $lse; my $n = _buf2maildir($dst, $bref // \($eml->as_string), $smsg, $dir); - $sto->ipc_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto; + $sto->wq_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto; ++$lei->{-nr_write}; } } @@ -326,7 +326,7 @@ sub _imap_write_cb ($$) { } # imap_append returns UID if IMAP server has UIDPLUS extension ($sto && $uid =~ /\A[0-9]+\z/) and - $sto->ipc_do('set_sync_info', + $sto->wq_do('set_sync_info', $smsg->{blob}, $$uri, $uid + 0); ++$lei->{-nr_write}; } @@ -360,7 +360,7 @@ sub _v2_write_cb ($$) { my ($bref, $smsg, $eml) = @_; $eml //= PublicInbox::Eml->new($bref); return if $dedupe && $dedupe->is_dup($eml, $smsg); - $lei->{v2w}->ipc_do('add', $eml); # V2Writable->add + $lei->{v2w}->wq_do('add', $eml); # V2Writable->add ++$lei->{-nr_write}; } } @@ -658,9 +658,10 @@ sub _pre_augment_v2 { } PublicInbox::InboxWritable->new($ibx, @creat); $ibx->init_inbox if @creat; - my $v2w = $lei->{v2w} = $ibx->importer; - $v2w->ipc_lock_init("$dir/ipc.lock"); - $v2w->ipc_worker_spawn("lei/v2w $dir", $lei->oldset, { lei => $lei }); + my $v2w = $ibx->importer; + $v2w->{-wq_no_bcast} = 1; + $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei}); + $lei->{v2w} = $v2w; return if !$lei->{opt}->{shared}; my $d = "$lei->{ale}->{git}->{git_dir}/objects"; my $al = "$dir/git/0.git/objects/info/alternates"; @@ -688,8 +689,10 @@ sub do_augment { # slow, runs in wq worker # fast (spawn compressor or mkdir), runs in same process as pre_augment sub post_augment { my ($self, $lei, @args) = @_; + $self->{-au_noted}++ and $lei->qerr("# writing to $self->{dst} ..."); + my $wait = $lei->{opt}->{'import-before'} ? - $lei->{sto}->ipc_do('checkpoint', 1) : 0; + $lei->{sto}->wq_do('checkpoint', 1) : 0; # _post_augment_mbox my $m = $self->can("_post_augment_$self->{base_type}") or return; $m->($self, $lei, @args); @@ -711,16 +714,15 @@ sub do_post_auth { } else { # Maildir $self->{shard_info} = [ $mod, $shard ]; } - $aug = '+'; # incr_post_augment + $aug = 'incr_post_augment'; } elsif ($self->{-wq_worker_nr} == 0) { # 1st worker do_augment - $aug = '.'; # do_post_augment + $aug = 'do_post_augment'; } if ($aug) { local $0 = 'do_augment'; eval { do_augment($self, $lei) }; $lei->fail($@) if $@; - $lei->{pkt_op_p}->pkt_do($aug) == 1 or - die "do_post_augment trigger: $!"; + $lei->{pkt_op_p}->pkt_do($aug) or die "pkt_do($aug): $!"; } # done augmenting, connect the compressor pipe for each worker if (my $zpipe = delete $lei->{zpipe}) { @@ -774,6 +776,7 @@ sub write_mail { # via ->wq_io_do sub wq_atexit_child { my ($self) = @_; + local $PublicInbox::DS::in_loop = 0; # waitpid synchronously my $lei = $self->{lei}; delete $self->{wcb}; $lei->{ale}->git->async_wait_all; @@ -782,9 +785,31 @@ sub wq_atexit_child { $lei->{pkt_op_p}->pkt_do('l2m_progress', $nr); } +# runs on a 1s timer in lei-daemon +sub augment_inprogress { + my ($err, $opt, $dst, $au_noted) = @_; + eval { + return if $$au_noted++ || !$err || !defined(fileno($err)); + print $err '# '.($opt->{'import-before'} ? + "importing non-external contents of $dst" : ( + ($opt->{dedupe} // 'content') ne 'none') ? + "scanning old contents of $dst for dedupe" : + "removing old contents of $dst")." ...\n"; + }; + warn "E: $@ ($dst)" if $@; +} + # called in top-level lei-daemon when LeiAuth is done sub net_merge_all_done { - my ($self) = @_; + my ($self, $lei) = @_; + if ($PublicInbox::DS::in_loop && + $self->can("_do_augment_$self->{base_type}") && + !$lei->{opt}->{quiet}) { + $self->{-au_noted} = 0; + PublicInbox::DS::add_timer(1, \&augment_inprogress, + $lei->{2}, $lei->{opt}, + $self->{dst}, \$self->{-au_noted}); + } $self->wq_broadcast('do_post_auth'); $self->wq_close(1); }