- while (my $line = $r->getline) {
- $v2w->{current_info} = "[$shard] $line";
- if ($line eq "commit\n") {
- $self->commit_txn_lazy;
- } elsif ($line eq "close\n") {
- $self->_xdb_release;
- } elsif ($line eq "barrier\n") {
- $self->commit_txn_lazy;
- # no need to lock < 512 bytes is atomic under POSIX
- print $bnote "barrier $shard\n" or
- die "write failed for barrier $!\n";
- } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.+)\n\z/s) {
- my ($oid, $mid) = ($1, $2);
- $self->begin_txn_lazy;
- $self->remove_by_oid($oid, $mid);
- } else {
- chomp $line;
- my ($len, $artnum, $oid, $mid0) = split(/ /, $line);
- $self->begin_txn_lazy;
- my $n = read($r, my $msg, $len) or die "read: $!\n";
- $n == $len or die "short read: $n != $len\n";
- my $mime = PublicInbox::MIME->new(\$msg);
- $artnum = int($artnum);
- $self->add_message($mime, $n, $artnum, $oid, $mid0);
- }
- }
- $self->worker_done;