]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/NetReader.pm
net_reader: load IO::Socket::Socks in all workers
[public-inbox.git] / lib / PublicInbox / NetReader.pm
1 # Copyright (C) 2021 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # common reader code for IMAP and NNTP (and maybe JMAP)
5 package PublicInbox::NetReader;
6 use strict;
7 use v5.10.1;
8 use parent qw(Exporter PublicInbox::IPC);
9 use PublicInbox::Eml;
10 use PublicInbox::Config;
11 our %IMAPflags2kw = map {; "\\\u$_" => $_ } qw(seen answered flagged draft);
12 $IMAPflags2kw{'$Forwarded'} = 'forwarded';  # RFC 5550
13
14 our @EXPORT = qw(uri_section imap_uri nntp_uri);
15
16 sub ndump {
17         require Data::Dumper;
18         Data::Dumper->new(\@_)->Useqq(1)->Terse(1)->Dump;
19 }
20
21 # returns the git config section name, e.g [imap "imaps://user@example.com"]
22 # without the mailbox, so we can share connections between different inboxes
23 sub uri_section ($) {
24         my ($uri) = @_;
25         $uri->scheme . '://' . $uri->authority;
26 }
27
28 sub socks_args ($) {
29         my ($val) = @_;
30         return if ($val // '') eq '';
31         if ($val =~ m!\Asocks5h:// (?: \[ ([^\]]+) \] | ([^:/]+) )
32                                         (?::([0-9]+))?/*\z!ix) {
33                 my ($h, $p) = ($1 // $2, $3 + 0);
34                 $h = '127.0.0.1' if $h eq '0';
35                 eval { require IO::Socket::Socks } or die <<EOM;
36 IO::Socket::Socks missing for socks5h://$h:$p
37 EOM
38                 # for Mail::IMAPClient
39                 return { ProxyAddr => $h, ProxyPort => $p };
40         }
41         die "$val not understood (only socks5h:// is supported)\n";
42 }
43
44 sub mic_new ($$$$) {
45         my ($self, $mic_arg, $sec, $uri) = @_;
46         my %mic_arg = %$mic_arg;
47         my $sa = $self->{cfg_opt}->{$sec}->{-proxy_cfg} || $self->{-proxy_cli};
48         if ($sa) {
49                 # this `require' needed for worker[1..Inf], since socks_args
50                 # only got called in worker[0]
51                 require IO::Socket::Socks;
52
53                 my %opt = %$sa;
54                 $opt{ConnectAddr} = delete $mic_arg{Server};
55                 $opt{ConnectPort} = delete $mic_arg{Port};
56                 $mic_arg{Socket} = IO::Socket::Socks->new(%opt) or die
57                         "E: <$$uri> ".eval('$IO::Socket::Socks::SOCKS_ERROR');
58         }
59         PublicInbox::IMAPClient->new(%mic_arg, Keepalive => 1);
60 }
61
62 sub auth_anon_cb { '' }; # for Mail::IMAPClient::Authcallback
63
64 sub onion_hint ($$) {
65         my ($lei, $uri) = @_;
66         $uri->host =~ /\.onion\z/i or return "\n";
67         my $t = $uri->isa('PublicInbox::URIimap') ? 'imap' : 'nntp';
68         my $url = uri_section($uri);
69         my $set_cfg = 'lei config';
70         if (!$lei) { # public-inbox-watch
71                 my $f = $ENV{PI_CONFIG} || '~/.public-inbox/config';
72                 $set_cfg = "git config -f $f";
73         }
74         <<EOM
75
76 Assuming you have Tor configured and running locally on port 9050,
77 try configuring a socks5h:// proxy:
78
79         url=$url
80         $set_cfg $t.\$url.proxy socks5h://127.0.0.1:9050
81
82 ...before retrying your current command
83 EOM
84 }
85
86 # mic_for may prompt the user and store auth info, prepares mic_get
87 sub mic_for ($$$$) { # mic = Mail::IMAPClient
88         my ($self, $uri, $mic_common, $lei) = @_;
89         require PublicInbox::GitCredential;
90         my $cred = bless {
91                 url => "$uri",
92                 protocol => $uri->scheme,
93                 host => $uri->host,
94                 username => $uri->user,
95                 password => $uri->password,
96         }, 'PublicInbox::GitCredential';
97         my $sec = uri_section($uri);
98         my $common = $mic_common->{$sec} // {};
99         # IMAPClient and Net::Netrc both mishandles `0', so we pass `127.0.0.1'
100         my $host = $cred->{host};
101         $host = '127.0.0.1' if $host eq '0';
102         my $mic_arg = {
103                 Port => $uri->port,
104                 Server => $host,
105                 Ssl => $uri->scheme eq 'imaps',
106                 %$common, # may set Starttls, Compress, Debug ....
107         };
108         $mic_arg->{Ssl} = 1 if $uri->scheme eq 'imaps';
109         require PublicInbox::IMAPClient;
110         my $mic = mic_new($self, $mic_arg, $sec, $uri) or
111                 die "E: <$uri> new: $@".onion_hint($lei, $uri);
112
113         # default to using STARTTLS if it's available, but allow
114         # it to be disabled since I usually connect to localhost
115         if (!$mic_arg->{Ssl} && !defined($mic_arg->{Starttls}) &&
116                         $mic->has_capability('STARTTLS') &&
117                         $mic->can('starttls')) {
118                 $mic->starttls or die "E: <$uri> STARTTLS: $@\n";
119         }
120
121         # do we even need credentials?
122         if (!defined($cred->{username}) &&
123                         $mic->has_capability('AUTH=ANONYMOUS')) {
124                 $cred = undef;
125         }
126         if ($cred) {
127                 my $p = $cred->{password} // $cred->check_netrc($lei);
128                 $cred->fill($lei) unless defined($p); # may prompt user here
129                 $mic->User($mic_arg->{User} = $cred->{username});
130                 $mic->Password($mic_arg->{Password} = $cred->{password});
131         } else { # AUTH=ANONYMOUS
132                 $mic->Authmechanism($mic_arg->{Authmechanism} = 'ANONYMOUS');
133                 $mic_arg->{Authcallback} = 'auth_anon_cb';
134                 $mic->Authcallback(\&auth_anon_cb);
135         }
136         my $err;
137         if ($mic->login && $mic->IsAuthenticated) {
138                 # success! keep IMAPClient->new arg in case we get disconnected
139                 $self->{net_arg}->{$sec} = $mic_arg;
140                 if ($cred) {
141                         $uri->user($cred->{username}) if !defined($uri->user);
142                 } elsif ($mic_arg->{Authmechanism} eq 'ANONYMOUS') {
143                         $uri->auth('ANONYMOUS') if !defined($uri->auth);
144                 }
145         } else {
146                 $err = "E: <$uri> LOGIN: $@\n";
147                 if ($cred && defined($cred->{password})) {
148                         $err =~ s/\Q$cred->{password}\E/*******/g;
149                 }
150                 $mic = undef;
151         }
152         $cred->run($mic ? 'approve' : 'reject') if $cred && $cred->{filled};
153         if ($err) {
154                 $lei ? $lei->fail($err) : warn($err);
155         }
156         $mic;
157 }
158
159 # Net::NNTP doesn't support CAPABILITIES, yet
160 sub try_starttls ($) {
161         my ($host) = @_;
162         return if $host =~ /\.onion\z/s;
163         return if $host =~ /\A127\.[0-9]+\.[0-9]+\.[0-9]+\z/s;
164         return if $host eq '::1';
165         1;
166 }
167
168 sub nn_new ($$$) {
169         my ($nn_arg, $nntp_cfg, $uri) = @_;
170         my $nn;
171         if (defined $nn_arg->{ProxyAddr}) {
172                 require PublicInbox::NetNNTPSocks;
173                 eval { $nn = PublicInbox::NetNNTPSocks->new_socks(%$nn_arg) };
174                 die "E: <$uri> $@\n" if $@;
175         } else {
176                 $nn = Net::NNTP->new(%$nn_arg) or return;
177         }
178
179         # default to using STARTTLS if it's available, but allow
180         # it to be disabled for localhost/VPN users
181         if (!$nn_arg->{SSL} && $nn->can('starttls')) {
182                 if (!defined($nntp_cfg->{starttls}) &&
183                                 try_starttls($nn_arg->{Host})) {
184                         # soft fail by default
185                         $nn->starttls or warn <<"";
186 W: <$uri> STARTTLS tried and failed (not requested)
187
188                 } elsif ($nntp_cfg->{starttls}) {
189                         # hard fail if explicitly configured
190                         $nn->starttls or die <<"";
191 E: <$uri> STARTTLS requested and failed
192
193                 }
194         } elsif ($nntp_cfg->{starttls}) {
195                 $nn->can('starttls') or
196                         die "E: <$uri> Net::NNTP too old for STARTTLS\n";
197                 $nn->starttls or die <<"";
198 E: <$uri> STARTTLS requested and failed
199
200         }
201         $nn;
202 }
203
204 sub nn_for ($$$$) { # nn = Net::NNTP
205         my ($self, $uri, $nn_common, $lei) = @_;
206         my $sec = uri_section($uri);
207         my $nntp_cfg = $self->{cfg_opt}->{$sec} //= {};
208         my $host = $uri->host;
209         # Net::NNTP and Net::Netrc both mishandle `0', so we pass `127.0.0.1'
210         $host = '127.0.0.1' if $host eq '0';
211         my $cred;
212         my ($u, $p);
213         if (defined(my $ui = $uri->userinfo)) {
214                 require PublicInbox::GitCredential;
215                 $cred = bless {
216                         url => $sec,
217                         protocol => $uri->scheme,
218                         host => $host,
219                 }, 'PublicInbox::GitCredential';
220                 ($u, $p) = split(/:/, $ui, 2);
221                 ($cred->{username}, $cred->{password}) = ($u, $p);
222                 $p //= $cred->check_netrc($lei);
223         }
224         my $common = $nn_common->{$sec} // {};
225         my $nn_arg = {
226                 Port => $uri->port,
227                 Host => $host,
228                 %$common, # may Debug ....
229         };
230         $nn_arg->{SSL} = 1 if $uri->secure; # snews == nntps
231         my $sa = $self->{-proxy_cli};
232         %$nn_arg = (%$nn_arg, %$sa) if $sa;
233         my $nn = nn_new($nn_arg, $nntp_cfg, $uri) or
234                 die "E: <$uri> new: $@".onion_hint($lei, $uri);
235         if ($cred) {
236                 $cred->fill($lei) unless defined($p); # may prompt user here
237                 if ($nn->authinfo($u, $p)) {
238                         push @{$nntp_cfg->{-postconn}}, [ 'authinfo', $u, $p ];
239                 } else {
240                         warn "E: <$uri> AUTHINFO $u XXXX failed\n";
241                         $nn = undef;
242                 }
243         }
244
245         if ($nntp_cfg->{compress}) {
246                 # https://rt.cpan.org/Ticket/Display.html?id=129967
247                 if ($nn->can('compress')) {
248                         if ($nn->compress) {
249                                 push @{$nntp_cfg->{-postconn}}, [ 'compress' ];
250                         } else {
251                                 warn "W: <$uri> COMPRESS failed\n";
252                         }
253                 } else {
254                         delete $nntp_cfg->{compress};
255                         warn <<"";
256 W: <$uri> COMPRESS not supported by Net::NNTP
257 W: see https://rt.cpan.org/Ticket/Display.html?id=129967 for updates
258
259                 }
260         }
261
262         $self->{net_arg}->{$sec} = $nn_arg;
263         $cred->run($nn ? 'approve' : 'reject') if $cred && $cred->{filled};
264         $nn;
265 }
266
267 sub imap_uri {
268         my ($url, $ls_ok) = @_;
269         require PublicInbox::URIimap;
270         my $uri = PublicInbox::URIimap->new($url);
271         $uri && ($ls_ok || $uri->mailbox) ? $uri->canonical : undef;
272 }
273
274 my %IS_NNTP = (news => 1, snews => 1, nntp => 1, nntps => 1);
275 sub nntp_uri {
276         my ($url, $ls_ok) = @_;
277         require PublicInbox::URInntps;
278         my $uri = PublicInbox::URInntps->new($url);
279         $uri && $IS_NNTP{$uri->scheme} && ($ls_ok || $uri->group) ?
280                 $uri->canonical : undef;
281 }
282
283 sub cfg_intvl ($$$) {
284         my ($cfg, $key, $url) = @_;
285         my $v = $cfg->urlmatch($key, $url) // return;
286         $v =~ /\A[0-9]+(?:\.[0-9]+)?\z/s and return $v + 0;
287         if (ref($v) eq 'ARRAY') {
288                 $v = join(', ', @$v);
289                 warn "W: $key has multiple values: $v\nW: $key ignored\n";
290         } else {
291                 warn "W: $key=$v is not a numeric value in seconds\n";
292         }
293 }
294
295 sub cfg_bool ($$$) {
296         my ($cfg, $key, $url) = @_;
297         my $orig = $cfg->urlmatch($key, $url) // return;
298         my $bool = $cfg->git_bool($orig);
299         warn "W: $key=$orig for $url is not boolean\n" unless defined($bool);
300         $bool;
301 }
302
303 # flesh out common IMAP-specific data structures
304 sub imap_common_init ($;$) {
305         my ($self, $lei) = @_;
306         return unless $self->{imap_order};
307         $self->{quiet} = 1 if $lei && $lei->{opt}->{quiet};
308         eval { require PublicInbox::IMAPClient } or
309                 die "Mail::IMAPClient is required for IMAP:\n$@\n";
310         ($lei || eval { require PublicInbox::IMAPTracker }) or
311                 die "DBD::SQLite is required for IMAP\n:$@\n";
312         require PublicInbox::URIimap;
313         my $cfg = $self->{pi_cfg} // $lei->_lei_cfg;
314         my $mic_common = {}; # scheme://authority => Mail:IMAPClient arg
315         for my $uri (@{$self->{imap_order}}) {
316                 my $sec = uri_section($uri);
317
318                 # knobs directly for Mail::IMAPClient->new
319                 for my $k (qw(Starttls Debug Compress Ignoresizeerrors)) {
320                         my $bool = cfg_bool($cfg, "imap.$k", $$uri) // next;
321                         $mic_common->{$sec}->{$k} = $bool;
322                 }
323                 my $to = cfg_intvl($cfg, 'imap.timeout', $$uri);
324                 $mic_common->{$sec}->{Timeout} = $to if $to;
325
326                 # knobs we use ourselves:
327                 my $sa = socks_args($cfg->urlmatch('imap.Proxy', $$uri));
328                 $self->{cfg_opt}->{$sec}->{-proxy_cfg} = $sa if $sa;
329                 for my $k (qw(pollInterval idleInterval)) {
330                         $to = cfg_intvl($cfg, "imap.$k", $$uri) // next;
331                         $self->{cfg_opt}->{$sec}->{$k} = $to;
332                 }
333                 my $k = 'imap.fetchBatchSize';
334                 my $bs = $cfg->urlmatch($k, $$uri) // next;
335                 if ($bs =~ /\A([0-9]+)\z/) {
336                         $self->{cfg_opt}->{$sec}->{batch_size} = $bs;
337                 } else {
338                         warn "$k=$bs is not an integer\n";
339                 }
340         }
341         # make sure we can connect and cache the credentials in memory
342         my $mics = {}; # schema://authority => IMAPClient obj
343         for my $orig_uri (@{$self->{imap_order}}) {
344                 my $sec = uri_section($orig_uri);
345                 my $uri = PublicInbox::URIimap->new("$sec/");
346                 my $mic = $mics->{$sec} //=
347                                 mic_for($self, $uri, $mic_common, $lei) //
348                                 die "Unable to continue\n";
349                 next unless $self->isa('PublicInbox::NetWriter');
350                 my $dst = $orig_uri->mailbox // next;
351                 next if $mic->exists($dst); # already exists
352                 $mic->create($dst) or die "CREATE $dst failed <$orig_uri>: $@";
353         }
354         $mics;
355 }
356
357 # flesh out common NNTP-specific data structures
358 sub nntp_common_init ($;$) {
359         my ($self, $lei) = @_;
360         return unless $self->{nntp_order};
361         $self->{quiet} = 1 if $lei && $lei->{opt}->{quiet};
362         eval { require Net::NNTP } or
363                 die "Net::NNTP is required for NNTP:\n$@\n";
364         ($lei || eval { require PublicInbox::IMAPTracker }) or
365                 die "DBD::SQLite is required for NNTP\n:$@\n";
366         my $cfg = $self->{pi_cfg} // $lei->_lei_cfg;
367         my $nn_common = {}; # scheme://authority => Net::NNTP->new arg
368         for my $uri (@{$self->{nntp_order}}) {
369                 my $sec = uri_section($uri);
370                 my $args = $nn_common->{$sec} //= {};
371
372                 # Debug and Timeout are passed to Net::NNTP->new
373                 my $v = cfg_bool($cfg, 'nntp.Debug', $$uri);
374                 $args->{Debug} = $v if defined $v;
375                 my $to = cfg_intvl($cfg, 'nntp.Timeout', $$uri);
376                 $args->{Timeout} = $to if $to;
377                 my $sa = socks_args($cfg->urlmatch('nntp.Proxy', $$uri));
378                 %$args = (%$args, %$sa) if $sa;
379
380                 # Net::NNTP post-connect commands
381                 for my $k (qw(starttls compress)) {
382                         $v = cfg_bool($cfg, "nntp.$k", $$uri) // next;
383                         $self->{cfg_opt}->{$sec}->{$k} = $v;
384                 }
385
386                 # -watch internal option
387                 for my $k (qw(pollInterval)) {
388                         $to = cfg_intvl($cfg, "nntp.$k", $$uri) // next;
389                         $self->{cfg_opt}->{$sec}->{$k} = $to;
390                 }
391         }
392         # make sure we can connect and cache the credentials in memory
393         my %nn; # schema://authority => Net::NNTP object
394         for my $uri (@{$self->{nntp_order}}) {
395                 my $sec = uri_section($uri);
396                 $nn{$sec} //= nn_for($self, $uri, $nn_common, $lei);
397         }
398         \%nn; # for optional {nn_cached}
399 }
400
401 sub add_url {
402         my ($self, $arg, $ls_ok) = @_;
403         my $uri;
404         if ($uri = imap_uri($arg, $ls_ok)) {
405                 $_[1] = $$uri; # canonicalized
406                 push @{$self->{imap_order}}, $uri;
407         } elsif ($uri = nntp_uri($arg, $ls_ok)) {
408                 $_[1] = $$uri; # canonicalized
409                 push @{$self->{nntp_order}}, $uri;
410         } else {
411                 push @{$self->{unsupported_url}}, $arg;
412         }
413 }
414
415 sub errors {
416         my ($self, $lei) = @_;
417         if (my $u = $self->{unsupported_url}) {
418                 return "Unsupported URL(s): @$u";
419         }
420         if ($self->{imap_order}) {
421                 eval { require PublicInbox::IMAPClient } or
422                         die "Mail::IMAPClient is required for IMAP:\n$@\n";
423         }
424         if ($self->{nntp_order}) {
425                 eval { require Net::NNTP } or
426                         die "Net::NNTP is required for NNTP:\n$@\n";
427         }
428         my $sa = socks_args($lei ? $lei->{opt}->{proxy} : undef);
429         $self->{-proxy_cli} = $sa if $sa;
430         undef;
431 }
432
433 sub flags2kw ($$$$) {
434         my ($self, $uri, $uid, $flags) = @_;
435         my $kw = [];
436         for my $f (split(/ /, $flags)) {
437                 if (my $k = $IMAPflags2kw{$f}) {
438                         push @$kw, $k;
439                 } elsif ($f eq "\\Recent") { # not in JMAP
440                 } elsif ($f eq "\\Deleted") { # not in JMAP
441                         return;
442                 } elsif ($self->{verbose}) {
443                         warn "# unknown IMAP flag $f <$uri/;UID=$uid>\n";
444                 }
445         }
446         @$kw = sort @$kw; # for LeiSearch->kw_changed and UI/UX purposes
447         $kw;
448 }
449
450 sub _imap_do_msg ($$$$$) {
451         my ($self, $uri, $uid, $raw, $flags) = @_;
452         # our target audience expects LF-only, save storage
453         $$raw =~ s/\r\n/\n/sg;
454         my $kw = defined($flags) ?
455                 (flags2kw($self, $uri, $uid, $flags) // return) : undef;
456         my ($eml_cb, @args) = @{$self->{eml_each}};
457         $eml_cb->($uri, $uid, $kw, PublicInbox::Eml->new($raw), @args);
458 }
459
460 sub run_commit_cb ($) {
461         my ($self) = @_;
462         my $cmt_cb_args = $self->{on_commit} or return;
463         my ($cb, @args) = @$cmt_cb_args;
464         $cb->(@args);
465 }
466
467 sub itrk_last ($$;$$) {
468         my ($self, $uri, $r_uidval, $mic) = @_;
469         return (undef, undef, $r_uidval) unless $self->{incremental};
470         my ($itrk, $l_uid, $l_uidval);
471         if (defined(my $lms = $self->{-lms_ro})) { # LeiMailSync or 0
472                 $uri->uidvalidity($r_uidval) if defined $r_uidval;
473                 if ($mic) {
474                         my $auth = $mic->Authmechanism // '';
475                         $uri->auth($auth) if $auth eq 'ANONYMOUS';
476                         my $user = $mic->User;
477                         $uri->user($user) if defined($user);
478                 }
479                 my $x;
480                 $l_uid = ($lms && ($x = $lms->location_stats($$uri))) ?
481                                 $x->{'uid.max'} : undef;
482                 # itrk remains undef, lei/store worker writes to
483                 # mail_sync.sqlite3
484         } else {
485                 $itrk = PublicInbox::IMAPTracker->new($$uri);
486                 ($l_uidval, $l_uid) = $itrk->get_last($$uri);
487         }
488         ($itrk, $l_uid, $l_uidval //= $r_uidval);
489 }
490
491 # import flags of already-seen messages
492 sub each_old_flags ($$$$) {
493         my ($self, $mic, $uri, $l_uid) = @_;
494         $l_uid ||= 1;
495         my $sec = uri_section($uri);
496         my $bs = ($self->{cfg_opt}->{$sec}->{batch_size} // 1) * 10000;
497         my ($eml_cb, @args) = @{$self->{eml_each}};
498         $self->{quiet} or warn "# $uri syncing flags 1:$l_uid\n";
499         for (my $n = 1; $n <= $l_uid; $n += $bs) {
500                 my $end = $n + $bs;
501                 $end = $l_uid if $end > $l_uid;
502                 my $r = $mic->fetch_hash("$n:$end", 'FLAGS');
503                 if (!$r) {
504                         return if $!{EINTR} && $self->{quit};
505                         return "E: $uri UID FETCH $n:$end error: $!";
506                 }
507                 while (my ($uid, $per_uid) = each %$r) {
508                         my $kw = flags2kw($self, $uri, $uid, $per_uid->{FLAGS})
509                                 // next;
510                         # LeiImport->input_net_cb
511                         $eml_cb->($uri, $uid, $kw, undef, @args);
512                 }
513         }
514 }
515
516 # returns true if PERMANENTFLAGS indicates FLAGS of already imported
517 # messages are meaningful
518 sub perm_fl_ok ($) {
519         my ($perm_fl) = @_;
520         return if !defined($perm_fl);
521         for my $f (split(/[ \t]+/, $perm_fl)) {
522                 return 1 if $IMAPflags2kw{$f};
523         }
524         undef;
525 }
526
527 # may be overridden in NetWriter or Watch
528 sub folder_select { $_[0]->{each_old} ? 'select' : 'examine' }
529
530 sub _imap_fetch_all ($$$) {
531         my ($self, $mic, $orig_uri) = @_;
532         my $sec = uri_section($orig_uri);
533         my $mbx = $orig_uri->mailbox;
534         $mic->Clear(1); # trim results history
535
536         # we need to check for mailbox writability to see if we care about
537         # FLAGS from already-imported messages.
538         my $cmd = $self->folder_select;
539         $mic->$cmd($mbx) or return "E: \U$cmd\E $mbx ($sec) failed: $!";
540
541         my ($r_uidval, $r_uidnext, $perm_fl);
542         for ($mic->Results) {
543                 /^\* OK \[PERMANENTFLAGS \(([^\)]*)\)\].*/ and $perm_fl = $1;
544                 /^\* OK \[UIDVALIDITY ([0-9]+)\].*/ and $r_uidval = $1;
545                 /^\* OK \[UIDNEXT ([0-9]+)\].*/ and $r_uidnext = $1;
546         }
547         $r_uidval //= $mic->uidvalidity($mbx) //
548                 return "E: $orig_uri cannot get UIDVALIDITY";
549         $r_uidnext //= $mic->uidnext($mbx) //
550                 return "E: $orig_uri cannot get UIDNEXT";
551         my $expect = $orig_uri->uidvalidity // $r_uidval;
552         return <<EOF if $expect != $r_uidval;
553 E: $orig_uri UIDVALIDITY mismatch (got $r_uidval)
554 EOF
555
556         my $uri = $orig_uri->clone;
557         my $single_uid = $uri->uid;
558         my ($itrk, $l_uid, $l_uidval) = itrk_last($self, $uri, $r_uidval, $mic);
559         if (defined($single_uid)) {
560                 $itrk = $l_uid = undef;
561                 $uri->uid(undef); # for eml_cb
562         }
563         return <<EOF if $l_uidval != $r_uidval;
564 E: $uri UIDVALIDITY mismatch
565 E: local=$l_uidval != remote=$r_uidval
566 EOF
567         $uri->uidvalidity($r_uidval);
568         $l_uid //= 0;
569         my $r_uid = $r_uidnext - 1;
570         return <<EOF if $l_uid > $r_uid;
571 E: $uri local UID exceeds remote ($l_uid > $r_uid)
572 E: $uri strangely, UIDVALIDLITY matches ($l_uidval)
573 EOF
574         $mic->Uid(1); # the default, we hope
575         my $err;
576         my $use_fl = perm_fl_ok($perm_fl);
577         if (!defined($single_uid) && $self->{each_old} && $use_fl) {
578                 $err = each_old_flags($self, $mic, $uri, $l_uid);
579                 return $err if $err;
580         }
581         return if $l_uid >= $r_uid; # nothing to do
582         $l_uid ||= 1;
583         my ($mod, $shard) = @{$self->{shard_info} // []};
584         unless ($self->{quiet}) {
585                 my $m = $mod ? " [(UID % $mod) == $shard]" : '';
586                 warn "# $uri fetching UID $l_uid:$r_uid$m\n";
587         }
588         my $bs = $self->{cfg_opt}->{$sec}->{batch_size} // 1;
589         my $req = $mic->imap4rev1 ? 'BODY.PEEK[]' : 'RFC822.PEEK';
590         my $key = $req;
591         $key =~ s/\.PEEK//;
592         my ($uids, $batch);
593         do {
594                 # I wish "UID FETCH $START:*" could work, but:
595                 # 1) servers do not need to return results in any order
596                 # 2) Mail::IMAPClient doesn't offer a streaming API
597                 if (defined $single_uid) {
598                         $uids = [ $single_uid ];
599                 } elsif (!($uids = $mic->search("UID $l_uid:*"))) {
600                         return if $!{EINTR} && $self->{quit};
601                         return "E: $uri UID SEARCH $l_uid:* error: $!";
602                 }
603                 return if scalar(@$uids) == 0;
604
605                 # RFC 3501 doesn't seem to indicate order of UID SEARCH
606                 # responses, so sort it ourselves.  Order matters so
607                 # IMAPTracker can store the newest UID.
608                 @$uids = sort { $a <=> $b } @$uids;
609
610                 # Did we actually get new messages?
611                 return if $uids->[0] < $l_uid;
612
613                 $l_uid = $uids->[-1] + 1; # for next search
614                 my $last_uid;
615                 my $n = $self->{max_batch};
616
617                 @$uids = grep { ($_ % $mod) == $shard } @$uids if $mod;
618                 while (scalar @$uids) {
619                         my @batch = splice(@$uids, 0, $bs);
620                         $batch = join(',', @batch);
621                         local $0 = "UID:$batch $mbx $sec";
622                         my $r = $mic->fetch_hash($batch, $req, 'FLAGS');
623                         unless ($r) { # network error?
624                                 last if $!{EINTR} && $self->{quit};
625                                 $err = "E: $uri UID FETCH $batch error: $!";
626                                 last;
627                         }
628                         for my $uid (@batch) {
629                                 # messages get deleted, so holes appear
630                                 my $per_uid = delete $r->{$uid} // next;
631                                 my $raw = delete($per_uid->{$key}) // next;
632                                 my $fl = $use_fl ? $per_uid->{FLAGS} : undef;
633                                 _imap_do_msg($self, $uri, $uid, \$raw, $fl);
634                                 $last_uid = $uid;
635                                 last if $self->{quit};
636                         }
637                         last if $self->{quit};
638                 }
639                 run_commit_cb($self);
640                 $itrk->update_last($r_uidval, $last_uid) if $itrk;
641         } until ($err || $self->{quit} || defined($single_uid));
642         $err;
643 }
644
645 # uses cached auth info prepared by mic_for
646 sub mic_get {
647         my ($self, $uri) = @_;
648         my $sec = uri_section($uri);
649         # see if caller saved result of imap_common_init
650         my $cached = $self->{mics_cached};
651         if ($cached) {
652                 my $mic = $cached->{$sec};
653                 return $mic if $mic && $mic->IsConnected;
654                 delete $cached->{$sec};
655         }
656         my $mic_arg = $self->{net_arg}->{$sec} or
657                         die "BUG: no Mail::IMAPClient->new arg for $sec";
658         if (defined(my $cb_name = $mic_arg->{Authcallback})) {
659                 if (ref($cb_name) ne 'CODE') {
660                         $mic_arg->{Authcallback} = $self->can($cb_name);
661                 }
662         }
663         my $mic = mic_new($self, $mic_arg, $sec, $uri);
664         $cached //= {}; # invalid placeholder if no cache enabled
665         $mic && $mic->IsConnected ? ($cached->{$sec} = $mic) : undef;
666 }
667
668 sub imap_each {
669         my ($self, $url, $eml_cb, @args) = @_;
670         my $uri = ref($url) ? $url : PublicInbox::URIimap->new($url);
671         my $sec = uri_section($uri);
672         local $0 = $uri->mailbox." $sec";
673         my $mic = mic_get($self, $uri);
674         my $err;
675         if ($mic) {
676                 local $self->{eml_each} = [ $eml_cb, @args ];
677                 $err = _imap_fetch_all($self, $mic, $uri);
678         } else {
679                 $err = "E: <$uri> not connected: $!";
680         }
681         die $err if $err && $self->{-can_die};
682         warn $err if $err;
683         $mic;
684 }
685
686 # may used cached auth info prepared by nn_for once
687 sub nn_get {
688         my ($self, $uri) = @_;
689         my $sec = uri_section($uri);
690         # see if caller saved result of nntp_common_init
691         my $cached = $self->{nn_cached} // {};
692         my $nn;
693         $nn = delete($cached->{$sec}) and return $nn;
694         my $nn_arg = $self->{net_arg}->{$sec} or
695                         die "BUG: no Net::NNTP->new arg for $sec";
696         my $nntp_cfg = $self->{cfg_opt}->{$sec};
697         $nn = nn_new($nn_arg, $nntp_cfg, $uri) or return;
698         if (my $postconn = $nntp_cfg->{-postconn}) {
699                 for my $m_arg (@$postconn) {
700                         my ($method, @args) = @$m_arg;
701                         $nn->$method(@args) and next;
702                         die "E: <$uri> $method failed\n";
703                         return;
704                 }
705         }
706         $nn;
707 }
708
709 sub _nntp_fetch_all ($$$) {
710         my ($self, $nn, $uri) = @_;
711         my ($group, $num_a, $num_b) = $uri->group;
712         my $sec = uri_section($uri);
713         my ($nr, $beg, $end) = $nn->group($group);
714         unless (defined($nr)) {
715                 my $msg = ndump($nn->message);
716                 return "E: GROUP $group <$sec> $msg";
717         }
718
719         # IMAPTracker is also used for tracking NNTP, UID == article number
720         # LIST.ACTIVE can get the equivalent of UIDVALIDITY, but that's
721         # expensive.  So we assume newsgroups don't change:
722         my ($itrk, $l_art) = itrk_last($self, $uri);
723
724         # allow users to specify articles to refetch
725         # cf. https://tools.ietf.org/id/draft-gilman-news-url-01.txt
726         # nntp://example.com/inbox.foo/$num_a-$num_b
727         $beg = $num_a if defined($num_a) && $num_a < $beg;
728         $end = $num_b if defined($num_b) && $num_b < $end;
729         if (defined $l_art) {
730                 return if $l_art >= $end; # nothing to do
731                 $beg = $l_art + 1;
732         }
733         my ($err, $art, $last_art, $kw); # kw stays undef, no keywords in NNTP
734         unless ($self->{quiet}) {
735                 warn "# $uri fetching ARTICLE $beg..$end\n";
736         }
737         my $n = $self->{max_batch};
738         for ($beg..$end) {
739                 last if $self->{quit};
740                 $art = $_;
741                 if (--$n < 0) {
742                         run_commit_cb($self);
743                         $itrk->update_last(0, $last_art) if $itrk;
744                         $n = $self->{max_batch};
745                 }
746                 my $raw = $nn->article($art);
747                 unless (defined($raw)) {
748                         my $msg = ndump($nn->message);
749                         if ($nn->code == 421) { # pseudo response from Net::Cmd
750                                 $err = "E: $msg";
751                                 last;
752                         } else { # probably just a deleted message (spam)
753                                 warn "W: $msg";
754                                 next;
755                         }
756                 }
757                 $raw = join('', @$raw);
758                 $raw =~ s/\r\n/\n/sg;
759                 my ($eml_cb, @args) = @{$self->{eml_each}};
760                 $eml_cb->($uri, $art, $kw, PublicInbox::Eml->new(\$raw), @args);
761                 $last_art = $art;
762         }
763         run_commit_cb($self);
764         $itrk->update_last(0, $last_art) if $itrk;
765         $err;
766 }
767
768 sub nntp_each {
769         my ($self, $url, $eml_cb, @args) = @_;
770         my $uri = ref($url) ? $url : PublicInbox::URInntps->new($url);
771         my $sec = uri_section($uri);
772         local $0 = $uri->group ." $sec";
773         my $nn = nn_get($self, $uri);
774         return if $self->{quit};
775         my $err;
776         if ($nn) {
777                 local $self->{eml_each} = [ $eml_cb, @args ];
778                 $err = _nntp_fetch_all($self, $nn, $uri);
779         } else {
780                 $err = "E: <$uri> not connected: $!";
781         }
782         die $err if $err && $self->{-can_die};
783         warn $err if $err;
784         $nn;
785 }
786
787 sub new { bless {}, shift };
788
789 # updates $uri with UIDVALIDITY
790 sub mic_for_folder {
791         my ($self, $uri) = @_;
792         my $mic = $self->mic_get($uri) or die "E: not connected: $@";
793         my $m = $self->isa('PublicInbox::NetWriter') ? 'select' : 'examine';
794         $mic->$m($uri->mailbox) or return;
795         my $uidval;
796         for ($mic->Results) {
797                 /^\* OK \[UIDVALIDITY ([0-9]+)\].*/ or next;
798                 $uidval = $1;
799                 last;
800         }
801         $uidval //= $mic->uidvalidity($uri->mailbox) or
802                 die "E: failed to get uidvalidity from <$uri>: $@";
803         $uri->uidvalidity($uidval);
804         $mic;
805 }
806
807
808 1;