use PublicInbox::Eml;
sub new {
- my ($class, $v2w, $shard) = @_;
+ my ($class, $v2w, $shard) = @_; # v2w may be ExtSearchIdx
my $ibx = $v2w->{ibx};
- my $self = $class->SUPER::new($ibx, 1, $shard);
+ my $self = $ibx ? $class->SUPER::new($ibx, 1, $shard)
+ : $class->eidx_shard_new($v2w, $shard);
# create the DB before forking:
$self->idx_acquire;
$self->set_metadata_once;
# this reads all the writes to $self->{w} from the parent process
# Each command is one LF-terminated line read from $r:
#   "D <oid> <num>"                            => remove_by_oid
#   "+X <len> <docid> <xnum> <oid> <eidx_key>" => add_xref3
#   "-X <len> <docid> <oid> <eidx_key>"        => remove_xref3
#   any other line is an add_message record, optionally prefixed with
#   "X<eidx_key>\0" (written by index_raw)
# The "+X", "-X" and add_message lines are each followed by a message
# which eml($r, $len) reads from the same handle.
sub shard_worker_loop ($$$$$) {
my ($self, $v2w, $r, $shard, $bnote) = @_;
- $0 = "pi-v2-shard[$shard]";
+ $0 = "shard[$shard]";
$self->begin_txn_lazy;
while (my $line = readline($r)) {
$v2w->{current_info} = "[$shard] $line";
} elsif ($line =~ /\AD ([a-f0-9]{40,}) ([0-9]+)\n\z/s) {
$self->remove_by_oid($1, $2 + 0);
} elsif ($line =~ s/\A\+X //) {
- my ($len, $docid, $xnum, $oid, $ng_or_dir) =
+ my ($len, $docid, $xnum, $oid, $eidx_key) =
split(/ /, $line, 5);
- $self->add_xref3($docid, $xnum, $oid, $ng_or_dir,
+ # $line was not chomped, so the last field still ends in LF;
+ # strip it so the key matches what the in-process path passes
+ chomp $eidx_key;
+ $self->add_xref3($docid, $xnum, $oid, $eidx_key,
eml($r, $len));
} elsif ($line =~ s/\A-X //) {
- my ($len, $docid, $xnum, $oid, $ng_or_dir) =
- split(/ /, $line, 5);
- $self->remove_xref3($docid, $xnum, $oid,
- $ng_or_dir, eml($r, $len));
+ # "-X" carries four fields and no $xnum (see
+ # shard_remove_xref3, which also calls remove_xref3
+ # without $xnum when not going through the pipe)
+ my ($len, $docid, $oid, $eidx_key) =
+ split(/ /, $line, 4);
+ chomp $eidx_key;
+ $self->remove_xref3($docid, $oid, $eidx_key,
+ eml($r, $len));
} else {
chomp $line;
+ my $eidx_key;
+ if ($line =~ s/\AX(.+)\0//) {
+ $eidx_key = $1;
+ }
# n.b. $mid may contain spaces(!)
my ($len, $bytes, $num, $oid, $ds, $ts, $tid, $mid)
= split(/ /, $line, 8);
ds => $ds,
ts => $ts,
}, 'PublicInbox::Smsg';
+ $smsg->{eidx_key} = $eidx_key if defined($eidx_key);
$self->add_message(eml($r, $len), $smsg);
}
}
}
# Index one message, either by serializing it to the shard worker or by
# adding it in-process.
#   $msgref - optional ref to the raw message text
#   $eml    - optional parsed message object; in the in-process path it
#             is built with PublicInbox::Eml->new($msgref) when absent
#   $smsg   - message metadata; the keys raw_bytes, bytes, num, blob, ds,
#             ts, tid and mid are what gets serialized
#   $ibx    - optional; when given, its eidx_key is attached to the message
# Dies if writing to the worker handle fails.
sub index_raw {
- my ($self, $msgref, $eml, $smsg) = @_;
+ my ($self, $msgref, $eml, $smsg, $ibx) = @_;
# {w} is the handle whose writes shard_worker_loop reads
if (my $w = $self->{w}) {
+ if ($ibx) {
# "X<eidx_key>\0" prefixes the record line; the worker strips it
# from the front of the line before splitting the other fields
+ print $w 'X', $ibx->eidx_key, "\0" or die
+ "failed to write shard: $!\n";
+ }
# $msgref may be omitted: derive the raw text and its length from $eml
+ $msgref //= \($eml->as_string);
+ $smsg->{raw_bytes} //= length($$msgref);
# mid must be last, it can contain spaces (but not LF)
print $w join(' ', @$smsg{qw(raw_bytes bytes
num blob ds ts tid mid)}),
"\n", $$msgref or die "failed to write shard $!\n";
} else {
if ($eml) {
# $eml already exists, so the raw buffer is cleared
# (presumably to release memory early); $msgref is optional
- undef $$msgref;
+ undef($$msgref) if $msgref;
} else { # --xapian-only + --sequential-shard:
$eml = PublicInbox::Eml->new($msgref);
}
$self->begin_txn_lazy;
# same eidx_key the worker path transmits via the "X...\0" prefix
+ $smsg->{eidx_key} = $ibx->eidx_key if $ibx;
$self->add_message($eml, $smsg);
}
}
# Route an add_xref3 request for $docid to this shard.
#   $docid, $xnum, $oid - passed through unchanged
#   $xibx               - object whose eidx_key identifies the source
#   $eml                - parsed message
# With a worker handle ({w}), writes a "+X" command line followed by the
# header-only serialization of $eml; otherwise calls add_xref3 directly
# with the full $eml.  Dies if the write fails.
sub shard_add_xref3 {
my ($self, $docid, $xnum, $oid, $xibx, $eml) = @_;
- my $ng_or_dir = $xibx->{newsgroup} // $xibx->{inboxdir};
+ my $eidx_key = $xibx->eidx_key;
if (my $w = $self->{w}) {
# only the header is sent to the worker, not the whole message
my $hdr = $eml->header_obj->as_string;
my $len = length($hdr);
# five fields: length, docid, xnum, oid, eidx_key (key is last)
- print $w "+X $len $docid $xnum $oid $ng_or_dir\n", $hdr or
+ print $w "+X $len $docid $xnum $oid $eidx_key\n", $hdr or
die "failed to write shard: $!";
} else {
- $self->add_xref3($docid, $xnum, $oid, $ng_or_dir, $eml);
+ $self->add_xref3($docid, $xnum, $oid, $eidx_key, $eml);
}
}
# Route a remove_xref3 request for $docid to this shard.
#   $docid, $oid - passed through unchanged
#   $xibx        - object whose eidx_key identifies the source
#   $eml         - parsed message
# Unlike shard_add_xref3 there is no $xnum parameter.  With a worker
# handle ({w}), writes a "-X" command line followed by the header-only
# serialization of $eml; otherwise calls remove_xref3 directly with the
# full $eml.  Dies if the write fails.
sub shard_remove_xref3 {
my ($self, $docid, $oid, $xibx, $eml) = @_;
- my $ng_or_dir = $xibx->{newsgroup} // $xibx->{inboxdir};
+ my $eidx_key = $xibx->eidx_key;
if (my $w = $self->{w}) {
# only the header is sent to the worker, not the whole message
my $hdr = $eml->header_obj->as_string;
my $len = length($hdr);
# four fields: length, docid, oid, eidx_key (key is last)
- print $w "-X $len $docid $oid $ng_or_dir\n", $hdr or
+ print $w "-X $len $docid $oid $eidx_key\n", $hdr or
die "failed to write shard: $!";
} else {
- $self->remove_xref3($docid, $oid, $ng_or_dir, $eml);
+ $self->remove_xref3($docid, $oid, $eidx_key, $eml);
}
}