]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei2mail: parallel augment for lock-free stores
authorEric Wong <e@80x24.org>
Sun, 21 Feb 2021 07:41:34 +0000 (07:41 +0000)
committerEric Wong <e@80x24.org>
Sun, 21 Feb 2021 08:59:33 +0000 (08:59 +0000)
This lets us make use of multiple cores on IMAP and Maildir
backed by SSD (or better) storage.  This benefits IMAP stores
with high network latency, but may still penalize IMAP servers
with rotational storage.

lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/LeiXSearch.pm
lib/PublicInbox/NetReader.pm

index b5d560c7b735a80142536b6e003acdd227005e0c..6efd398a76652c880637b17d0fe0194f5b1fa06a 100644 (file)
@@ -19,6 +19,7 @@ use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
 use Errno qw(EEXIST ESPIPE ENOENT EPIPE);
+use Digest::SHA qw(sha256_hex);
 my ($maildir_each_file);
 
 # struggles with short-lived repos, Gcf2Client makes little sense with lei;
@@ -269,7 +270,15 @@ sub _mbox_write_cb ($$) {
 }
 
 sub _augment_file { # maildir_each_file cb
-       my ($f, $lei) = @_;
+       my ($f, $lei, $mod, $shard) = @_;
+       if ($mod) {
+               # can't get dirent.d_ino w/ pure Perl, so we extract the OID
+               # if it looks like one:
+               my $hex = $f =~ m!\b([a-f0-9]{40,})[^/]*\z! ?
+                               $1 : sha256_hex($f);
+               my $recno = hex(substr($hex, 0, 8));
+               return if ($recno % $mod) != $shard;
+       }
        my $eml = PublicInbox::InboxWritable::eml_from_path($f) or return;
        _augment($eml, $lei);
 }
@@ -421,7 +430,9 @@ sub _do_augment_maildir {
        if ($lei->{opt}->{augment}) {
                my $dedupe = $lei->{dedupe};
                if ($dedupe && $dedupe->prepare_dedupe) {
-                       $maildir_each_file->($dst, \&_augment_file, $lei);
+                       my ($mod, $shard) = @{$self->{shard_info} // []};
+                       $maildir_each_file->($dst, \&_augment_file,
+                                               $lei, $mod, $shard);
                        $dedupe->pause_dedupe;
                }
        } else { # clobber existing Maildir
@@ -516,11 +527,24 @@ sub ipc_atfork_child {
        my ($self) = @_;
        my $lei = delete $self->{lei};
        $lei->lei_atfork_child;
-       if ($self->{-wq_worker_nr} == 0) {
+       my $aug;
+       if (lock_free($self)) {
+               my $mod = $self->{-wq_nr_workers};
+               my $shard = $self->{-wq_worker_nr};
+               if (my $nwr = $lei->{nwr}) {
+                       $nwr->{shard_info} = [ $mod, $shard ];
+               } else { # Maildir (MH?)
+                       $self->{shard_info} = [ $mod, $shard ];
+               }
+               $aug = '+'; # incr_post_augment
+       } elsif ($self->{-wq_worker_nr} == 0) {
+               $aug = '.'; # do_post_augment
+       }
+       if ($aug) {
                local $0 = 'do_augment';
                eval { do_augment($self, $lei) };
                $lei->fail($@) if $@;
-               pkt_do($lei->{pkt_op_p}, '.') == 1 or
+               pkt_do($lei->{pkt_op_p}, $aug) == 1 or
                                        die "do_post_augment trigger: $!";
        }
        if (my $zpipe = delete $lei->{zpipe}) {
index 524f4d1ceb157328726f0a4ce7d759befe7d2fa2..e982165f8014395e0b37fbab5a78eaee3eb45eba 100644 (file)
@@ -331,18 +331,16 @@ Error closing $lei->{ovv}->{dst}: $!
 
 sub do_post_augment {
        my ($lei) = @_;
-       my $l2m = $lei->{l2m};
+       my $l2m = $lei->{l2m} or die 'BUG: unexpected do_post_augment';
        my $err;
-       if ($l2m) {
-               eval { $l2m->post_augment($lei) };
-               $err = $@;
-               if ($err) {
-                       if (my $lxs = delete $lei->{lxs}) {
-                               $lxs->wq_kill;
-                               $lxs->wq_close(0, undef, $lei);
-                       }
-                       $lei->fail("$err");
+       eval { $l2m->post_augment($lei) };
+       $err = $@;
+       if ($err) {
+               if (my $lxs = delete $lei->{lxs}) {
+                       $lxs->wq_kill;
+                       $lxs->wq_close(0, undef, $lei);
                }
+               $lei->fail("$err");
        }
        if (!$err && delete $lei->{early_mua}) { # non-augment case
                $lei->start_mua;
@@ -350,6 +348,13 @@ sub do_post_augment {
        close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch
 }
 
+sub incr_post_augment { # called whenever an l2m shard finishes
+       my ($lei) = @_;
+       my $l2m = $lei->{l2m} or die 'BUG: unexpected incr_post_augment';
+       return if ++$lei->{nr_post_augment} != $l2m->{-wq_nr_workers};
+       do_post_augment($lei);
+}
+
 my $MAX_PER_HOST = 4;
 
 sub concurrency {
@@ -392,6 +397,7 @@ sub do_query {
                '|' => [ $lei->can('sigpipe_handler'), $lei ],
                '!' => [ $lei->can('fail_handler'), $lei ],
                '.' => [ \&do_post_augment, $lei ],
+               '+' => [ \&incr_post_augment, $lei ],
                '' => [ \&query_done, $lei ],
                'mset_progress' => [ \&mset_progress, $lei ],
                'x_it' => [ $lei->can('x_it'), $lei ],
index 4c4124913ec64b3c488bf385841950d3571e0e82..0956d5dad6517509194e093fcc4db9efd74e9f64 100644 (file)
@@ -363,8 +363,11 @@ sub _imap_fetch_all ($$$) {
        }
        return if $l_uid >= $r_uid; # nothing to do
        $l_uid ||= 1;
-
-       warn "# $uri fetching UID $l_uid:$r_uid\n" unless $self->{quiet};
+       my ($mod, $shard) = @{$self->{shard_info} // []};
+       unless ($self->{quiet}) {
+               my $m = $mod ? " [(UID % $mod) == $shard]" : '';
+               warn "# $uri fetching UID $l_uid:$r_uid$m\n";
+       }
        $mic->Uid(1); # the default, we hope
        my $bs = $self->{imap_opt}->{$sec}->{batch_size} // 1;
        my $req = $mic->imap4rev1 ? 'BODY.PEEK[]' : 'RFC822.PEEK';
@@ -391,6 +394,8 @@ sub _imap_fetch_all ($$$) {
                $l_uid = $uids->[-1] + 1; # for next search
                my $last_uid;
                my $n = $self->{max_batch};
+
+               @$uids = grep { ($_ % $mod) == $shard } @$uids if $mod;
                while (scalar @$uids) {
                        my @batch = splice(@$uids, 0, $bs);
                        $batch = join(',', @batch);