delete($skv->{dbh}) if $skv;
}
+sub dedupe_nr {
+ my $skv = $_[0]->[0] or return undef;
+ my @n = $skv->count;
+ $n[0];
+}
+
1;
EOM
}
+# cf. LeiDedupe->dedupe_nr
+sub dedupe_nr {
+ my $oidx = $_[0]->{oidx} // die 'BUG: no {oidx}';
+ my @n = $oidx->{dbh}->selectrow_array('SELECT COUNT(*) FROM over');
+ $n[0];
+}
+
no warnings 'once';
*nntp_url = \&cloneurl;
*base_url = \&PublicInbox::Inbox::base_url;
warn "# keyword `$k' not supported for mbox\n";
}
}
- # Messages are always 'O' (non-\Recent in IMAP), it saves
- # MUAs the trouble of rewriting the mbox if no other
- # changes are made. We put 'O' at the end (e.g. "Status: RO")
- # to match mutt(1) output.
- $eml->header_set('Status', join('', sort(@{$hdr{Status}})). 'O');
+ # When writing to empty mboxes, messages are always 'O'
+ # (not-\Recent in IMAP), it saves MUAs the trouble of
+ # rewriting the mbox if no other changes are made.
+ # We put 'O' at the end (e.g. "Status: RO") to match mutt(1) output.
+ # We only set smsg->{-recent} if augmenting existing stores.
+ my $status = join('', sort(@{$hdr{Status}}));
+ $status .= 'O' unless $smsg->{-recent};
+ $eml->header_set('Status', $status) if $status;
if (my $chars = delete $hdr{'X-Status'}) {
$eml->header_set('X-Status', join('', sort(@$chars)));
}
my $dedupe = $lei->{dedupe};
$dedupe->prepare_dedupe;
my $lse = $lei->{lse}; # may be undef
+ my $set_recent = $dedupe->dedupe_nr;
sub { # for git_to_mail
my ($buf, $smsg, $eml) = @_;
$eml //= PublicInbox::Eml->new($buf);
return if $dedupe->is_dup($eml, $smsg);
$lse->xsmsg_vmd($smsg) if $lse;
+ $smsg->{-recent} = 1 if $set_recent;
$buf = $eml2mbox->($eml, $smsg);
if ($atomic_append) {
atomic_append($lei, $buf);
join('', sort(map { $kw2char{$_} // () } @$kw, @_));
}
-sub _buf2maildir {
- my ($dst, $buf, $smsg) = @_;
+sub _buf2maildir ($$$$) {
+ my ($dst, $buf, $smsg, $dir) = @_;
my $kw = $smsg->{kw} // [];
my $rand = ''; # chosen by die roll :P
my ($tmp, $fh, $base, $ok);
} while (!($ok = sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY)) &&
$!{EEXIST} && ($rand = _rand.','));
if ($ok && print $fh $$buf and close($fh)) {
- # ignore new/ and write only to cur/, otherwise MUAs
- # with R/W access to the Maildir will end up doing
- # a mass rename which can take a while with thousands
- # of messages.
- $dst .= 'cur/';
+ $dst .= $dir; # 'new/' or 'cur/'
$rand = '';
do {
$base = $rand.$common.':2,'.kw2suffix($kw);
my $lse = $lei->{lse}; # may be undef
my $sto = $lei->{opt}->{'mail-sync'} ? $lei->{sto} : undef;
my $out = $sto ? 'maildir:'.$lei->rel2abs($dst) : undef;
+
+ # Favor cur/ and only write to new/ when augmenting. This
+ # saves MUAs from having to do a mass rename when the initial
+ # search result set is huge.
+ my $dir = $dedupe && $dedupe->dedupe_nr ? 'new/' : 'cur/';
sub { # for git_to_mail
my ($bref, $smsg, $eml) = @_;
$dst // return $lei->fail; # dst may be undef-ed in last run
PublicInbox::Eml->new($$bref),
$smsg);
$lse->xsmsg_vmd($smsg) if $lse;
- my $n = _buf2maildir($dst, $bref // \($eml->as_string), $smsg);
+ my $n = _buf2maildir($dst, $bref // \($eml->as_string),
+ $smsg, $dir);
$sto->ipc_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto;
++$lei->{-nr_write};
}
$lei->{1} = $zpipe->[1];
close $zpipe->[0];
}
+ my $au_peers = delete $self->{au_peers};
+ if ($au_peers) { # wait for peer l2m to finish augmenting:
+ $au_peers->[1] = undef;
+ sysread($au_peers->[0], my $barrier1, 1);
+ }
$self->{wcb} = $self->write_cb($lei);
+ if ($au_peers) { # wait for peer l2m to set write_cb
+ $au_peers->[3] = undef;
+ sysread($au_peers->[2], my $barrier2, 1);
+ }
}
sub ipc_atfork_child {
if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
$lei->start_mua;
}
+ my $F_SETPIPE_SZ = $^O eq 'linux' ? 1031 : undef;
+ if ($l2m->{-wq_nr_workers} > 1 &&
+ $l2m->{base_type} =~ /\A(?:maildir|mbox)\z/) {
+ # setup two barriers to coordinate dedupe_nr
+ # between l2m workers
+ pipe(my ($a_r, $a_w)) or die "pipe: $!";
+ fcntl($a_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+ pipe(my ($b_r, $b_w)) or die "pipe: $!";
+ fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+ $l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ];
+ }
$l2m->wq_workers_start('lei2mail', undef,
$lei->oldset, { lei => $lei });
pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
- # 1031: F_SETPIPE_SZ
- fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
+ fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+ delete $l2m->{au_peers};
}
$self->wq_workers_start('lei_xsearch', undef,
$lei->oldset, { lei => $lei });
'<testmessage@example.com>' => eml_load('t/utf8.eml'),
};
$exp->{'<qp@example.com>'}->header_set('Status', 'RO');
-$exp->{'<testmessage@example.com>'}->header_set('Status', 'O');
test_lei(sub {
lei_ok(qw(import -F eml t/plack-qp.eml));
my %res;
PublicInbox::MboxReader->mboxrd($fh, sub {
my ($eml) = @_;
- $res{$eml->header_raw('Message-ID')} = $eml;
+ my $mid = $eml->header_raw('Message-ID');
+ if ($mid eq '<testmessage@example.com>') {
+ is_deeply([$eml->header('Status')], [],
+ "no status $sfx");
+ $eml->header_set('Status');
+ } elsif ($mid eq '<qp@example.com>') {
+ is($eml->header('Status'), 'RO', 'status preserved');
+ } else {
+ fail("unknown mid $mid");
+ }
+ $res{$mid} = $eml;
});
is_deeply(\%res, $exp, '--augment worked');
lei_ok qw(up -q md -C), $home;
lei_ok qw(up -q . -C), "$home/md";
lei_ok qw(up -q), "/$home/md";
- my %after = map { $_ => 1 } glob("$home/md/cur/*");
+ my %after = map { $_ => 1 } glob("$home/md/{new,cur}/*");
is(delete $after{(keys(%before))[0]}, 1, 'original message kept');
is(scalar(keys %after), 1, 'one new message added');
is_deeply(eml_load((keys %after)[0]), $doc2, 'doc2 matches');
$im->add(PublicInbox::Eml->new($diff));
$im->done;
lei_ok('up', $o);
- @m = glob("$o/cur/*");
+ @m = glob("$o/{new,cur}/*");
is(scalar(@m), 2, 'got 2nd result due to different OID');
SKIP: {
my ($mbox) = shuffle(@MBOX); # pick one, shouldn't matter
my $wcb_get = sub {
my ($fmt, $dst) = @_;
- delete $lei->{dedupe};
+ delete $lei->{dedupe}; # to be recreated
$lei->{ovv} = bless {
fmt => $fmt,
dst => $dst
like($raw, qr/^blah\n/sm, 'wrote content');
unlink $fn or BAIL_OUT $!;
- local $lei->{opt} = { jobs => 2 };
$wcb = $wcb_get->($mbox, $fn);
ok(-f $fn && !-s _, 'truncated mbox destination');
$wcb->(\($dup = $buf), $deadbeef);
$commit->($wcb);
open $fh, '<', $fn or BAIL_OUT $!;
- is(do { local $/; <$fh> }, $raw, 'jobs > 1');
+ is(do { local $/; <$fh> }, $raw, 'wrote identical content');
$raw;
};
ok($dc_cmd, "decompressor for .$zsfx");
my $f = "$fn.$zsfx";
my $wcb = $wcb_get->($mbox, $f);
- $wcb->(\(my $dup = $buf), $deadbeef);
+ $wcb->(\(my $dup = $buf), { %$deadbeef });
$commit->($wcb);
my $uncompressed = xqx([@$dc_cmd, $f]);
is($uncompressed, $orig, "$zsfx works unlocked");
- local $lei->{opt} = { jobs => 2 }; # for atomic writes
unlink $f or BAIL_OUT "unlink $!";
$wcb = $wcb_get->($mbox, $f);
- $wcb->(\($dup = $buf), $deadbeef);
+ $wcb->(\($dup = $buf), { %$deadbeef });
$commit->($wcb);
is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock");
local $lei->{opt} = { augment => 1 };
$wcb = $wcb_get->($mbox, $f);
- $wcb->(\($dup = $buf . "\nx\n"), $deadbeef);
+ $wcb->(\($dup = $buf . "\nx\n"), { %$deadbeef });
$commit->($wcb);
my $cat = popen_rd([@$dc_cmd, $f]);
like($raw[1], qr/\nblah\n\nx\n\z/s, "augmented $zsfx");
like($raw[0], qr/\nblah\n\z/s, "original preserved $zsfx");
- local $lei->{opt} = { augment => 1, jobs => 2 };
+ local $lei->{opt} = { augment => 1 };
$wcb = $wcb_get->($mbox, $f);
- $wcb->(\($dup = $buf . "\ny\n"), $deadbeef);
+ $wcb->(\($dup = $buf . "\ny\n"), { %$deadbeef });
$commit->($wcb);
my @raw3;