]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LEI.pm
No ext_urls
[public-inbox.git] / lib / PublicInbox / LEI.pm
index 78b49a3bc1afca5d837884cf0cf6ac1a24b5fa77..d05b20dee4873ce57a6e0d9c44bf54d21dbe6c37 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # Backend for `lei' (local email interface).  Unlike the C10K-oriented
@@ -6,8 +6,7 @@
 # local clients with read/write access to the FS and use as many
 # system resources as the local user has access to.
 package PublicInbox::LEI;
-use strict;
-use v5.10.1;
+use v5.12;
 use parent qw(PublicInbox::DS PublicInbox::LeiExternal
        PublicInbox::LeiQuery);
 use Getopt::Long ();
@@ -19,7 +18,6 @@ use IO::Handle ();
 use Fcntl qw(SEEK_SET);
 use PublicInbox::Config;
 use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::DS qw(dwaitpid);
 use PublicInbox::Spawn qw(spawn popen_rd);
 use PublicInbox::Lock;
 use PublicInbox::Eml;
@@ -130,9 +128,10 @@ sub url_folder_cache {
 
 sub ale {
        my ($self) = @_;
-       $self->{ale} //= do {
+       $self->{ale} // do {
                require PublicInbox::LeiALE;
-               $self->_lei_cfg(1)->{ale} //= PublicInbox::LeiALE->new($self);
+               my $cfg = $self->_lei_cfg(1);
+               $self->{ale} = $cfg->{ale} //= PublicInbox::LeiALE->new($self);
        };
 }
 
@@ -253,6 +252,8 @@ our %CMD = ( # sorted in order of importance/use:
 'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch',
        qw(prune), @c_opt ],
 
