- while (my $line = $r->getline) {
- $v2w->{current_info} = "[$shard] $line";
- if ($line eq "commit\n") {
- $self->commit_txn_lazy;
- } elsif ($line eq "close\n") {
- $self->_xdb_release;
- } elsif ($line eq "barrier\n") {
- $self->commit_txn_lazy;
- # no need to lock < 512 bytes is atomic under POSIX
- print $bnote "barrier $shard\n" or
- die "write failed for barrier $!\n";
- } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.+)\n\z/s) {
- my ($oid, $mid) = ($1, $2);
- $self->begin_txn_lazy;
- $self->remove_by_oid($oid, $mid);
- } else {
- chomp $line;
- # n.b. $mid may contain spaces(!)
- my ($bytes, $num, $blob, $ds, $ts, $mid) =
- split(/ /, $line, 6);
- $self->begin_txn_lazy;
- my $n = read($r, my $msg, $bytes) or die "read: $!\n";
- $n == $bytes or die "short read: $n != $bytes\n";
- my $mime = PublicInbox::Eml->new(\$msg);
- my $smsg = bless {
- bytes => $bytes,
- num => $num + 0,
- blob => $blob,
- mid => $mid,
- ds => $ds,
- ts => $ts,
- }, 'PublicInbox::Smsg';
- $self->add_message($mime, $smsg);
- }
- }
- $self->worker_done;