use PublicInbox::Import;
use PublicInbox::InboxWritable qw(eml_from_path);
use PublicInbox::V2Writable;
-use PublicInbox::ContentHash qw(content_hash git_sha);
+use PublicInbox::ContentHash qw(content_hash);
use PublicInbox::MID qw(mids);
use PublicInbox::LeiSearch;
use PublicInbox::MDA;
use PublicInbox::Spawn qw(spawn);
use PublicInbox::MdirReader;
use PublicInbox::LeiToMail;
-use List::Util qw(max);
use File::Temp ();
use POSIX ();
use IO::Handle (); # ->autoflush
$_[0]->{rotate_bytes} // ((1024 * 1024 * 1024) / $_[0]->packing_factor)
}
-sub git_pfx { "$_[0]->{priv_eidx}->{topdir}/local" };
-
-sub git_epoch_max {
- my ($self) = @_;
- if (opendir(my $dh, $self->git_pfx)) {
- max(map {
- substr($_, 0, -4) + 0; # drop ".git" suffix
- } grep(/\A[0-9]+\.git\z/, readdir($dh))) // 0;
- } else {
- $!{ENOENT} ? 0 : die("opendir ${\$self->git_pfx}: $!\n");
- }
-}
-
sub git_ident ($) {
my ($git) = @_;
my $rdr = {};
$im->done;
undef $im;
$self->checkpoint;
- $max = $self->git_epoch_max + 1;
+ $max = $self->{priv_eidx}->{mg}->git_epochs + 1;
}
my (undef, $tl) = eidx_init($self); # acquire lock
- my $pfx = $self->git_pfx;
- $max //= $self->git_epoch_max;
+ $max //= $self->{priv_eidx}->{mg}->git_epochs;
while (1) {
- my $latest = "$pfx/$max.git";
- my $old = -e $latest;
- PublicInbox::Import::init_bare($latest);
+ my $latest = $self->{priv_eidx}->{mg}->add_epoch($max);
my $git = PublicInbox::Git->new($latest);
- if (!$old) {
- $git->qx(qw(config core.sharedRepository 0600));
- $self->done; # unlock
- # re-acquire lock, update alternates for new epoch
- (undef, $tl) = eidx_init($self);
- }
+ $self->done; # unlock
+ # re-acquire lock, update alternates for new epoch
+ (undef, $tl) = eidx_init($self);
my $packed_bytes = $git->packed_bytes;
my $unpacked_bytes = $packed_bytes / $self->packing_factor;
if ($unpacked_bytes >= $self->rotate_bytes) {
syslog('warning', "unlink($src): $!");
}
# TODO: verify oidbin?
- lms_mv_src($self, "maildir:$mdir",
+ $self->{lms}->mv_src("maildir:$mdir",
$oidbin, \$orig, $bn);
return;
} elsif ($! == EEXIST) { # lost race with "lei export-kw"?
}
}
for (@try) { return if -e "$mdir/$_/$orig" };
- lms_clear_src($self, "maildir:$mdir", \$orig);
+ $self->{lms}->clear_src("maildir:$mdir", \$orig);
}
sub sto_export_kw ($$$) {
\@docids;
}
-sub _lms_rw ($) {
+sub _lms_rw ($) { # it is important to have eidx processes open before lms
my ($self) = @_;
my ($eidx, $tl) = eidx_init($self);
$self->{lms} //= do {
};
}
-sub lms_clear_src {
- my ($self, $folder, $id) = @_;
- _lms_rw($self)->clear_src($folder, $id);
-}
-
-sub lms_mv_src {
- my ($self, $folder, $oidbin, $id, $newbn) = @_;
- _lms_rw($self)->mv_src($folder, $oidbin, $id, $newbn);
-}
-
-sub lms_forget_folders {
- my ($self, @folders) = @_;
- my $lms = _lms_rw($self);
- for my $f (@folders) { $lms->forget_folder($f) }
-}
-
-sub lms_rename_folder {
- my ($self, $old, $new) = @_;
- _lms_rw($self)->rename_folder($old, $new);
-}
-
sub set_sync_info {
my ($self, $oidhex, $folder, $id) = @_;
_lms_rw($self)->set_src(pack('H*', $oidhex), $folder, $id);
$git->cat_async($oidhex, \&_remove_if_local, $self);
}
}
- $git->cat_async_wait;
+ $git->async_wait_all;
remove_docids($self, @docids);
\@docids;
}
$self->{priv_eidx}->done; # V2Writable::done
xchg_stderr($self);
die $err if $err;
-
- # notify clients ->done has been issued
- defined($sock_ref) and
- $self->{s2d_op_p}->pkt_do('sto_done_complete', $sock_ref);
}
sub ipc_atfork_child {
$self->SUPER::ipc_atfork_child;
}
+sub recv_and_run {
+ my ($self, @args) = @_;
+ local $PublicInbox::DS::in_loop = 0; # waitpid synchronously
+ $self->SUPER::recv_and_run(@args);
+}
+
sub write_prepare {
my ($self, $lei) = @_;
$lei // die 'BUG: $lei not passed';
unless ($self->{-ipc_req}) {
- # s2d => store-to-daemon messages
- require PublicInbox::PktOp;
- my ($s2d_op_c, $s2d_op_p) = PublicInbox::PktOp->pair;
my $dir = $lei->store_path;
- $self->ipc_lock_init("$dir/ipc.lock");
substr($dir, -length('/lei/store'), 10, '');
pipe(my ($r, $w)) or die "pipe: $!";
$w->autoflush(1);
# Mail we import into lei are private, so headers filtered out
# by -mda for public mail are not appropriate
local @PublicInbox::MDA::BAD_HEADERS = ();
- $self->ipc_worker_spawn("lei/store $dir", $lei->oldset, {
+ $self->wq_workers_start("lei/store $dir", 1, $lei->oldset, {
lei => $lei,
-err_wr => $w,
- to_close => [ $r, $s2d_op_c->{sock} ],
- s2d_op_p => $s2d_op_p,
+ to_close => [ $r ],
});
require PublicInbox::LeiStoreErr;
PublicInbox::LeiStoreErr->new($r, $lei);
- $s2d_op_c->{ops} = {
- sto_done_complete => [ $lei->can('sto_done_complete') ]
- };
}
$lei->{sto} = $self;
}
-# TODO: support SHA-256
-sub git_blob_id { # called via LEI->git_blob_id
- my ($self, $eml) = @_;
- $eml->header_set($_) for @PublicInbox::Import::UNWANTED_HEADERS;
- git_sha(1, $eml)->hexdigest;
-}
-
-# called by lei-daemon before lei->refresh_watches
-sub add_sync_folders {
- my ($self, @folders) = @_;
- my $lms = _lms_rw($self);
- for my $f (@folders) { $lms->fid_for($f, 1) }
-}
-
1;