]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei q: parallelize Maildir and mbox writing
authorEric Wong <e@80x24.org>
Mon, 18 Jan 2021 10:30:31 +0000 (04:30 -0600)
committerEric Wong <e@80x24.org>
Mon, 18 Jan 2021 21:20:25 +0000 (21:20 +0000)
With 4 dedicated workers, this seems to provide a 100-120%
speedup on a 4 core machine when writing thousands of search
results to a Maildir or mbox.  This also sets us up for
high-latency IMAP destinations in the future.

This opens the door to more speedup opportunities such
as optimizing dedupe locking and other ways to reduce
contention.

This change is fairly complex and convoluted, unfortunately.
Further work may allow us to simplify it and even improve
performance.

lib/PublicInbox/IPC.pm
lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiOverview.pm
lib/PublicInbox/LeiQuery.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/LeiXSearch.pm
lib/PublicInbox/Spawn.pm

index 78cb8400d23758416c5859cdc468cef0f11144b0..8fec2e62a4373f4b81507403ba64245e623808b5 100644 (file)
@@ -397,4 +397,7 @@ sub DESTROY {
        ipc_worker_stop($self);
 }
 
+# Sereal doesn't have dclone
+sub deep_clone { thaw(freeze($_[-1])) }
+
 1;
index 61f2a65b50d020286e3758e915dea58dc245b958..6b6ee0f5e8edb8256c08a6674e196fab8c1690c5 100644 (file)
@@ -279,13 +279,21 @@ sub atfork_prepare_wq {
        if (my $sock = $self->{sock}) {
                push @$tcafc, @$self{qw(0 1 2)}, $sock;
        }
+       for my $f (qw(lxs l2m)) {
+               my $ipc = $self->{$f} or next;
+               push @$tcafc, grep { defined }
+                               @$ipc{qw(-wq_s1 -wq_s2 -ipc_req -ipc_res)};
+       }
 }
 
 # usage: my %sig = $lei->atfork_child_wq($wq);
 #       local @SIG{keys %sig} = values %sig;
 sub atfork_child_wq {
        my ($self, $wq) = @_;
-       @$self{qw(0 1 2 sock)} = delete(@$wq{0..3});
+       my ($sock, $l2m_wq_s1);
+       (@$self{qw(0 1 2)}, $sock, $l2m_wq_s1) = delete(@$wq{0..4});
+       $self->{sock} = $sock if -S $sock;
+       $self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1;
        %PATH2CFG = ();
        $quit = \&CORE::exit;
        @TO_CLOSE_ATFORK_CHILD = ();
@@ -304,15 +312,23 @@ sub atfork_child_wq {
 # usage: ($lei, @io) = $lei->atfork_parent_wq($wq);
 sub atfork_parent_wq {
        my ($self, $wq) = @_;
-       if ($wq->wq_workers) {
-               my $env = delete $self->{env}; # env is inherited at fork
-               my $ret = bless { %$self }, ref($self);
-               $self->{env} = $env;
-               delete @$ret{qw(-lei_store cfg pgr)};
-               ($ret, delete @$ret{0..2}, delete($ret->{sock}) // ());
-       } else {
-               ($self, @$self{0..2}, $self->{sock} // ());
+       my $env = delete $self->{env}; # env is inherited at fork
+       my $ret = bless { %$self }, ref($self);
+       if (my $dedupe = delete $ret->{dedupe}) {
+               $ret->{dedupe} = $wq->deep_clone($dedupe);
+       }
+       $self->{env} = $env;
+       delete @$ret{qw(-lei_store cfg pgr lxs)}; # keep l2m
+       my @io = delete @$ret{0..2};
+       $io[3] = delete($ret->{sock}) // *STDERR{GLOB};
+       my $l2m = $ret->{l2m};
+       if ($l2m && $l2m != $wq) {
+               $io[4] = $l2m->{-wq_s1} if $l2m->{-wq_s1};
+               if (my @pids = $l2m->wq_close) {
+                       $wq->{l2m_pids} = \@pids;
+               }
        }
+       ($ret, @io);
 }
 
 sub _help ($;$) {
@@ -656,7 +672,7 @@ sub start_mua {
        @cmd = map { $_ eq '%f' ? ($replaced = $mfolder) : $_ } @cmd;
        push @cmd, $mfolder unless defined($replaced);
        $sock //= $self->{sock};
-       if ($sock) { # lei(1) client process runs it
+       if ($PublicInbox::DS::in_loop) { # lei(1) client process runs it
                send($sock, exec_buf(\@cmd, {}), MSG_EOR);
        } else { # oneshot
                $self->{"mua.pid.$self.$$"} = spawn(\@cmd);
index c0b423f63e6b5774a0a1fead865dff033610cc6f..538d6bd56cbf1d1bb530a8ca07f846f82dba0f1f 100644 (file)
@@ -140,6 +140,16 @@ sub _unbless_smsg {
 
 sub ovv_atexit_child {
        my ($self, $lei) = @_;
+       if (my $l2m = delete $lei->{l2m}) {
+               # gracefully stop lei2mail processes after all
+               # ->write_mail work is complete
+               delete $l2m->{-wq_s1};
+               if (my $rd = delete $l2m->{each_smsg_done}) {
+                       read($rd, my $buf, 1); # wait for EOF
+               }
+       }
+       # order matters, git->{-tmp}->DESTROY must not fire until
+       # {each_smsg_done} hits EOF above
        if (my $git = delete $self->{git}) {
                $git->async_wait_all;
        }
@@ -178,8 +188,6 @@ sub _json_pretty {
 
 sub ovv_each_smsg_cb { # runs in wq worker usually
        my ($self, $lei, $ibxish) = @_;
-       $lei->{ovv_buf} = \(my $buf = '');
-       delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel};
        my $json;
        $lei->{1}->autoflush(1);
        if (my $pkg = $self->{json}) {
@@ -187,7 +195,27 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
                $json->utf8->canonical;
                $json->ascii(1) if $lei->{opt}->{ascii};
        }
-       if (my $l2m = $lei->{l2m}) {
+       my $l2m = $lei->{l2m};
+       if ($l2m && $l2m->{-wq_s1}) {
+               my ($lei_ipc, @io) = $lei->atfork_parent_wq($l2m);
+               # n.b. $io[0] = qry_status_wr, $io[1] = mbox|stdout,
+               # $io[4] becomes a notification pipe that triggers EOF
+               # in this wq worker when all outstanding ->write_mail
+               # calls are complete
+               die "BUG: \$io[4] $io[4] unexpected" if $io[4];
+               pipe($l2m->{each_smsg_done}, $io[4]) or die "pipe: $!";
+               fcntl($io[4], 1031, 4096) if $^O eq 'linux';
+               delete @$lei_ipc{qw(l2m opt mset_opt cmd)};
+               my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
+               $self->{git} = $git;
+               my $git_dir = $git->{git_dir};
+               sub {
+                       my ($smsg, $mitem) = @_;
+                       my $kw = []; # TODO get from mitem
+                       $l2m->wq_do('write_mail', \@io, $git_dir,
+                                       $smsg->{blob}, $lei_ipc, $kw)
+               }
+       } elsif ($l2m) {
                my $wcb = $l2m->write_cb($lei);
                my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
                $self->{git} = $git; # for ovv_atexit_child
@@ -199,6 +227,7 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
                };
        } elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
                my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
+               $lei->{ovv_buf} = \(my $buf = '');
                sub { # DIY prettiness :P
                        my ($smsg, $mitem) = @_;
                        $smsg = _unbless_smsg($smsg, $mitem);
@@ -221,6 +250,7 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
                }
        } elsif ($json) {
                my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL
+               $lei->{ovv_buf} = \(my $buf = '');
                sub {
                        my ($smsg, $mitem) = @_;
                        delete @$smsg{qw(tid num)};
index a80d58876070492c2708f0a2c39afc6e866b2856..d6e801e328aa410582b3235a91ebbb646212d362 100644 (file)
@@ -41,11 +41,17 @@ sub lei_q {
        $j = 1 if !$opt->{thread};
        $j++ if $opt->{'local'}; # for sto->search below
        $self->atfork_prepare_wq($lxs);
-       $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
-               // $lxs->wq_workers($j);
+       $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset);
+       $self->{lxs} = $lxs;
 
-       # no forking workers after this
        my $ovv = PublicInbox::LeiOverview->new($self) or return;
+       if (my $l2m = $self->{l2m}) {
+               $j = 4 if $j <= 4; # TODO configurable
+               $self->atfork_prepare_wq($l2m);
+               $l2m->wq_workers_start('lei2mail', $j, $self->oldset);
+       }
+
+       # no forking workers after this
        my $sto = $self->_lei_store(1);
        unshift(@srcs, $sto->search) if $opt->{'local'};
        my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
index 0e23b8da6ad01f56098e19af817ee618886a2b96..8d030227e736cc3df97a273471246736ef7e6445 100644 (file)
@@ -5,6 +5,7 @@
 package PublicInbox::LeiToMail;
 use strict;
 use v5.10.1;
+use parent qw(PublicInbox::IPC);
 use PublicInbox::Eml;
 use PublicInbox::Lock;
 use PublicInbox::ProcessPipe;
@@ -14,6 +15,7 @@ use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
 use Errno qw(EEXIST ESPIPE ENOENT);
+use PublicInbox::Git;
 
 my %kw2char = ( # Maildir characters
        draft => 'D',
@@ -422,4 +424,30 @@ sub lock_free {
        $_[0]->{base_type} =~ /\A(?:maildir|mh|imap|jmap)\z/ ? 1 : 0;
 }
 
+sub write_mail { # via ->wq_do
+       my ($self, $git_dir, $oid, $lei, $kw) = @_;
+       my $wcb = $self->{wcb} //= do { # first message
+               my %sig = $lei->atfork_child_wq($self);
+               @SIG{keys %sig} = values %sig; # not local
+               $lei->{dedupe}->prepare_dedupe;
+               $self->write_cb($lei);
+       };
+       my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
+       $git->cat_async($oid, \&git_to_mail, [ $wcb, $kw ]);
+}
+
+sub ipc_atfork_prepare {
+       my ($self) = @_;
+       # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: each_smsg_done_wr)
+       $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= >&=]);
+       $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
+}
+
+sub DESTROY {
+       my ($self) = @_;
+       for my $pid_git (grep(/\A$$\0/, keys %$self)) {
+               $self->{$pid_git}->async_wait_all;
+       }
+}
+
 1;
index 91864cd03766b31f3ef84fcdeead5c36f027f0b8..dc5cf3b65f453cb0eeabe297cc5351e8c67cb7cf 100644 (file)
@@ -126,6 +126,7 @@ sub query_thread_mset { # for --thread
                        @{$ctx->{xids}} = ();
                }
        } while (_mset_more($mset, $mo));
+       undef $each_smsg; # drops @io for l2m->{each_smsg_done}
        $lei->{ovv}->ovv_atexit_child($lei);
 }
 
@@ -147,6 +148,7 @@ sub query_mset { # non-parallel for non-"--thread" users
                        $each_smsg->($smsg, $it);
                }
        } while (_mset_more($mset, $mo));
+       undef $each_smsg; # drops @io for l2m->{each_smsg_done}
        $lei->{ovv}->ovv_atexit_child($lei);
 }
 
@@ -170,11 +172,14 @@ sub git {
 }
 
 sub query_done { # EOF callback
-       my ($lei) = @_;
-       $lei->{ovv}->ovv_end($lei);
-       if (my $l2m = $lei->{l2m}) {
-               $lei->start_mua unless $l2m->lock_free;
+       my ($self, $lei) = @_;
+       my $l2m = delete $lei->{l2m};
+       if (my $pids = delete $self->{l2m_pids}) {
+               my $ipc_worker_reap = $self->can('ipc_worker_reap');
+               dwaitpid($_, $ipc_worker_reap, $l2m) for @$pids;
        }
+       $lei->{ovv}->ovv_end($lei);
+       $lei->start_mua if $l2m && !$l2m->lock_free;
        $lei->dclose;
 }
 
@@ -188,12 +193,10 @@ sub start_query { # always runs in main (lei-daemon) process
        }
        my $remotes = $self->{remotes} // [];
        if ($lei->{opt}->{thread}) {
-               $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1;
                for my $ibxish (@$srcs) {
                        $self->wq_do('query_thread_mset', $io, $lei, $ibxish);
                }
        } else {
-               $lei->{-parallel} = scalar(@$remotes);
                $self->wq_do('query_mset', $io, $lei, $srcs);
        }
        # TODO
@@ -226,12 +229,12 @@ sub do_query {
        $io[0] = undef;
        pipe(my $qry_status_rd, $io[0]) or die "pipe $!";
 
-       $lei_orig->{lxs} = $self;
        $lei_orig->event_step_init; # wait for shutdowns
-       my $op_map = { '' => [ \&query_done, $lei_orig ] };
+       my $op_map = { '' => [ \&query_done, $self, $lei_orig ] };
        my $in_loop = exists $lei_orig->{sock};
        my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop);
-       if (my $l2m = $lei->{l2m}) {
+       my $l2m = $lei->{l2m};
+       if ($l2m) {
                $l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox
                $io[1] = $lei_orig->{1};
                $op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ];
@@ -246,13 +249,17 @@ sub do_query {
                $op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
                $opp->event_step;
                my $ipc_worker_reap = $self->can('ipc_worker_reap');
+               if (my $l2m_pids = delete $self->{l2m_pids}) {
+                       dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids;
+               }
                dwaitpid($_, $ipc_worker_reap, $self) for @pids;
        }
 }
 
 sub ipc_atfork_prepare {
        my ($self) = @_;
-       $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]);
+       # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: $l2m->{-wq_s1})
+       $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&=]);
        $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
 }
 
index e5c0b1e94d03079455ce6ecea8fd977c468c4998..b03f2d59abca5c2f921abdda68ce046a16505c52 100644 (file)
@@ -209,7 +209,7 @@ my $fdpass = <<'FDPASS';
 #include <sys/socket.h>
 
 #if defined(CMSG_SPACE) && defined(CMSG_LEN)
-#define SEND_FD_CAPA 4
+#define SEND_FD_CAPA 5
 #define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
 union my_cmsg {
        struct cmsghdr hdr;