}
# add all known Maildir folders as implicit watches
- my $sto = $lei->_lei_store;
- my $renames = 0;
- if (my $lms = $sto ? $sto->search->lms : undef) {
+ my $lms = $lei->lms;
+ if ($lms) {
+ $lms->lms_write_prepare;
for my $d ($lms->folders('maildir:')) {
substr($d, 0, length('maildir:')) = '';
- my $cd = canonpath_harder($d);
- my $f = "maildir:$cd";
# fixup old bugs while we're iterating:
- if ($d ne $cd) {
- $sto->ipc_do('lms_rename_folder',
- "maildir:$d", $f);
- ++$renames;
- }
+ my $cd = canonpath_harder($d);
+ my $f = "maildir:$cd";
+ $lms->rename_folder("maildir:$d", $f) if $d ne $cd;
next if $watches->{$f}; # may be set to pause
require PublicInbox::LeiWatch;
$watches->{$f} = PublicInbox::LeiWatch->new($f);
add_maildir_watch($cd, $cfg_f);
}
}
- $lei->sto_done_request if $renames;
if ($old) { # cull old non-existent entries
for my $url (keys %$old) {
next if exists $seen{$url};
git_sha(1, $eml);
}
-sub lms { # read-only LeiMailSync
- my ($lei) = @_;
- my $lse = $lei->{lse} // do {
- my $sto = $lei->{sto} // _lei_store($lei);
- $sto ? $sto->search : undef
- };
- $lse ? $lse->lms : undef;
+sub lms {
+ my ($lei, $rw) = @_;
+ my $sto = $lei->{sto} // _lei_store($lei) // return;
+ require PublicInbox::LeiMailSync;
+ my $f = "$sto->{priv_eidx}->{topdir}/mail_sync.sqlite3";
+ (-f $f || $rw) ? PublicInbox::LeiMailSync->new($f) : undef;
}
sub sto_done_request { # only call this from lei-daemon process (not workers)
if (!unlink($src) and $! != ENOENT) {
$lei->child_error(1, "E: unlink($src): $!");
}
- $lei->{sto}->ipc_do('lms_mv_src', "maildir:$mdir",
+ $self->{lms}->mv_src("maildir:$mdir",
$oidbin, $id, $bn);
return; # success anyways if link(2) worked
} elsif ($! == EEXIST) { # lost race with lei/store?
my $src = "$mdir/{".join(',', @try)."}/$$id";
$lei->child_error(1, "link($src -> $dst) ($oidhex): $e");
for (@try) { return if -e "$mdir/$_/$$id" }
- $lei->{sto}->ipc_do('lms_clear_src', "maildir:$mdir", $id);
+ $self->{lms}->clear_src("maildir:$mdir", $id);
}
sub export_kw_imap { # LeiMailSync->each_src callback
# overrides PublicInbox::LeiInput::input_path_url
sub input_path_url {
my ($self, $input, @args) = @_;
- my $lms = $self->{-lms_ro} //= $self->{lse}->lms;
+ $self->{lms}->lms_write_prepare;
if ($input =~ /\Amaildir:(.+)/i) {
my $mdir = $1;
require PublicInbox::LeiToMail; # kw2suffix
- $lms->each_src($input, \&export_kw_md, $self, $mdir);
+ $self->{lms}->each_src($input, \&export_kw_md, $self, $mdir);
} elsif ($input =~ m!\Aimaps?://!i) {
my $uri = PublicInbox::URIimap->new($input);
my $mic = $self->{nwr}->mic_for_folder($uri);
- $lms->each_src($$uri, \&export_kw_imap, $self, $mic);
+ $self->{lms}->each_src($$uri, \&export_kw_imap, $self, $mic);
$mic->expunge;
} else { die "BUG: $input not supported" }
- my $wait = $self->{lei}->{sto}->ipc_do('done');
}
sub lei_export_kw {
my $sto = $lei->_lei_store or return $lei->fail(<<EOM);
lei/store uninitialized, see lei-import(1)
EOM
- my $lse = $sto->search;
- my $lms = $lse->lms or return $lei->fail(<<EOM);
+ my $lms = $lei->lms or return $lei->fail(<<EOM);
lei mail_sync uninitialized, see lei-import(1)
EOM
- my $opt = $lei->{opt};
- if (defined(my $all = $opt->{all})) { # --all=<local|remote>
+ if (defined(my $all = $lei->{opt}->{all})) { # --all=<local|remote>
$lms->group2folders($lei, $all, \@folders) or return;
+ @folders = grep(/\A(?:maildir|imaps?):/i, @folders);
} else {
my $err = $lms->arg2folder($lei, \@folders);
$lei->qerr(@{$err->{qerr}}) if $err->{qerr};
return $lei->fail($err->{fail}) if $err->{fail};
}
- my $self = bless { lse => $lse }, __PACKAGE__;
+ $lms->lms_pause;
+ my $self = bless { lse => $sto->search, lms => $lms }, __PACKAGE__;
$lei->{opt}->{'mail-sync'} = 1; # for prepare_inputs
$self->prepare_inputs($lei, \@folders) or return;
- my $j = $opt->{jobs} // scalar(@{$self->{inputs}}) || 1;
if (my @ro = grep(!/\A(?:maildir|imaps?):/i, @folders)) {
return $lei->fail("cannot export to read-only folders: @ro");
}
- my $m = $opt->{mode} // 'merge';
+ my $m = $lei->{opt}->{mode} // 'merge';
if ($m eq 'merge') { # default
$self->{-merge_kw} = 1;
} elsif ($m eq 'set') {
$self->{imap_mod_kw} = $net->can($self->{-merge_kw} ?
'imap_add_kw' : 'imap_set_kw');
}
- undef $lms; # for fork
my $ops = {};
- $sto->write_prepare($lei);
$lei->{auth}->op_merge($ops, $self) if $lei->{auth};
- (my $op_c, $ops) = $lei->workers_start($self, $j, $ops);
+ (my $op_c, $ops) = $lei->workers_start($self, 1, $ops);
$lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
net_merge_all_done($self) unless $lei->{auth};
sub _complete_export_kw {
my ($lei, @argv) = @_;
- my $sto = $lei->_lei_store or return;
- my $lms = $sto->search->lms or return;
+ my $lms = $lei->lms or return;
my $match_cb = $lei->complete_url_prepare(\@argv);
map { $match_cb->($_) } $lms->folders;
}
sub lei_forget_mail_sync {
my ($lei, @folders) = @_;
my $lms = $lei->lms or return;
- my $sto = $lei->_lei_store or return; # may disappear due to race
- $sto->write_prepare($lei);
+ $lms->lms_write_prepare;
my $err = $lms->arg2folder($lei, \@folders);
$lei->qerr(@{$err->{qerr}}) if $err->{qerr};
return $lei->fail($err->{fail}) if $err->{fail};
- $sto->ipc_do('lms_forget_folders', @folders);
- $lei->sto_done_request;
+ $lms->forget_folders(@folders);
}
*_complete_forget_mail_sync = \&PublicInbox::LeiExportKw::_complete_export_kw;
my $kw = PublicInbox::MdirReader::flags2kw($fl);
substr($folder, 0, 0) = 'maildir:'; # add prefix
my $lse = $self->{lse} //= $self->{lei}->{sto}->search;
- my $lms = $self->{-lms_ro} //= $lse->lms; # may be 0 or undef
+ my $lms = $self->{-lms_ro} //= $self->{lei}->lms; # may be 0 or undef
my $oidbin = $lms ? $lms->name_oidbin($folder, $bn) : undef;
my @docids = defined($oidbin) ? $lse->over->oidbin_exists($oidbin) : ();
my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
# $j = $net->net_concurrency($j); TODO
if ($lei->{opt}->{incremental} // 1) {
$net->{incremental} = 1;
- $net->{-lms_ro} = $sto->search->lms // 0;
+ $net->{-lms_ro} = $lei->lms // 0;
if ($self->{-import_kw} && $net->{-lms_ro} &&
!$lei->{opt}->{'new-only'} &&
$net->{imap_order}) {
my $match_cb = $lei->complete_url_prepare(\@argv);
my @m = map { $match_cb->($_) } $lei->url_folder_cache->keys;
my %f = map { $_ => 1 } @m;
- my $sto = $lei->_lei_store;
- if (my $lms = $sto ? $sto->search->lms : undef) {
+ if (my $lms = $lei->lms) {
@m = map { $match_cb->($_) } $lms->folders;
@f{@m} = @m;
}
# start watching Maildirs ASAP
if ($may_sync && $lei->{sto}) {
grep(!m!\Amaildir:/!i, @md) and die "BUG: @md (no pfx)";
- my $wait = $lei->{sto}->ipc_do('add_sync_folders', @md);
+ $lei->lms(1)->lms_write_prepare->add_folders(@md);
$lei->refresh_watches;
}
}
my $oidbin = pack('H*', $oidhex);
my @docids = $lse ? $lse->over->oidbin_exists($oidbin) : ();
$ent->{'lei/store'} = \@docids if @docids;
- my $lms = $lse->lms;
+ my $lms = $lei->lms;
if (my $loc = $lms ? $lms->locations_for($oidbin) : undef) {
$ent->{'mail-sync'} = $loc;
}
sub inspect_imap_uid ($$) {
my ($lei, $uid_uri) = @_;
my $ent = {};
- my $lse = $lei->{lse} or return $ent;
- my $lms = $lse->lms or return $ent;
+ my $lms = $lei->lms or return $ent;
my $oidhex = $lms->imap_oid($lei, $uid_uri);
if (ref(my $err = $oidhex)) { # art2folder error
$lei->qerr(@{$err->{qerr}}) if $err->{qerr};
sub lei_ls_mail_sync {
my ($lei, $filter) = @_;
- my $sto = $lei->_lei_store or return;
- my $lms = $sto->search->lms or return;
+ my $lms = $lei->lms or return;
my $opt = $lei->{opt};
my $re = $opt->{globoff} ? undef : $lei->glob2re($filter // '*');
$re //= qr/\Q$filter\E/;
package PublicInbox::LeiMailSync;
use strict;
use v5.10.1;
+use parent qw(PublicInbox::Lock);
use DBI;
use PublicInbox::ContentHash qw(git_sha);
use Carp ();
sqlite_use_immediate_transaction => 1,
});
# no sqlite_unicode, here, all strings are binary
- create_tables($dbh) if $rw;
+ create_tables($self, $dbh) if $rw;
$dbh->do('PRAGMA journal_mode = WAL') if $creat;
$dbh->do('PRAGMA case_sensitive_like = ON');
$dbh;
sub new {
my ($cls, $f) = @_;
- bless { filename => $f, fmap => {} }, $cls;
+ bless {
+ filename => $f,
+ fmap => {},
+ lock_path => "$f.flock",
+ }, $cls;
}
-sub lms_write_prepare { ($_[0]->{dbh} //= dbh_new($_[0], 1)) };
+sub lms_write_prepare { ($_[0]->{dbh} //= dbh_new($_[0], 1)); $_[0] }
+
+sub lms_pause {
+ my ($self) = @_;
+ $self->{fmap} = {};
+ delete $self->{dbh};
+}
sub create_tables {
- my ($dbh) = @_;
+ my ($self, $dbh) = @_;
+ my $lk = $self->lock_for_scope;
$dbh->do(<<'');
CREATE TABLE IF NOT EXISTS folders (
$fid;
}
+sub add_folders {
+ my ($self, @folders) = @_;
+ my $lk = $self->lock_for_scope;
+ for my $f (@folders) { $self->{fmap}->{$f} //= fid_for($self, $f, 1) }
+}
+
sub set_src {
my ($self, $oidbin, $folder, $id) = @_;
+ my $lk = $self->lock_for_scope;
my $fid = $self->{fmap}->{$folder} //= fid_for($self, $folder, 1);
my $sth;
if (ref($id)) { # scalar name
sub clear_src {
my ($self, $folder, $id) = @_;
+ my $lk = $self->lock_for_scope;
my $fid = $self->{fmap}->{$folder} //= fid_for($self, $folder, 1);
my $sth;
if (ref($id)) { # scalar name
# Maildir-only
sub mv_src {
my ($self, $folder, $oidbin, $id, $newbn) = @_;
+ my $lk = $self->lock_for_scope;
my $fid = $self->{fmap}->{$folder} //= fid_for($self, $folder, 1);
my $sth = $self->{dbh}->prepare_cached(<<'');
UPDATE blob2name SET name = ? WHERE fid = ? AND oidbin = ? AND name = ?
$err;
}
-sub forget_folder {
- my ($self, $folder) = @_;
- my $fid = delete($self->{fmap}->{$folder}) //
- fid_for($self, $folder) // return;
- for my $t (qw(blob2name blob2num folders)) {
- $self->{dbh}->do("DELETE FROM $t WHERE fid = ?", undef, $fid);
+sub forget_folders {
+ my ($self, @folders) = @_;
+ my $lk = $self->lock_for_scope;
+ for my $folder (@folders) {
+ my $fid = delete($self->{fmap}->{$folder}) //
+ fid_for($self, $folder) // next;
+ for my $t (qw(blob2name blob2num folders)) {
+ $self->{dbh}->do("DELETE FROM $t WHERE fid = ?",
+ undef, $fid);
+ }
}
}
# only used for changing canonicalization errors
sub rename_folder {
my ($self, $old, $new) = @_;
+ my $lk = $self->lock_for_scope;
my $ofid = delete($self->{fmap}->{$old}) //
fid_for($self, $old) // return;
eval {
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# internal command for dealing with inotify, kqueue vnodes, etc
+# it is a semi-persistent worker
package PublicInbox::LeiNoteEvent;
use strict;
use v5.10.1;
sub flush_lei ($) {
my ($lei) = @_;
- if (my $lne = delete $lei->{cfg}->{-lei_note_event}) {
- $lne->wq_close(1, undef, $lei); # runs _lei_wq_eof;
- } elsif ($lei->{sto}) { # lms_clear_src calls only:
- $lei->sto_done_request;
- }
+ my $lne = delete $lei->{cfg}->{-lei_note_event};
+ $lne->wq_close(1, undef, $lei) if $lne; # runs _lei_wq_eof;
}
# we batch up writes and flush every 5s (matching Linux default
sub eml_event ($$$$) {
my ($self, $eml, $vmd, $state) = @_;
my $sto = $self->{lei}->{sto};
- my $lse = $self->{lse} //= $sto->search;
if ($state =~ /\Aimport-(?:rw|ro)\z/) {
$sto->ipc_do('set_eml', $eml, $vmd);
} elsif ($state =~ /\Aindex-(?:rw|ro)\z/) {
my $xoids = $self->{lei}->ale->xoids_for($eml);
$sto->ipc_do('index_eml_only', $eml, $vmd, $xoids);
} elsif ($state =~ /\Atag-(?:rw|ro)\z/) {
- my $c = $lse->kw_changed($eml, $vmd->{kw}, my $docids = []);
+ my $docids = [];
+ my $c = $self->{lse}->kw_changed($eml, $vmd->{kw}, $docids);
if (scalar @$docids) { # already in lei/store
$sto->ipc_do('set_eml_vmd', undef, $vmd, $docids) if $c;
} elsif (my $xoids = $self->{lei}->ale->xoids_for($eml)) {
my $cfg = $lei->_lei_cfg or return; # gone (race)
my $sto = $lei->_lei_store or return; # gone
return flush_lei($lei) if $folder eq 'done'; # special case
- my $lms = $sto->search->lms or return;
+ my $lms = $lei->lms or return;
+ $lms->lms_write_prepare if $new_cur eq ''; # for ->clear_src below
my $err = $lms->arg2folder($lei, [ $folder ]);
return if $err->{fail};
- undef $lms;
my $state = $cfg->get_1("watch.$folder", 'state') // 'tag-rw';
return if $state eq 'pause';
+ return $lms->clear_src($folder, \$bn) if $new_cur eq '';
+ $lms->lms_pause;
$lei->ale; # prepare
$sto->write_prepare($lei);
- if ($new_cur eq '') {
- $sto->ipc_do('lms_clear_src', $folder, \$bn);
- return note_event_arm_done($lei);
- }
require PublicInbox::MdirReader;
my $self = $cfg->{-lei_note_event} //= do {
- my $wq = bless {}, __PACKAGE__;
+ my $wq = bless { lms => $lms }, __PACKAGE__;
# MUAs such as mutt can trigger massive rename() storms so
# use all CPU power available:
my $jobs = $wq->detect_nproc // 1;
sub ipc_atfork_child {
my ($self) = @_;
$self->{lei}->_lei_atfork_child(1); # persistent, for a while
+ $self->{lms}->lms_write_prepare;
+ $self->{lse} = $self->{lei}->{sto}->search;
$self->SUPER::ipc_atfork_child;
}
use PublicInbox::InboxWritable qw(eml_from_path);
use PublicInbox::Import;
-sub folder_missing {
+sub folder_missing { # may be called by LeiInput
my ($self, $folder) = @_;
- $self->{lei}->{sto}->ipc_do('lms_forget_folders', $folder);
+ $self->{lms}->forget_folders($folder);
}
sub prune_mdir { # lms->each_src callback
my @try = $$id =~ /:2,[a-zA-Z]*\z/ ? qw(cur new) : qw(new cur);
for (@try) { return if -f "$mdir/$_/$$id" }
# both tries failed
- $self->{lei}->{sto}->ipc_do('lms_clear_src', "maildir:$mdir", $id);
+ $self->{lms}->clear_src("maildir:$mdir", $id);
}
sub prune_imap { # lms->each_src callback
my ($oidbin, $uid, $self, $uids, $url) = @_;
return if exists $uids->{$uid};
- $self->{lei}->{sto}->ipc_do('lms_clear_src', $url, $uid);
+ $self->{lms}->clear_src($url, $uid);
}
# detects missed file moves
my ($folder, $bn) = ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) or
die "BUG: $f was not from a Maildir?";
substr($folder, 0, 0) = 'maildir:'; # add prefix
- my $lms = $self->{-lms_ro} //= $self->{lei}->lms;
- return if defined($lms->name_oidbin($folder, $bn));
+ return if defined($self->{lms}->name_oidbin($folder, $bn));
my $eml = eml_from_path($f) // return;
my $oidbin = $self->{lei}->git_oid($eml)->digest;
- $self->{lei}->{sto}->ipc_do('lms_set_src', $oidbin, $folder, \$bn);
+ $self->{lms}->set_src($oidbin, $folder, \$bn);
}
sub input_path_url { # overrides PublicInbox::LeiInput::input_path_url
my ($self, $input, @args) = @_;
- my $lms = $self->{-lms_ro} //= $self->{lei}->lms;
if ($input =~ /\Amaildir:(.+)/i) {
- $lms->each_src($input, \&prune_mdir, $self, my $mdir = $1);
+ $self->{lms}->each_src($input, \&prune_mdir, $self, $1);
$self->{lse} //= $self->{lei}->{sto}->search;
# call pmdir_cb (via maildir_each_file -> each_mdir_fn)
PublicInbox::LeiInput::input_path_url($self, $input);
if (my $mic = $self->{lei}->{net}->mic_for_folder($uri)) {
my $uids = $mic->search('UID 1:*');
$uids = +{ map { $_ => undef } @$uids };
- $lms->each_src($$uri, \&prune_imap, $self, $uids, $$uri)
+ $self->{lms}->each_src($$uri, \&prune_imap, $self,
+ $uids, $$uri)
} else {
$self->folder_missing($$uri);
}
$lei->qerr(@{$err->{qerr}}) if $err->{qerr};
return $lei->fail($err->{fail}) if $err->{fail};
}
- undef $lms; # must be done before fork
+ $lms->lms_pause; # must be done before fork
$sto->write_prepare($lei);
- my $self = bless { missing_ok => 1 }, __PACKAGE__;
+ my $self = bless { missing_ok => 1, lms => $lms }, __PACKAGE__;
$lei->{opt}->{'mail-sync'} = 1; # for prepare_inputs
$self->prepare_inputs($lei, \@folders) or return;
my $ops = {};
$lei->wait_wq_events($op_c, $ops); # net_merge_all_done if !{auth}
}
+sub ipc_atfork_child { # needed for PublicInbox::LeiPmdir
+ my ($self) = @_;
+ PublicInbox::LeiInput::input_only_atfork_child($self);
+ $self->{lms}->lms_write_prepare;
+ undef;
+}
+
no warnings 'once';
*_complete_refresh_mail_sync = \&PublicInbox::LeiExportKw::_complete_export_kw;
-*ipc_atfork_child = \&PublicInbox::LeiInput::input_only_atfork_child;
*net_merge_all_done = \&PublicInbox::LeiInput::input_only_net_merge_all_done;
1;
syslog('warning', "unlink($src): $!");
}
# TODO: verify oidbin?
- lms_mv_src($self, "maildir:$mdir",
+ $self->{lms}->mv_src("maildir:$mdir",
$oidbin, \$orig, $bn);
return;
} elsif ($! == EEXIST) { # lost race with "lei export-kw"?
}
}
for (@try) { return if -e "$mdir/$_/$orig" };
- lms_clear_src($self, "maildir:$mdir", \$orig);
+ $self->{lms}->clear_src("maildir:$mdir", \$orig);
}
sub sto_export_kw ($$$) {
\@docids;
}
-sub _lms_rw ($) {
+sub _lms_rw ($) { # it is important to have eidx processes open before lms
my ($self) = @_;
my ($eidx, $tl) = eidx_init($self);
$self->{lms} //= do {
};
}
-sub lms_clear_src {
- my ($self, $folder, $id) = @_;
- _lms_rw($self)->clear_src($folder, $id);
-}
-
-sub lms_mv_src {
- my ($self, $folder, $oidbin, $id, $newbn) = @_;
- _lms_rw($self)->mv_src($folder, $oidbin, $id, $newbn);
-}
-
-sub lms_forget_folders {
- my ($self, @folders) = @_;
- my $lms = _lms_rw($self);
- for my $f (@folders) { $lms->forget_folder($f) }
-}
-
-sub lms_rename_folder {
- my ($self, $old, $new) = @_;
- _lms_rw($self)->rename_folder($old, $new);
-}
-
sub set_sync_info {
my ($self, $oidhex, $folder, $id) = @_;
_lms_rw($self)->set_src(pack('H*', $oidhex), $folder, $id);
}
-sub lms_set_src {
- my ($self, $oidbin, $folder, $id) = @_;
- _lms_rw($self)->set_src($oidbin, $folder, $id);
-}
-
sub _remove_if_local { # git->cat_async arg
my ($bref, $oidhex, $type, $size, $self) = @_;
$self->{im}->remove($bref) if $bref;
$lei->{sto} = $self;
}
-# called by lei-daemon before lei->refresh_watches
-sub add_sync_folders {
- my ($self, @folders) = @_;
- my $lms = _lms_rw($self);
- for my $f (@folders) { $lms->fid_for($f, 1) }
-}
-
1;
sub clobber_dst_prepare ($;$) {
my ($lei, $f) = @_;
- my $wait = (defined($f) && $lei->{sto}) ?
- $lei->{sto}->ipc_do('lms_forget_folders', $f) : undef;
+ if (my $lms = defined($f) ? $lei->lms : undef) {
+ $lms->lms_write_prepare;
+ $lms->forget_folders($f);
+ }
my $dedupe = $lei->{dedupe} or return;
$dedupe->reset_dedupe if $dedupe->can('reset_dedupe');
}