'ls-label' => [ '', 'list labels', qw(z|0 stats:s), @c_opt ],
'ls-mail-sync' => [ '[FILTER]', 'list mail sync folders',
qw(z|0 globoff|g invert-match|v local remote), @c_opt ],
+'ls-mail-source' => [ 'URL', 'list IMAP or NNTP mail source folders',
+ qw(z|0 ascii l), @c_opt ],
'forget-external' => [ 'LOCATION...|--prune',
'exclude further results from a publicinbox|extindex',
qw(prune), @c_opt ],
@c_opt ],
'import' => [ 'LOCATION...|--stdin',
'one-time import/update from URL or filesystem',
- qw(stdin| offset=i recursive|r exclude=s include|I=s
+ qw(stdin| offset=i recursive|r exclude=s include|I=s jobs=s new-only
lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync!),
qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt(), @c_opt ],
'forget-mail-sync' => [ 'LOCATION...',
'forget sync information for a mail folder', @c_opt ],
+'prune-mail-sync' => [ 'LOCATION...|--all',
+ 'prune dangling sync data for a mail folder', 'all:s', @c_opt ],
'export-kw' => [ 'LOCATION...|--all',
'one-time export of keywords of sync sources',
qw(all:s mode=s), @c_opt ],
'format|f=s ls-search' => ['OUT|json|jsonl|concatjson',
'listing output format' ],
'l ls-search' => 'long listing format',
+'l ls-mail-source' => 'long listing format',
'format|f=s ls-external' => $ls_format,
'limit|n=i@' => ['NUM', 'limit on number of matches (default: 10000)' ],
'leistore.dir' => 'top-level storage location',
);
-my @WQ_KEYS = qw(lxs l2m wq1 ikw); # internal workers
+my @WQ_KEYS = qw(lxs l2m ikw pmd wq1); # internal workers
sub _drop_wq {
my ($self) = @_;
$end;
}
+sub incr {
+ my ($self, $field, $nr) = @_;
+ $self->{counters}->{$field} += $nr;
+}
+
sub workers_start {
- my ($lei, $wq, $jobs, $ops) = @_;
+ my ($lei, $wq, $jobs, $ops, $flds) = @_;
$ops = {
'!' => [ \&fail_handler, $lei ],
'|' => [ \&sigpipe_handler, $lei ],
'x_it' => [ \&x_it, $lei ],
'child_error' => [ \&child_error, $lei ],
+ 'incr' => [ \&incr, $lei ],
($ops ? %$ops : ()),
};
$ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
my $end = $lei->pkt_op_pair;
my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
- $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
+ $flds->{lei} = $lei;
+ $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
delete $lei->{pkt_op_p};
my $op_c = delete $lei->{pkt_op_c};
@$end = ();
($op_c, $ops);
}
+# call this when we're ready to wait on events and yield to other clients
+sub wait_wq_events {
+ my ($lei, $op_c, $ops) = @_;
+ for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
+ $wq->wq_close(1);
+ }
+ $op_c->{ops} = $ops;
+}
+
sub _help {
require PublicInbox::LeiHelp;
PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC);
sub event_step_init {
my ($self) = @_;
- return if $self->{-event_init_done}++;
- if (my $sock = $self->{sock}) { # using DS->EventLoop
+ my $sock = $self->{sock} or return;
+ $self->{-event_init_done} //= do { # persist til $ops done
$self->SUPER::new($sock, EPOLLIN|EPOLLET);
- }
+ $sock;
+ };
}
sub noop {}
# can immediately reread it
sub DESTROY {
my ($self) = @_;
+ if (my $counters = delete $self->{counters}) {
+ for my $k (sort keys %$counters) {
+ my $nr = $counters->{$k};
+ $self->child_error(1 << 8, "$nr $k messages");
+ }
+ }
$self->{1}->autoflush(1) if $self->{1};
stop_pager($self);
# preserve $? for ->fail or ->x_it code