- while (my $line = readline($r)) {
- chomp $line;
- $v2w->{current_info} = "[$shard] $line";
- if ($line eq 'commit') {
- $self->commit_txn_lazy;
- } elsif ($line eq 'close') {
- $self->idx_release;
- } elsif ($line eq 'barrier') {
- $self->commit_txn_lazy;
- # no need to lock < 512 bytes is atomic under POSIX
- print $bnote "barrier $shard\n" or
- die "write failed for barrier $!\n";
- } elsif ($line =~ /\AD ([0-9]+)\z/s) {
- $self->remove_by_docid($1 + 0);
- } elsif ($line =~ s/\A\+X //) {
- my ($len, $docid, $eidx_key) = split(/ /, $line, 3);
- $self->add_eidx_info($docid, $eidx_key, eml($r, $len));
- } elsif ($line =~ s/\A-X //) {
- my ($len, $docid, $eidx_key) = split(/ /, $line, 3);
- $self->remove_eidx_info($docid, $eidx_key,
- eml($r, $len));
- } elsif ($line =~ s/\A=K (\d+) //) {
- $self->set_keywords($1 + 0, split(/ /, $line));
- } elsif ($line =~ s/\A-K (\d+) //) {
- $self->remove_keywords($1 + 0, split(/ /, $line));
- } elsif ($line =~ s/\A\+K (\d+) //) {
- $self->add_keywords($1 + 0, split(/ /, $line));
- } elsif ($line =~ s/\AO ([^\n]+)//) {
- my $over_fn = $1;
- $over_fn =~ tr/\0/\n/;
- $self->over_check(PublicInbox::Over->new($over_fn));
- } else {
- my $eidx_key;
- if ($line =~ s/\AX=(.+)\0//) {
- $eidx_key = $1;
- $v2w->{current_info} =~ s/\0/\\0 /;
- }
- # n.b. $mid may contain spaces(!)
- my ($len, $bytes, $num, $oid, $ds, $ts, $tid, $mid)
- = split(/ /, $line, 8);
- $self->begin_txn_lazy;
- my $smsg = bless {
- bytes => $bytes,
- num => $num + 0,
- blob => $oid,
- mid => $mid,
- tid => $tid,
- ds => $ds,
- ts => $ts,
- }, 'PublicInbox::Smsg';
- $smsg->{eidx_key} = $eidx_key if defined($eidx_key);
- $self->add_message(eml($r, $len), $smsg);
- }
- }
- $self->worker_done;