+sub mitem_kw ($$;$) {
+ my ($smsg, $mitem, $flagged) = @_;
+ my $kw = xap_terms('K', $mitem->get_document);
+ $kw->{flagged} = 1 if $flagged;
+ $smsg->{kw} = [ sort keys %$kw ];
+}
+
my $num = int(($docid - 1) / $nshard) + 1;
my $ibx = $self->{shard2ibx}->[$shard];
my $smsg = $ibx->over->get_art($num);
my $num = int(($docid - 1) / $nshard) + 1;
my $ibx = $self->{shard2ibx}->[$shard];
my $smsg = $ibx->over->get_art($num);
- if (ref($ibx->can('msg_keywords'))) {
- my $kw = xap_terms('K', $mitem->get_document);
- $smsg->{kw} = [ sort keys %$kw ];
- }
+ mitem_kw($smsg, $mitem) if $ibx->can('msg_keywords');
$lei->{opt}->{quiet} = 1;
delete $lei->{opt}->{verbose};
delete $lei->{-progress};
} else {
$lei->{opt}->{quiet} = 1;
delete $lei->{opt}->{verbose};
delete $lei->{-progress};
} else {
my $mo = { %{$lei->{mset_opt}} };
my $mset;
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish);
my $mo = { %{$lei->{mset_opt}} };
my $mset;
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish);
do {
$mset = $srch->mset($mo->{qstr}, $mo);
mset_progress($lei, $desc, $mset->size,
do {
$mset = $srch->mset($mo->{qstr}, $mo);
mset_progress($lei, $desc, $mset->size,
my ($eml, $self, $lei, $each_smsg) = @_;
my ($eml, $self, $lei, $each_smsg) = @_;
my $smsg = bless {}, 'PublicInbox::Smsg';
$smsg->populate($eml);
$smsg->parse_references($eml, mids($eml));
my $smsg = bless {}, 'PublicInbox::Smsg';
$smsg->populate($eml);
$smsg->parse_references($eml, mids($eml));
local $0 = "$0 query_remote_mboxrd";
local $SIG{TERM} = sub { exit(0) }; # for DESTROY (File::Temp, $reap)
my $lei = $self->{lei};
local $0 = "$0 query_remote_mboxrd";
local $SIG{TERM} = sub { exit(0) }; # for DESTROY (File::Temp, $reap)
my $lei = $self->{lei};
my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm');
push(@qform, t => 1) if $opt->{threads};
my $verbose = $opt->{verbose};
my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm');
push(@qform, t => 1) if $opt->{threads};
my $verbose = $opt->{verbose};
$uri->query_form(@qform);
my $cmd = $curl->for_uri($lei, $uri);
$lei->qerr("# $cmd");
$uri->query_form(@qform);
my $cmd = $curl->for_uri($lei, $uri);
$lei->qerr("# $cmd");
- my ($fh, $pid) = popen_rd($cmd, $env, $rdr);
+ my ($fh, $pid) = popen_rd($cmd, undef, $rdr);
$reap_curl = PublicInbox::OnDestroy->new($sigint_reap, $pid);
$fh = IO::Uncompress::Gunzip->new($fh);
$reap_curl = PublicInbox::OnDestroy->new($sigint_reap, $pid);
$fh = IO::Uncompress::Gunzip->new($fh);
- PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self,
+ PublicInbox::MboxReader->mboxrd($fh, \&each_remote_eml, $self,
$lei, $each_smsg);
my $err = waitpid($pid, 0) == $pid ? undef
: "BUG: waitpid($cmd): $!";
@$reap_curl = (); # cancel OnDestroy
die $err if $err;
$lei, $each_smsg);
my $err = waitpid($pid, 0) == $pid ? undef
: "BUG: waitpid($cmd): $!";
@$reap_curl = (); # cancel OnDestroy
die $err if $err;
mset_progress($lei, $lei->{-current_url}, $nr, $nr);
next;
}
mset_progress($lei, $lei->{-current_url}, $nr, $nr);
next;
}
- if ($l2m) {
- eval { $l2m->post_augment($lei) };
- $err = $@;
- if ($err) {
- if (my $lxs = delete $lei->{lxs}) {
- $lxs->wq_kill;
- $lxs->wq_close(0, undef, $lei);
- }
- $lei->fail("$err");
+ eval { $l2m->post_augment($lei) };
+ $err = $@;
+ if ($err) {
+ if (my $lxs = delete $lei->{lxs}) {
+ $lxs->wq_kill;
+ $lxs->wq_close(0, undef, $lei);
for my $ibxish (locals($self)) {
$self->wq_io_do('query_thread_mset', [], $ibxish);
}
for my $ibxish (locals($self)) {
$self->wq_io_do('query_thread_mset', [], $ibxish);
}
-sub query_prepare { # called by wq_io_do
- my ($self) = @_;
- local $0 = "$0 query_prepare";
- my $lei = $self->{lei};
- eval { $lei->{l2m}->do_augment($lei) };
- $lei->fail($@) if $@;
- pkt_do($lei->{pkt_op_p}, '.') == 1 or die "do_post_augment trigger: $!"
-}
-
my $ops = {
'|' => [ $lei->can('sigpipe_handler'), $lei ],
'!' => [ $lei->can('fail_handler'), $lei ],
'.' => [ \&do_post_augment, $lei ],
my $ops = {
'|' => [ $lei->can('sigpipe_handler'), $lei ],
'!' => [ $lei->can('fail_handler'), $lei ],
'.' => [ \&do_post_augment, $lei ],
'' => [ \&query_done, $lei ],
'mset_progress' => [ \&mset_progress, $lei ],
'x_it' => [ $lei->can('x_it'), $lei ],
'child_error' => [ $lei->can('child_error'), $lei ],
'' => [ \&query_done, $lei ],
'mset_progress' => [ \&mset_progress, $lei ],
'x_it' => [ $lei->can('x_it'), $lei ],
'child_error' => [ $lei->can('child_error'), $lei ],
($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
$lei->{1}->autoflush(1);
$lei->start_pager if delete $lei->{need_pager};
$lei->{ovv}->ovv_begin($lei);
($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
$lei->{1}->autoflush(1);
$lei->start_pager if delete $lei->{need_pager};
$lei->{ovv}->ovv_begin($lei);
$lei->oldset, { lei => $lei });
pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
# 1031: F_SETPIPE_SZ
$lei->oldset, { lei => $lei });
pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
# 1031: F_SETPIPE_SZ
# delete until all lei2mail + lei_xsearch workers are reaped
$lei->{git_tmp} = $self->{git_tmp} = git_tmp($self);
}
# delete until all lei2mail + lei_xsearch workers are reaped
$lei->{git_tmp} = $self->{git_tmp} = git_tmp($self);
}
$lei->oldset, { lei => $lei });
my $op = delete $lei->{pkt_op_c};
delete $lei->{pkt_op_p};
$lei->oldset, { lei => $lei });
my $op = delete $lei->{pkt_op_c};
delete $lei->{pkt_op_p};