}
sub add_message {
- my ($self, $mime, $bytes, $num, $blob) = @_; # mime = Email::MIME object
+ # mime = Email::MIME object
+ my ($self, $mime, $bytes, $num, $oid, $mid0) = @_;
my $doc_id;
my $mids = mids($mime->header_obj);
my $skel = $self->{skeleton};
# populates smsg->references for smsg->to_doc_data
my $refs = parse_references($smsg);
- my $data = $smsg->to_doc_data($blob);
+ $mid0 = $mids->[0] unless defined $mid0;
+ my $data = $smsg->to_doc_data($oid, $mid0);
foreach my $mid (@$mids) {
$tg->index_text($mid, 1, 'XM');
}
$xdb = $txn = undef;
} else {
chomp $line;
- my ($len, $artnum, $object_id) = split(/ /, $line);
+ my ($len, $artnum, $oid, $mid0) = split(/ /, $line);
$xdb ||= $self->_xdb_acquire;
if (!$txn) {
$xdb->begin_transaction;
$n == $len or die "short read: $n != $len\n";
my $mime = PublicInbox::MIME->new(\$msg);
$artnum = int($artnum);
- $self->add_message($mime, $n, $artnum, $object_id);
+ $self->add_message($mime, $n, $artnum, $oid, $mid0);
}
}
warn "$$ still in transaction\n" if $txn;
# called by V2Writable
sub index_raw {
- my ($self, $len, $msgref, $artnum, $object_id) = @_;
+ my ($self, $len, $msgref, $artnum, $object_id, $mid0) = @_;
my $w = $self->{w};
- print $w "$len $artnum $object_id\n", $$msgref or die
+ print $w "$len $artnum $object_id $mid0\n", $$msgref or die
"failed to write partition $!\n";
$w->flush or die "failed to flush: $!\n";
}
sub load_from_data ($$) {
my ($self) = $_[0]; # data = $_[1]
- my ($subj, $from, $refs, $to, $cc, $blob) = split(/\n/, $_[1]);
+ my ($subj, $from, $refs, $to, $cc, $blob, $mid0) = split(/\n/, $_[1]);
$self->{subject} = $subj;
$self->{from} = $from;
$self->{references} = $refs;
$self->{to} = $to;
$self->{cc} = $cc;
$self->{blob} = $blob;
+ $self->{mid} = $mid0;
}
sub load_expand {
}
sub to_doc_data {
- my ($self, $blob) = @_;
+ my ($self, $oid, $mid0) = @_;
my @rows = ($self->subject, $self->from, $self->references,
$self->to, $self->cc);
- push @rows, $blob if defined $blob;
- join("\n", @rows);
+ $oid = '' unless defined $oid;
+ join("\n", @rows, $oid, $mid0);
}
sub references {
# leaking FDs to it...
$self->idx_init;
- my $num = num_for($self, $mime);
+ my $mid0;
+ my $num = num_for($self, $mime, \$mid0);
defined $num or return; # duplicate
+ defined $mid0 or die "BUG: $mid0 undefined\n";
my $im = $self->importer;
my $cmt = $im->add($mime);
$cmt = $im->get_mark($cmt);
my $nparts = $self->{partitions};
my $part = $num % $nparts;
my $idx = $self->idx_part($part);
- $idx->index_raw($len, $msgref, $num, $oid);
+ $idx->index_raw($len, $msgref, $num, $oid, $mid0);
my $n = $self->{transact_bytes} += $len;
if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
$self->checkpoint;
}
sub num_for {
- my ($self, $mime) = @_;
+ my ($self, $mime, $mid0) = @_;
my $mids = mids($mime->header_obj);
if (@$mids) {
my $mid = $mids->[0];
my $num = $self->{skel}->{mm}->mid_insert($mid);
- return $num if defined($num); # common case
+ if (defined $num) { # common case
+ $$mid0 = $mid;
+ return $num;
+ };
# crap, Message-ID is already known, hope somebody just resent:
$self->done; # write barrier, clears $self->{skel}
$num = $self->{skel}->{mm}->mid_insert($m);
if (defined $num) {
warn "alternative <$m> for <$mid> found\n";
+ $$mid0 = $m;
return $num;
}
}
}
# none of the existing Message-IDs are good, generate a new one:
- num_for_harder($self, $mime);
+ num_for_harder($self, $mime, $mid0);
}
sub num_for_harder {
- my ($self, $mime) = @_;
+ my ($self, $mime, $mid0) = @_;
my $hdr = $mime->header_obj;
my $dig = content_digest($mime);
- my $mid = $dig->clone->hexdigest . '@localhost';
- my $num = $self->{skel}->{mm}->mid_insert($mid);
+ $$mid0 = $dig->clone->hexdigest . '@localhost';
+ my $num = $self->{skel}->{mm}->mid_insert($$mid0);
unless (defined $num) {
# it's hard to spoof the last Received: header
my @recvd = $hdr->header_raw('Received');
$dig->add("Received: $_") foreach (@recvd);
- $mid = $dig->clone->hexdigest . '@localhost';
- $num = $self->{skel}->{mm}->mid_insert($mid);
+ $$mid0 = $dig->clone->hexdigest . '@localhost';
+ $num = $self->{skel}->{mm}->mid_insert($$mid0);
# fall back to a random Message-ID and give up determinism:
until (defined($num)) {
$dig->add(rand);
- $mid = $dig->clone->hexdigest . '@localhost';
- warn "using random Message-ID <$mid> as fallback\n";
- $num = $self->{skel}->{mm}->mid_insert($mid);
+ $$mid0 = $dig->clone->hexdigest . '@localhost';
+ warn "using random Message-ID <$$mid0> as fallback\n";
+ $num = $self->{skel}->{mm}->mid_insert($$mid0);
}
}
my @cur = $hdr->header_raw('Message-Id');
- $hdr->header_set('Message-Id', "<$mid>", @cur);
+ $hdr->header_set('Message-Id', "<$$mid0>", @cur);
$num;
}
$mime->header_set('From' => 'bw@g');
$mime->header_set('To' => 'git@vger.kernel.org');
my $bytes = bytes::length($mime->as_string);
- my $doc_id = $rw->add_message($mime, $bytes, ++$num, 'ignored');
my $mid = $mime->header('Message-Id');
+ my $doc_id = $rw->add_message($mime, $bytes, ++$num, 'ignored', $mid);
push @mids, $mid;
ok($doc_id, 'message added: '. $mid);
}
ok($found[1]->{doc_id} > 0, 'doc_id is positive');
}
+SKIP: {
+ use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD);
+ use Net::NNTP;
+ use IO::Socket;
+ use Socket qw(SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY);
+ eval { require Danga::Socket };
+ skip "Danga::Socket missing $@", 2 if $@;
+ my $err = "$mainrepo/stderr.log";
+ my $out = "$mainrepo/stdout.log";
+ my %opts = (
+ LocalAddr => '127.0.0.1',
+ ReuseAddr => 1,
+ Proto => 'tcp',
+ Type => SOCK_STREAM,
+ Listen => 1024,
+ );
+ my $group = 'inbox.comp.test.v2writable';
+ my $pi_config = "$mainrepo/pi_config";
+ open my $fh, '>', $pi_config or die "open: $!\n";
+ print $fh <<EOF
+[publicinbox "test-v2writable"]
+ mainrepo = $mainrepo
+ version = 2
+ address = test\@example.com
+ newsgroup = $group
+EOF
+ ;
+ close $fh or die "close: $!\n";
+ my $sock = IO::Socket::INET->new(%opts);
+ ok($sock, 'sock created');
+ my $pid;
+ my $len;
+ END { kill 'TERM', $pid if defined $pid };
+ $! = 0;
+ my $fl = fcntl($sock, F_GETFD, 0);
+ ok(! $!, 'no error from fcntl(F_GETFD)');
+ is($fl, FD_CLOEXEC, 'cloexec set by default (Perl behavior)');
+ $pid = fork;
+ if ($pid == 0) {
+ use POSIX qw(dup2);
+ $ENV{PI_CONFIG} = $pi_config;
+ # pretend to be systemd
+ fcntl($sock, F_SETFD, $fl &= ~FD_CLOEXEC);
+ dup2(fileno($sock), 3) or die "dup2 failed: $!\n";
+ $ENV{LISTEN_PID} = $$;
+ $ENV{LISTEN_FDS} = 1;
+ my $nntpd = 'blib/script/public-inbox-nntpd';
+ exec $nntpd, "--stdout=$out", "--stderr=$err";
+ die "FAIL: $!\n";
+ }
+ ok(defined $pid, 'forked nntpd process successfully');
+ $! = 0;
+ fcntl($sock, F_SETFD, $fl |= FD_CLOEXEC);
+ ok(! $!, 'no error from fcntl(F_SETFD)');
+ my $host_port = $sock->sockhost . ':' . $sock->sockport;
+ my $n = Net::NNTP->new($host_port);
+ $n->group($group);
+ my $x = $n->xover('1-');
+ my %uniq;
+ foreach my $num (sort { $a <=> $b } keys %$x) {
+ my $mid = $x->{$num}->[3];
+ is($uniq{$mid}++, 0, "MID for $num is unique in XOVER");
+ is_deeply($n->xhdr('Message-ID', $num),
+ { $num => $mid }, "XHDR lookup OK on num $num");
+ is_deeply($n->xhdr('Message-ID', $mid),
+ { $mid => $mid }, "XHDR lookup OK on MID $num");
+ }
+ my %nn;
+ foreach my $mid (@{$n->newnews(0, $group)}) {
+ is($nn{$mid}++, 0, "MID is unique in NEWNEWS");
+ }
+ is_deeply([sort keys %nn], [sort keys %uniq]);
+};
done_testing();