]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LEI.pm
lei ls-mail-source: list IMAP folders and NNTP groups
[public-inbox.git] / lib / PublicInbox / LEI.pm
index f2dfc320efaf2bc07e4a9c9eed8b50190e2749c1..833d9c4dde67f2d7c88159c22d6084c8748360db 100644 (file)
@@ -196,6 +196,8 @@ our %CMD = ( # sorted in order of importance/use:
 'ls-label' => [ '', 'list labels', qw(z|0 stats:s), @c_opt ],
 'ls-mail-sync' => [ '[FILTER]', 'list mail sync folders',
                qw(z|0 globoff|g invert-match|v local remote), @c_opt ],
+'ls-mail-source' => [ 'URL', 'list IMAP or NNTP mail source folders',
+               qw(z|0 ascii l), @c_opt ],
 'forget-external' => [ 'LOCATION...|--prune',
        'exclude further results from a publicinbox|extindex',
        qw(prune), @c_opt ],
@@ -218,9 +220,6 @@ our %CMD = ( # sorted in order of importance/use:
        qw(stdin| in-format|F=s input|i=s@ oid=s@ mid=s@),
        qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt(), @c_opt,
        pass_through('-kw:foo for delete') ],
-'forget' => [ '[--stdin|--oid=OID|--by-mid=MID]',
-       "exclude message(s) on stdin from `q' search results",
-       qw(stdin| oid=s exact by-mid|mid:s), @c_opt ],
 
 'purge-mailsource' => [ 'LOCATION|--all',
        'remove imported messages from IMAP, Maildirs, and MH',
@@ -243,11 +242,13 @@ our %CMD = ( # sorted in order of importance/use:
         @c_opt ],
 'import' => [ 'LOCATION...|--stdin',
        'one-time import/update from URL or filesystem',
-       qw(stdin| offset=i recursive|r exclude=s include|I=s
+       qw(stdin| offset=i recursive|r exclude=s include|I=s jobs=s new-only
        lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync!),
        qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt(), @c_opt ],
 'forget-mail-sync' => [ 'LOCATION...',
        'forget sync information for a mail folder', @c_opt ],
+'prune-mail-sync' => [ 'LOCATION...|--all',
+       'prune dangling sync data for a mail folder', 'all:s', @c_opt ],
 'export-kw' => [ 'LOCATION...|--all',
        'one-time export of keywords of sync sources',
        qw(all:s mode=s), @c_opt ],
@@ -382,6 +383,7 @@ my %OPTDESC = (
 'format|f=s    ls-search' => ['OUT|json|jsonl|concatjson',
                        'listing output format' ],
 'l     ls-search' => 'long listing format',
+'l     ls-mail-source' => 'long listing format',
 'format|f=s    ls-external' => $ls_format,
 
 'limit|n=i@' => ['NUM', 'limit on number of matches (default: 10000)' ],
@@ -424,7 +426,7 @@ my %CONFIG_KEYS = (
        'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m wq1); # internal workers
+my @WQ_KEYS = qw(lxs l2m ikw pmd wq1); # internal workers
 
 sub _drop_wq {
        my ($self) = @_;
@@ -444,8 +446,10 @@ sub x_it ($$) {
        # make sure client sees stdout before exit
        $self->{1}->autoflush(1) if $self->{1};
        dump_and_clear_log();
-       if (my $s = $self->{pkt_op_p} // $self->{sock}) {
-               send($s, "x_it $code", MSG_EOR);
+       if ($self->{pkt_op_p}) { # to top lei-daemon
+               $self->{pkt_op_p}->pkt_do('x_it', $code);
+       } elsif ($self->{sock}) { # to lei(1) client
+               send($self->{sock}, "x_it $code", MSG_EOR);
        } # else ignore if client disconnected
 }
 
@@ -481,9 +485,10 @@ sub sigint_reap {
 
 sub fail ($$;$) {
        my ($self, $buf, $exit_code) = @_;
+       $self->{failed}++;
        err($self, $buf) if defined $buf;
        # calls fail_handler:
-       send($self->{pkt_op_p}, '!', MSG_EOR) if $self->{pkt_op_p};
+       $self->{pkt_op_p}->pkt_do('!') if $self->{pkt_op_p};
        x_it($self, ($exit_code // 1) << 8);
        undef;
 }
@@ -502,18 +507,17 @@ sub puts ($;@) { out(shift, map { "$_\n" } @_) }
 sub child_error { # passes non-fatal curl exit codes to user
        my ($self, $child_error, $msg) = @_; # child_error is $?
        $self->err($msg) if $msg;
-       if (my $s = $self->{pkt_op_p} // $self->{sock}) {
-               # send to the parent lei-daemon or to lei(1) client
-               send($s, "child_error $child_error", MSG_EOR);
-       } elsif (!$PublicInbox::DS::in_loop) {
-               $self->{child_error} = $child_error;
+       if ($self->{pkt_op_p}) { # to top lei-daemon
+               $self->{pkt_op_p}->pkt_do('child_error', $child_error);
+       } elsif ($self->{sock}) { # to lei(1) client
+               send($self->{sock}, "child_error $child_error", MSG_EOR);
        } # else noop if client disconnected
 }
 
 sub note_sigpipe { # triggers sigpipe_handler
        my ($self, $fd) = @_;
        close(delete($self->{$fd})); # explicit close silences Perl warning
-       send($self->{pkt_op_p}, '|', MSG_EOR) if $self->{pkt_op_p};
+       $self->{pkt_op_p}->pkt_do('|') if $self->{pkt_op_p};
        x_it($self, 13);
 }
 
@@ -553,8 +557,8 @@ sub _delete_pkt_op { # OnDestroy callback to prevent leaks on die
        if (my $op = delete $self->{pkt_op_c}) { # in case of die
                $op->close; # PublicInbox::PktOp::close
        }
-       my $unclosed_after_die = delete($self->{pkt_op_p}) or return;
-       close $unclosed_after_die;
+       my $pkt_op_p = delete($self->{pkt_op_p}) or return;
+       close $pkt_op_p->{op_p};
 }
 
 sub pkt_op_pair {
@@ -566,19 +570,26 @@ sub pkt_op_pair {
        $end;
 }
 
+sub incr {
+       my ($self, $field, $nr) = @_;
+       $self->{counters}->{$field} += $nr;
+}
+
 sub workers_start {
-       my ($lei, $wq, $jobs, $ops) = @_;
+       my ($lei, $wq, $jobs, $ops, $flds) = @_;
        $ops = {
                '!' => [ \&fail_handler, $lei ],
                '|' => [ \&sigpipe_handler, $lei ],
                'x_it' => [ \&x_it, $lei ],
                'child_error' => [ \&child_error, $lei ],
+               'incr' => [ \&incr, $lei ],
                ($ops ? %$ops : ()),
        };
        $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
        my $end = $lei->pkt_op_pair;
        my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
-       $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
+       $flds->{lei} = $lei;
+       $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
        delete $lei->{pkt_op_p};
        my $op_c = delete $lei->{pkt_op_c};
        @$end = ();
@@ -586,6 +597,15 @@ sub workers_start {
        ($op_c, $ops);
 }
 
+# call this when we're ready to wait on events and yield to other clients
+sub wait_wq_events {
+       my ($lei, $op_c, $ops) = @_;
+       for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
+               $wq->wq_close(1);
+       }
+       $op_c->{ops} = $ops;
+}
+
 sub _help {
        require PublicInbox::LeiHelp;
        PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC);
@@ -638,7 +658,15 @@ sub optparse ($$$) {
                        my $ok;
                        for my $o (@or) {
                                if ($o =~ /\A--([a-z0-9\-]+)/) {
-                                       $ok = defined($OPT->{$1});
+                                       my $sw = $1;
+                                       # assume pipe/regular file on stdin
+                                       # w/o args means stdin
+                                       if ($sw eq 'stdin' && !@$argv &&
+                                                       (-p $self->{0} ||
+                                                        -f _) && -r _) {
+                                               $OPT->{stdin} //= 1;
+                                       }
+                                       $ok = defined($OPT->{$sw});
                                        last if $ok;
                                } elsif (defined($argv->[$i])) {
                                        $ok = 1;
@@ -906,7 +934,7 @@ sub start_mua {
        }
        push @cmd, $mfolder unless defined($replaced);
        if ($self->{sock}) { # lei(1) client process runs it
-               # restore terminal: echo $query | lei q -stdin --mua=...
+               # restore terminal: echo $query | lei q --stdin --mua=...
                my $io = [];
                $io->[0] = $self->{1} if $self->{opt}->{stdin} && -t $self->{1};
                send_exec_cmd($self, $io, \@cmd, {});
@@ -1037,7 +1065,7 @@ sub accept_dispatch { # Listener {post_accept} callback
 sub dclose {
        my ($self) = @_;
        delete $self->{-progress};
-       _drop_wq($self);
+       _drop_wq($self) if $self->{failed};
        close(delete $self->{1}) if $self->{1}; # may reap_compress
        $self->close if $self->{-event_init_done}; # PublicInbox::DS::close
 }
@@ -1071,10 +1099,11 @@ sub event_step {
 
 sub event_step_init {
        my ($self) = @_;
-       return if $self->{-event_init_done}++;
-       if (my $sock = $self->{sock}) { # using DS->EventLoop
+       my $sock = $self->{sock} or return;
+       $self->{-event_init_done} //= do { # persist til $ops done
                $self->SUPER::new($sock, EPOLLIN|EPOLLET);
-       }
+               $sock;
+       };
 }
 
 sub noop {}
@@ -1225,6 +1254,12 @@ sub busy { 1 } # prevent daemon-shutdown if client is connected
 # can immediately reread it
 sub DESTROY {
        my ($self) = @_;
+       if (my $counters = delete $self->{counters}) {
+               for my $k (sort keys %$counters) {
+                       my $nr = $counters->{$k};
+                       $self->child_error(1 << 8, "$nr $k messages");
+               }
+       }
        $self->{1}->autoflush(1) if $self->{1};
        stop_pager($self);
        # preserve $? for ->fail or ->x_it code