use IO::Handle; # ->autoflush
use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
use Errno qw(EEXIST ESPIPE ENOENT EPIPE);
+use Digest::SHA qw(sha256_hex);
my ($maildir_each_file);
# struggles with short-lived repos, Gcf2Client makes little sense with lei;
}
sub _augment_file { # maildir_each_file cb
- my ($f, $lei) = @_;
+ my ($f, $lei, $mod, $shard) = @_;
+ if ($mod) {
+ # can't get dirent.d_ino w/ pure Perl, so we extract the OID
+ # if it looks like one:
+ my $hex = $f =~ m!\b([a-f0-9]{40,})[^/]*\z! ?
+ $1 : sha256_hex($f);
+ my $recno = hex(substr($hex, 0, 8));
+ return if ($recno % $mod) != $shard;
+ }
my $eml = PublicInbox::InboxWritable::eml_from_path($f) or return;
_augment($eml, $lei);
}
if ($lei->{opt}->{augment}) {
my $dedupe = $lei->{dedupe};
if ($dedupe && $dedupe->prepare_dedupe) {
- $maildir_each_file->($dst, \&_augment_file, $lei);
+ my ($mod, $shard) = @{$self->{shard_info} // []};
+ $maildir_each_file->($dst, \&_augment_file,
+ $lei, $mod, $shard);
$dedupe->pause_dedupe;
}
} else { # clobber existing Maildir
my ($self) = @_;
my $lei = delete $self->{lei};
$lei->lei_atfork_child;
- if ($self->{-wq_worker_nr} == 0) {
+ my $aug;
+ if (lock_free($self)) {
+ my $mod = $self->{-wq_nr_workers};
+ my $shard = $self->{-wq_worker_nr};
+ if (my $nwr = $lei->{nwr}) {
+ $nwr->{shard_info} = [ $mod, $shard ];
+ } else { # Maildir (MH?)
+ $self->{shard_info} = [ $mod, $shard ];
+ }
+ $aug = '+'; # incr_post_augment
+ } elsif ($self->{-wq_worker_nr} == 0) {
+ $aug = '.'; # do_post_augment
+ }
+ if ($aug) {
local $0 = 'do_augment';
eval { do_augment($self, $lei) };
$lei->fail($@) if $@;
- pkt_do($lei->{pkt_op_p}, '.') == 1 or
+ pkt_do($lei->{pkt_op_p}, $aug) == 1 or
die "do_post_augment trigger: $!";
}
if (my $zpipe = delete $lei->{zpipe}) {
sub do_post_augment {
my ($lei) = @_;
- my $l2m = $lei->{l2m};
+ my $l2m = $lei->{l2m} or die 'BUG: unexpected do_post_augment';
my $err;
- if ($l2m) {
- eval { $l2m->post_augment($lei) };
- $err = $@;
- if ($err) {
- if (my $lxs = delete $lei->{lxs}) {
- $lxs->wq_kill;
- $lxs->wq_close(0, undef, $lei);
- }
- $lei->fail("$err");
+ eval { $l2m->post_augment($lei) };
+ $err = $@;
+ if ($err) {
+ if (my $lxs = delete $lei->{lxs}) {
+ $lxs->wq_kill;
+ $lxs->wq_close(0, undef, $lei);
}
+ $lei->fail("$err");
}
if (!$err && delete $lei->{early_mua}) { # non-augment case
$lei->start_mua;
close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch
}
+sub incr_post_augment { # called whenever an l2m shard finishes
+ my ($lei) = @_;
+ my $l2m = $lei->{l2m} or die 'BUG: unexpected incr_post_augment';
+ return if ++$lei->{nr_post_augment} != $l2m->{-wq_nr_workers};
+ do_post_augment($lei);
+}
+
my $MAX_PER_HOST = 4;
sub concurrency {
'|' => [ $lei->can('sigpipe_handler'), $lei ],
'!' => [ $lei->can('fail_handler'), $lei ],
'.' => [ \&do_post_augment, $lei ],
+ '+' => [ \&incr_post_augment, $lei ],
'' => [ \&query_done, $lei ],
'mset_progress' => [ \&mset_progress, $lei ],
'x_it' => [ $lei->can('x_it'), $lei ],
}
return if $l_uid >= $r_uid; # nothing to do
$l_uid ||= 1;
-
- warn "# $uri fetching UID $l_uid:$r_uid\n" unless $self->{quiet};
+ my ($mod, $shard) = @{$self->{shard_info} // []};
+ unless ($self->{quiet}) {
+ my $m = $mod ? " [(UID % $mod) == $shard]" : '';
+ warn "# $uri fetching UID $l_uid:$r_uid$m\n";
+ }
$mic->Uid(1); # the default, we hope
my $bs = $self->{imap_opt}->{$sec}->{batch_size} // 1;
my $req = $mic->imap4rev1 ? 'BODY.PEEK[]' : 'RFC822.PEEK';
$l_uid = $uids->[-1] + 1; # for next search
my $last_uid;
my $n = $self->{max_batch};
+
+ @$uids = grep { ($_ % $mod) == $shard } @$uids if $mod;
while (scalar @$uids) {
my @batch = splice(@$uids, 0, $bs);
$batch = join(',', @batch);