+'reindex' => [ '', 'reindex all locally-indexed messages', @c_opt ],
+
 'index' => [ 'LOCATION...', 'one-time index from URL or filesystem',
        qw(in-format|F=s kw! offset=i recursive|r exclude=s include|I=s
        verbose|v+ incremental!), @net_opt, # mainly for --proxy=
@@ -397,8 +398,10 @@ my %OPTDESC = (
                'include specified external(s) in search' ],
 'only|O=s@     q' => [ 'LOCATION',
                'only use specified external(s) for search' ],
-'jobs=s        q' => [ '[SEARCH_JOBS][,WRITER_JOBS]',
-               'control number of search and writer jobs' ],
+'jobs|j=s' => [ 'JOBSPEC',
+               'control number of query and writer jobs' .
+               "integers delimited by `,', either of which may be omitted"
+               ],
 'jobs|j=i      add-external' => 'set parallelism when indexing after --mirror',
 
 'in-format|F=s' => $stdin_formats,
@@ -410,6 +413,9 @@ my %OPTDESC = (
 'url   ls-mail-source' => 'show full URL of newsgroup or IMAP folder',
 'format|f=s    ls-external' => $ls_format,
 
+'prune:s       forget-search' =>
+       ['TYPE|local|remote', 'prune all, remote or local folders' ],
+
 'limit|n=i@' => ['NUM', 'limit on number of matches (default: 10000)' ],
 'offset=i' => ['OFF', 'search result offset (default: 0)'],
 
@@ -537,12 +543,11 @@ sub child_error { # passes non-fatal curl exit codes to user
        local $current_lei = $self;
        $child_error ||= 1 << 8;
        warn(substr($msg, -1, 1) eq "\n" ? $msg : "$msg\n") if defined $msg;
+       $self->{child_error} ||= $child_error;
        if ($self->{pkt_op_p}) { # to top lei-daemon
                $self->{pkt_op_p}->pkt_do('child_error', $child_error);
        } elsif ($self->{sock}) { # to lei(1) client
                send($self->{sock}, "child_error $child_error", MSG_EOR);
-       } else { # non-lei admin command
-               $self->{child_error} ||= $child_error;
        } # else noop if client disconnected
 }
 
@@ -556,17 +561,17 @@ sub note_sigpipe { # triggers sigpipe_handler
 sub _lei_atfork_child {
        my ($self, $persist) = @_;
        # we need to explicitly close things which are on stack
+       my $cfg = $self->{cfg};
        if ($persist) {
                open $self->{3}, '<', '/' or die "open(/) $!";
                fchdir($self);
                close($_) for (grep(defined, delete @$self{qw(0 1 2 sock)}));
-               if (my $cfg = $self->{cfg}) {
-                       delete @$cfg{qw(-lei_store -watches -lei_note_event)};
-               }
+               delete @$cfg{qw(-lei_store -watches -lei_note_event)};
        } else { # worker, Net::NNTP (Net::Cmd) uses STDERR directly
                open STDERR, '+>&='.fileno($self->{2}) or warn "open $!";
                STDERR->autoflush(1);
                POSIX::setpgid(0, $$) // die "setpgid(0, $$): $!";
+               delete @$cfg{qw(-watches -lei_note_event)};
        }
        close($_) for (grep(defined, delete @$self{qw(old_1 au_done)}));
        delete $self->{-socks};
@@ -630,29 +635,43 @@ sub pkt_ops {
 
 sub workers_start {
        my ($lei, $wq, $jobs, $ops, $flds) = @_;
-       $ops = pkt_ops($lei, { ($ops ? %$ops : ()) });
+       $ops //= {};
+       ($wq->can('net_merge_all_done') && $lei->{auth}) and
+               $lei->{auth}->op_merge($ops, $wq, $lei);
+       pkt_ops($lei, $ops);
        $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
        my $end = $lei->pkt_op_pair;
        my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
        $flds->{lei} = $lei;
-       $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
+       $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds,
+               $wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
        delete $lei->{pkt_op_p};
        my $op_c = delete $lei->{pkt_op_c};
        @$end = ();
        $lei->event_step_init;
-       $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
        ($op_c, $ops);
 }
 
 # call this when we're ready to wait on events and yield to other clients
 sub wait_wq_events {
        my ($lei, $op_c, $ops) = @_;
+       my $wq1 = $lei->{wq1};
+       ($wq1 && $wq1->can('net_merge_all_done') && !$lei->{auth}) and
+               $wq1->net_merge_all_done;
        for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
                $wq->wq_close;
        }
+       $wq1->{lei_sock} = $lei->{sock} if $wq1;
        $op_c->{ops} = $ops;
 }
 
+sub wq1_start {
+       my ($lei, $wq, $jobs) = @_;
+       my ($op_c, $ops) = workers_start($lei, $wq, $jobs // 1);
+       $lei->{wq1} = $wq;
+       wait_wq_events($lei, $op_c, $ops); # net_merge_all_done if !{auth}
+}
+
 sub _help {
        require PublicInbox::LeiHelp;
        PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC);
@@ -766,7 +785,7 @@ EOM
        }
 }
 
-sub lazy_cb ($$$) {
+sub lazy_cb ($$$) { # $pfx is _complete_ or lei_
        my ($self, $cmd, $pfx) = @_;
        my $ucmd = $cmd;
        $ucmd =~ tr/-/_/;
@@ -801,7 +820,8 @@ sub dispatch {
                                next if $d eq ''; # same as git(1)
                                chdir $d or return fail($self, "cd $d: $!");
                        }
-                       open $self->{3}, '.' or return fail($self, "open . $!");
+                       open $self->{3}, '<', '.' or
+                               return fail($self, "open . $!");
                }
                $cb->($self, @argv);
        } elsif (grep(/\A-/, $cmd, @argv)) { # --help or -h only
@@ -1131,6 +1151,7 @@ sub event_step {
                if (scalar(@fds) == 1 && !defined($fds[0])) {
                        return if $! == EAGAIN;
                        die "recvmsg: $!" if $! != ECONNRESET;
+                       $buf = '';
                        @fds = (); # for open loop below:
                }
                for (@fds) { open my $rfh, '+<&=', $_ }
@@ -1159,10 +1180,10 @@ sub event_step {
 sub event_step_init {
        my ($self) = @_;
        my $sock = $self->{sock} or return;
-       $self->{-event_init_done} //= do { # persist til $ops done
+       $self->{-event_init_done} // do { # persist til $ops done
                $sock->blocking(0);
                $self->SUPER::new($sock, EPOLLIN);
-               $sock;
+               $self->{-event_init_done} = $sock;
        };
 }
 
@@ -1258,6 +1279,9 @@ sub lazy_start {
                        require PublicInbox::CmdIPC4;
                        $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4');
                        PublicInbox::CmdIPC4->can('recv_cmd4');
+               } // do {
+                       $send_cmd = PublicInbox::Syscall->can('send_cmd4');
+                       PublicInbox::Syscall->can('recv_cmd4');
                };
        }
        $recv_cmd or die <<"";
@@ -1366,13 +1390,12 @@ sub DESTROY {
        # preserve $? for ->fail or ->x_it code
 }
 
-sub wq_done_wait { # dwaitpid callback
-       my ($arg, $pid) = @_;
-       my ($wq, $lei) = @$arg;
+sub wq_done_wait { # awaitpid cb (via wq_eof)
+       my ($pid, $wq, $lei) = @_;
        local $current_lei = $lei;
        my $err_type = $lei->{-err_type};
        $? and $lei->child_error($?,
-                       $err_type ? "$err_type errors during $lei->{cmd}" : ());
+               $err_type ? "$err_type errors during $lei->{cmd} \$?=$?" : ());
        $lei->dclose;
 }
 
@@ -1383,9 +1406,11 @@ sub fchdir {
 }
 
 sub wq_eof { # EOF callback for main daemon
-       my ($lei) = @_;
+       my ($lei, $wq_fld) = @_;
        local $current_lei = $lei;
-       delete $lei->{wq1} // return $lei->fail; # already failed
+       my $wq = delete $lei->{$wq_fld // 'wq1'};
+       $lei->sto_done_request($wq);
+       $wq // $lei->fail; # already failed
 }
 
 sub watch_state_ok ($) {
@@ -1484,23 +1509,22 @@ sub git_oid {
 }
 
 sub lms {
-       my ($lei, $rw) = @_;
+       my ($lei, $creat) = @_;
        my $sto = $lei->{sto} // _lei_store($lei) // return;
        require PublicInbox::LeiMailSync;
        my $f = "$sto->{priv_eidx}->{topdir}/mail_sync.sqlite3";
-       (-f $f || $rw) ? PublicInbox::LeiMailSync->new($f) : undef;
+       (-f $f || $creat) ? PublicInbox::LeiMailSync->new($f) : undef;
 }
 
 sub sto_done_request {
-       my ($lei, $sock) = @_;
+       my ($lei, $wq) = @_;
+       return unless $lei->{sto};
        local $current_lei = $lei;
-       eval {
-               if ($sock //= $lei->{sock}) { # issue, async wait
-                       $lei->{sto}->wq_io_do('done', [ $sock ]);
-               } else { # forcibly wait
-                       my $wait = $lei->{sto}->wq_do('done');
-               }
-       };
+       my $sock = $wq ? $wq->{lei_sock} : undef;
+       $sock //= $lei->{sock};
+       my @io;
+       push(@io, $sock) if $sock; # async wait iff possible
+       eval { $lei->{sto}->wq_io_do('done', \@io) };
        warn($@) if $@;
 }