IMAPTracker has a UNIQUE constraint on the `url' column,
which may cause compatibility and/or rollback problems
in attempting to deal with UIDVALIDITY changes.
Having multiple sources of truth leads to confusion and bugs,
so relying on LeiMailSync exclusively ought to simplify things.
Furthermore, since LeiMailSync is only written to by LeiStore,
it is safer in that it won't mark a UID or article as imported
until git-fast-import has seen it, and the SQLite commit always
happens after "done\n" is sent to fast-import.
This mostly reverts recent commits to IMAPTracker to support
lei, those are:
1) commit
7632d8f7590daf70c65d4270e750c36552fa9389
("net_reader: restart on first UID when UIDVALIDITY changes")
2) commit
311a5d37ad275cd75b1e64d87827c4d13fe4bfab
("imap_tracker: prepare for use with lei").
This means public-inbox-watch will not change between 1.6 and
1.7: -watch stops synching a folder when UIDVALIDITY changes.
~/.local/share/lei/store
- ipc.lock # lock file for internal lei IPC
- local/$EPOCH.git # normal bare git repositories
~/.local/share/lei/store
- ipc.lock # lock file for internal lei IPC
- local/$EPOCH.git # normal bare git repositories
- - net_last.sqlite3 # import state for IMAP & NNTP
+ - mail_sync.sqlite3 # sync state IMAP, Maildir, NNTP
Additionally, the following share the same roles they do in extindex:
Additionally, the following share the same roles they do in extindex:
-sub get_last ($;$) {
- my ($self, $validity) = @_;
- my $sth;
- if (defined $validity) {
- $sth = $self->{dbh}->prepare_cached(<<'', undef, 1);
-SELECT uid_validity, uid FROM imap_last WHERE url = ? AND uid_validity = ?
-
- $sth->execute($self->{url}, $validity);
- } else {
- $sth = $self->{dbh}->prepare_cached(<<'', undef, 1);
+sub get_last ($) {
+ my ($self) = @_;
+ my $sth = $self->{dbh}->prepare_cached(<<'', undef, 1);
SELECT uid_validity, uid FROM imap_last WHERE url = ?
SELECT uid_validity, uid FROM imap_last WHERE url = ?
- $sth->execute($self->{url});
- }
+ $sth->execute($self->{url});
- my ($class, $url, $dbname) = @_;
+ my ($class, $url) = @_;
- unless (defined($dbname)) {
- # original name for compatibility with old setups:
- $dbname = PublicInbox::Config->config_dir() . '/imap.sqlite3';
+ # original name for compatibility with old setups:
+ my $dbname = PublicInbox::Config->config_dir() . '/imap.sqlite3';
- # use the new XDG-compliant name for new setups:
- if (!-f $dbname) {
- $dbname = ($ENV{XDG_DATA_HOME} //
- (($ENV{HOME} // '/nonexistent').
- '/.local/share')) .
- '/public-inbox/imap.sqlite3';
- }
+ # use the new XDG-compliant name for new setups:
+ if (!-f $dbname) {
+ $dbname = ($ENV{XDG_DATA_HOME} //
+ (($ENV{HOME} // '/nonexistent').'/.local/share')) .
+ '/public-inbox/imap.sqlite3';
}
if (!-f $dbname) {
require File::Path;
}
if (!-f $dbname) {
require File::Path;
'import' => [ 'LOCATION...|--stdin',
'one-time import/update from URL or filesystem',
qw(stdin| offset=i recursive|r exclude=s include|I=s
'import' => [ 'LOCATION...|--stdin',
'one-time import/update from URL or filesystem',
qw(stdin| offset=i recursive|r exclude=s include|I=s
- lock=s@ in-format|F=s kw! verbose|v+ incremental! sync!), @c_opt ],
+ lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync!), @c_opt ],
'convert' => [ 'LOCATION...|--stdin',
'one-time conversion from URL or filesystem to another format',
qw(stdin| in-format|F=s out-format|f=s output|mfolder|o=s
'convert' => [ 'LOCATION...|--stdin',
'one-time conversion from URL or filesystem to another format',
qw(stdin| in-format|F=s out-format|f=s output|mfolder|o=s
input_eml_cb($self, $eml, $vmd);
}
input_eml_cb($self, $eml, $vmd);
}
-sub input_imap_cb { # imap_each
+sub input_net_cb { # imap_each / nntp_each
my ($url, $uid, $kw, $eml, $self) = @_;
my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
$vmd->{sync_info} = [ $url, $uid ] if $self->{-mail_sync};
input_eml_cb($self, $eml, $vmd);
}
my ($url, $uid, $kw, $eml, $self) = @_;
my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
$vmd->{sync_info} = [ $url, $uid ] if $self->{-mail_sync};
input_eml_cb($self, $eml, $vmd);
}
-sub input_nntp_cb { # nntp_each
- my ($url, $num, $kw, $eml, $self) = @_;
- input_eml_cb($self, $eml, $self->{-import_kw} ? { kw => $kw } : undef);
-}
-
sub net_merge_complete { # callback used by LeiAuth
my ($self) = @_;
$self->wq_io_do('process_inputs');
sub net_merge_complete { # callback used by LeiAuth
my ($self) = @_;
$self->wq_io_do('process_inputs');
return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
$self->{all_vmd} = $vmd_mod if scalar keys %$vmd_mod;
$self->prepare_inputs($lei, \@inputs) or return;
return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
$self->{all_vmd} = $vmd_mod if scalar keys %$vmd_mod;
$self->prepare_inputs($lei, \@inputs) or return;
- $self->{-mail_sync} = $lei->{opt}->{sync} // 1;
+ $self->{-mail_sync} = $lei->{opt}->{'mail-sync'} // 1;
$lei->ale; # initialize for workers to read
my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
$lei->ale; # initialize for workers to read
my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
# $j = $net->net_concurrency($j); TODO
if ($lei->{opt}->{incremental} // 1) {
$net->{incremental} = 1;
# $j = $net->net_concurrency($j); TODO
if ($lei->{opt}->{incremental} // 1) {
$net->{incremental} = 1;
- $net->{itrk_fn} = $lei->store_path .
- '/net_last.sqlite3';
+ $net->{-lms_ro} = $lei->_lei_store->search->lms // 0;
}
} else {
my $nproc = $self->detect_nproc;
}
} else {
my $nproc = $self->detect_nproc;
my $ifmt = lc($lei->{opt}->{'in-format'} // '');
# TODO auto-detect?
if ($input =~ m!\Aimaps?://!i) {
my $ifmt = lc($lei->{opt}->{'in-format'} // '');
# TODO auto-detect?
if ($input =~ m!\Aimaps?://!i) {
- $lei->{net}->imap_each($input, $self->can('input_imap_cb') //
- $self->can('input_net_cb'),
- $self, @args);
+ $lei->{net}->imap_each($input, $self->can('input_net_cb'),
+ $self, @args);
return;
} elsif ($input =~ m!\A(?:nntps?|s?news)://!i) {
return;
} elsif ($input =~ m!\A(?:nntps?|s?news)://!i) {
- $lei->{net}->nntp_each($input, $self->can('input_nntp_cb') //
- $self->can('input_net_cb'),
- $self, @args);
+ $lei->{net}->nntp_each($input, $self->can('input_net_cb'),
+ $self, @args);
return;
} elsif ($input =~ m!\Ahttps?://!i) {
handle_http_input($self, $input, @args);
return;
} elsif ($input =~ m!\Ahttps?://!i) {
handle_http_input($self, $input, @args);
- my ($self, $folder, $cb, @args) = @_;
+ my ($self, $folder) = @_;
my $dbh = $self->{dbh} //= dbh_new($self);
my $fid;
my $ret = {};
my $dbh = $self->{dbh} //= dbh_new($self);
my $fid;
my $ret = {};
$self->{quiet} = 1 if $lei && $lei->{opt}->{quiet};
eval { require PublicInbox::IMAPClient } or
die "Mail::IMAPClient is required for IMAP:\n$@\n";
$self->{quiet} = 1 if $lei && $lei->{opt}->{quiet};
eval { require PublicInbox::IMAPClient } or
die "Mail::IMAPClient is required for IMAP:\n$@\n";
- eval { require PublicInbox::IMAPTracker } or
+ ($lei || eval { require PublicInbox::IMAPTracker }) or
die "DBD::SQLite is required for IMAP\n:$@\n";
require PublicInbox::URIimap;
my $cfg = $self->{pi_cfg} // $lei->_lei_cfg;
die "DBD::SQLite is required for IMAP\n:$@\n";
require PublicInbox::URIimap;
my $cfg = $self->{pi_cfg} // $lei->_lei_cfg;
$self->{quiet} = 1 if $lei && $lei->{opt}->{quiet};
eval { require Net::NNTP } or
die "Net::NNTP is required for NNTP:\n$@\n";
$self->{quiet} = 1 if $lei && $lei->{opt}->{quiet};
eval { require Net::NNTP } or
die "Net::NNTP is required for NNTP:\n$@\n";
- eval { require PublicInbox::IMAPTracker } or
+ ($lei || eval { require PublicInbox::IMAPTracker }) or
die "DBD::SQLite is required for NNTP\n:$@\n";
my $cfg = $self->{pi_cfg} // $lei->_lei_cfg;
my $nn_args = {}; # scheme://authority => Net::NNTP->new arg
die "DBD::SQLite is required for NNTP\n:$@\n";
my $cfg = $self->{pi_cfg} // $lei->_lei_cfg;
my $nn_args = {}; # scheme://authority => Net::NNTP->new arg
-sub _itrk ($$) {
- my ($self, $uri) = @_;
- return unless $self->{incremental};
- # itrk_fn is set by lei
- PublicInbox::IMAPTracker->new($$uri, $self->{itrk_fn});
+sub _itrk_last ($$;$) {
+ my ($self, $uri, $r_uidval) = @_;
+ return (undef, undef, $r_uidval) unless $self->{incremental};
+ my ($itrk, $l_uid, $l_uidval);
+ if (defined(my $lms = $self->{-lms_ro})) { # LeiMailSync or 0
+ $uri->uidvalidity($r_uidval) if defined $r_uidval;
+ my $x;
+ $l_uid = ($lms && ($x = $lms->location_stats($$uri))) ?
+ $x->{'uid.max'} : undef;
+ # itrk remains undef, lei/store worker writes to
+ # mail_sync.sqlite3
+ } else {
+ $itrk = PublicInbox::IMAPTracker->new($$uri);
+ ($l_uidval, $l_uid) = $itrk->get_last($$uri);
+ }
+ ($itrk, $l_uid, $l_uidval //= $r_uidval);
}
sub _imap_fetch_all ($$$) {
}
sub _imap_fetch_all ($$$) {
- my ($self, $mic, $uri) = @_;
- my $sec = uri_section($uri);
- my $mbx = $uri->mailbox;
+ my ($self, $mic, $orig_uri) = @_;
+ my $sec = uri_section($orig_uri);
+ my $mbx = $orig_uri->mailbox;
$mic->Clear(1); # trim results history
$mic->examine($mbx) or return "E: EXAMINE $mbx ($sec) failed: $!";
my ($r_uidval, $r_uidnext);
$mic->Clear(1); # trim results history
$mic->examine($mbx) or return "E: EXAMINE $mbx ($sec) failed: $!";
my ($r_uidval, $r_uidnext);
last if $r_uidval && $r_uidnext;
}
$r_uidval //= $mic->uidvalidity($mbx) //
last if $r_uidval && $r_uidnext;
}
$r_uidval //= $mic->uidvalidity($mbx) //
- return "E: $uri cannot get UIDVALIDITY";
+ return "E: $orig_uri cannot get UIDVALIDITY";
$r_uidnext //= $mic->uidnext($mbx) //
$r_uidnext //= $mic->uidnext($mbx) //
- return "E: $uri cannot get UIDNEXT";
- my $url = ref($uri)->new($$uri);
- $url->uidvalidity($r_uidval);
- $url = $$url;
- my $itrk = _itrk($self, $uri);
- my $l_uid;
- $l_uid = $itrk->get_last($r_uidval) if $itrk;
+ return "E: $orig_uri cannot get UIDNEXT";
+ my $uri = $orig_uri->clone;
+ my ($itrk, $l_uid, $l_uidval) = _itrk_last($self, $uri, $r_uidval);
+ return <<EOF if $l_uidval != $r_uidval;
+E: $uri UIDVALIDITY mismatch
+E: local=$l_uidval != remote=$r_uidval
+EOF
+ $uri->uidvalidity($r_uidval);
$l_uid //= 0;
my $r_uid = $r_uidnext - 1;
$l_uid //= 0;
my $r_uid = $r_uidnext - 1;
- if ($l_uid > $r_uid) {
- return "E: $uri local UID exceeds remote ($l_uid > $r_uid)\n";
- }
+ return <<EOF if $l_uid > $r_uid;
+E: $uri local UID exceeds remote ($l_uid > $r_uid)
+E: $uri strangely, UIDVALIDLITY matches ($l_uidval)
+EOF
return if $l_uid >= $r_uid; # nothing to do
$l_uid ||= 1;
my ($mod, $shard) = @{$self->{shard_info} // []};
return if $l_uid >= $r_uid; # nothing to do
$l_uid ||= 1;
my ($mod, $shard) = @{$self->{shard_info} // []};
# messages get deleted, so holes appear
my $per_uid = delete $r->{$uid} // next;
my $raw = delete($per_uid->{$key}) // next;
# messages get deleted, so holes appear
my $per_uid = delete $r->{$uid} // next;
my $raw = delete($per_uid->{$key}) // next;
- _imap_do_msg($self, $url, $uid, \$raw,
+ _imap_do_msg($self, $$uri, $uid, \$raw,
$per_uid->{FLAGS});
$last_uid = $uid;
last if $self->{quit};
$per_uid->{FLAGS});
$last_uid = $uid;
last if $self->{quit};
# IMAPTracker is also used for tracking NNTP, UID == article number
# LIST.ACTIVE can get the equivalent of UIDVALIDITY, but that's
# expensive. So we assume newsgroups don't change:
# IMAPTracker is also used for tracking NNTP, UID == article number
# LIST.ACTIVE can get the equivalent of UIDVALIDITY, but that's
# expensive. So we assume newsgroups don't change:
- my $itrk = _itrk($self, $uri);
- my (undef, $l_art) = $itrk ? $itrk->get_last : ();
+ my ($itrk, $l_art) = _itrk_last($self, $uri);
# allow users to specify articles to refetch
# cf. https://tools.ietf.org/id/draft-gilman-news-url-01.txt
# allow users to specify articles to refetch
# cf. https://tools.ietf.org/id/draft-gilman-news-url-01.txt
sub as_string { ${$_[0]} }
sub as_string { ${$_[0]} }
+sub clone { ref($_[0])->new(as_string($_[0])) }
+
is_deeply(\%r, { 'HASH' => scalar(@$out) }, 'all hashes');
lei_ok([qw(tag +kw:seen), $url], undef, undef);
is_deeply(\%r, { 'HASH' => scalar(@$out) }, 'all hashes');
lei_ok([qw(tag +kw:seen), $url], undef, undef);
- my $f = "$ENV{HOME}/.local/share/lei/store/net_last.sqlite3";
- ok(-s $f, 'net tracked for redundant imports');
+ my $f = "$ENV{HOME}/.local/share/lei/store/mail_sync.sqlite3";
+ ok(-s $f, 'mail_sync tracked for redundant imports');
lei_ok('inspect', "blob:$out->[5]->{blob}");
my $x = json_utf8->decode($lei_out);
is(ref($x->{'lei/store'}), 'ARRAY', 'lei/store in inspect');
lei_ok('inspect', "blob:$out->[5]->{blob}");
my $x = json_utf8->decode($lei_out);
is(ref($x->{'lei/store'}), 'ARRAY', 'lei/store in inspect');
for (@$out) { $r{ref($_)}++ }
is_deeply(\%r, { 'HASH' => scalar(@$out) }, 'all hashes');
for (@$out) { $r{ref($_)}++ }
is_deeply(\%r, { 'HASH' => scalar(@$out) }, 'all hashes');
- my $f = "$ENV{HOME}/.local/share/lei/store/net_last.sqlite3";
- ok(-s $f, 'net tracked for redundant imports');
+ my $f = "$ENV{HOME}/.local/share/lei/store/mail_sync.sqlite3";
+ ok(-s $f, 'mail_sync exists tracked for redundant imports');