lib/PublicInbox/LeiForgetSearch.pm
lib/PublicInbox/LeiHelp.pm
lib/PublicInbox/LeiImport.pm
+lib/PublicInbox/LeiImportKw.pm
lib/PublicInbox/LeiIndex.pm
lib/PublicInbox/LeiInit.pm
lib/PublicInbox/LeiInput.pm
'leistore.dir' => 'top-level storage location',
);
-my @WQ_KEYS = qw(lxs l2m wq1); # internal workers
+my @WQ_KEYS = qw(lxs l2m wq1 ikw); # internal workers
sub _drop_wq {
my ($self) = @_;
sub input_net_cb { # imap_each / nntp_each
my ($uri, $uid, $kw, $eml, $self) = @_;
- my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
- $vmd->{sync_info} = [ $$uri, $uid ] if $self->{-mail_sync};
if (defined $eml) {
+ my $vmd = $self->{-import_kw} ? { kw => $kw } : undef; # only used when $eml is defined
+ $vmd->{sync_info} = [ $$uri, $uid ] if $self->{-mail_sync};
$self->input_eml_cb($eml, $vmd);
- } elsif ($vmd) { # old message, kw only
- my $oid = $self->{-lms_ro}->imap_oid2($uri, $uid) // return;
- my @docids = $self->{lse}->over->blob_exists($oid) or return;
- $self->{lse}->kw_changed(undef, $kw, \@docids) or return;
- my $lei = $self->{lei};
- $lei->qerr("# $oid => @$kw\n") if $lei->{opt}->{verbose};
- $self->{lei}->{sto}->ipc_do('set_eml_vmd', undef,
- $vmd, \@docids);
+ } elsif (my $ikw = $self->{lei}->{ikw}) { # old message, kw only
+ # we send $uri as a bare SCALAR and not a URIimap ref to
+ # reduce socket traffic:
+ $ikw->wq_io_do('ck_update_kw', [], $$uri, $uid, $kw); # see LeiImportKw::ck_update_kw
}
}
$lei->ale; # initialize for workers to read
my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
+ my $ikw;
if (my $net = $lei->{net}) {
# $j = $net->net_concurrency($j); TODO
if ($lei->{opt}->{incremental} // 1) {
$net->{incremental} = 1;
$net->{-lms_ro} = $sto->search->lms // 0;
- if ($self->{-import_kw}) {
+ if ($self->{-import_kw} && $net->{-lms_ro} &&
+ $net->{imap_order}) {
+ require PublicInbox::LeiImportKw;
+ $ikw = PublicInbox::LeiImportKw->new($lei);
$net->{each_old} = 1;
- $self->{-lms_ro} = $net->{-lms_ro};
- $self->{lse} = $sto->search;
}
}
} else {
(my $op_c, $ops) = $lei->workers_start($self, $j, $ops);
$lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
+ $ikw->wq_close(1) if $ikw;
net_merge_all_done($self) unless $lei->{auth};
$op_c->op_wait_event($ops);
}
--- /dev/null
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# WQ worker for dealing with LeiImport IMAP flags on already-imported messages
+# WQ key: {ikw}
+package PublicInbox::LeiImportKw;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+
+sub new { # ($cls, $lei): starts workers via $lei->workers_start, returns the object
+ my ($cls, $lei) = @_;
+ my $self = bless { -wq_ident => 'lei import_kw worker' }, $cls;
+ my ($op_c, $ops) = $lei->workers_start($self, $self->detect_nproc);
+ $op_c->{ops} = $ops; # for PktOp->event_step
+ $lei->{ikw} = $self; # stored under WQ key {ikw}; the assignment is also the return value
+}
+
+sub ipc_atfork_child { # caches the handles ck_update_kw needs, then defers to SUPER
+ my ($self) = @_;
+ my $lei = $self->{lei}; # NOTE(review): not set in new(); presumably by workers_start -- confirm
+ $lei->_lei_atfork_child;
+ my $net = delete $lei->{net} // die 'BUG: no lei->{net}'; # only its {-lms_ro} is kept below
+ $self->{sto} = $lei->{sto} // die 'BUG: no lei->{sto}';
+ $self->{verbose} = $lei->{opt}->{verbose};
+ $self->{lse} = $self->{sto}->search;
+ $self->{over} = $self->{lse}->over;
+ $self->{-lms_ro} = $net->{-lms_ro} || die 'BUG: net->{-lms_ro} FALSE'; # `||' so a false (0) value dies, too
+ $self->SUPER::ipc_atfork_child;
+}
+
+sub ck_update_kw { # via wq_io_do
+ my ($self, $url, $uid, $kw) = @_; # $url is a plain string, $kw an arrayref
+ my $oidbin = $self->{-lms_ro}->imap_oidbin($url, $uid) // return; # no OID known for $url + $uid
+ my @docids = $self->{over}->oidbin_exists($oidbin) or return; # no docids for this OID
+ $self->{lse}->kw_changed(undef, $kw, \@docids) or return; # presumably: keywords unchanged -- confirm
+ $self->{verbose} and
+ $self->{lei}->qerr('# '.unpack('H*', $oidbin)." => @$kw\n"); # prints hex OID => keywords
+ $self->{sto}->ipc_do('set_eml_vmd', undef, { kw => $kw }, \@docids);
+}
+
+sub ikw_done_wait { # callback passed to wq_wait_old by _lei_wq_eof
+ my ($arg, $pid) = @_;
+ my ($self, $lei) = @$arg; # $arg is an arrayref: [ $self, $lei ]
+ my $wait = $lei->{sto}->ipc_do('done'); # NOTE(review): $wait is never read; presumably forces a synchronous call -- confirm
+ $lei->can('wq_done_wait')->($arg, $pid); # invoked as a plain function with the same args
+}
+
+sub _lei_wq_eof { # EOF callback for main lei daemon
+ my ($lei) = @_;
+ my $ikw = delete $lei->{ikw} or return $lei->fail; # {ikw} missing: report failure
+ $ikw->wq_wait_old(\&ikw_done_wait, $lei); # ikw_done_wait runs 'done' on {sto}
+}
+
+1;
UNIQUE (oidbin, fid, uid)
)
+ # speeds up LeiImportKw->ck_update_kw (for "lei import") by 5-6x:
+ $dbh->do(<<'');
+CREATE INDEX IF NOT EXISTS idx_fid_uid ON blob2num(fid,uid)
+
$dbh->do(<<'');
CREATE TABLE IF NOT EXISTS blob2name (
oidbin VARBINARY NOT NULL,
$dbh->do('DELETE FROM folders WHERE fid = ?', undef, $fid);
}
-sub imap_oid2 ($$$) {
- my ($self, $uri, $uid) = @_; # $uri MUST have UIDVALIDITY
- my $fid = $self->{fmap}->{"$uri"} //= fid_for($self, "$uri") // return;
+sub imap_oidbin ($$$) { # looks up blob2num.oidbin for a folder URL + UID
+ my ($self, $url, $uid) = @_; # $url MUST have UIDVALIDITY
+ my $fid = $self->{fmap}->{$url} //= fid_for($self, $url) // return; # fid cached in {fmap}
my $sth = $self->{dbh}->prepare_cached(<<EOM, undef, 1);
SELECT oidbin FROM blob2num WHERE fid = ? AND uid = ?
EOM
$sth->execute($fid, $uid);
- my ($oidbin) = $sth->fetchrow_array;
- $oidbin ? unpack('H*', $oidbin) : undef;
+ $sth->fetchrow_array; # binary, not hex: callers unpack('H*', ...) when they need hex
}
sub imap_oid {
}
$lei->qerr(@{$err->{qerr}}) if $err->{qerr};
}
- imap_oid2($self, $folders->[0], $uid_uri->uid);
+ my $oidbin = imap_oidbin($self, $folders->[0], $uid_uri->uid);
+ $oidbin ? unpack('H*', $oidbin) : undef;
}
-
# FIXME: something with "lei <up|q>" is causing uncommitted transaction
# warnings, not sure what...
sub DESTROY {
while (my ($uid, $per_uid) = each %$r) {
my $kw = flags2kw($self, $uri, $uid, $per_uid->{FLAGS})
// next;
+ # LeiImport->input_net_cb
$eml_cb->($uri, $uid, $kw, undef, @args);
}
}
}
}
-sub blob_exists {
- my ($self, $oidhex) = @_;
+sub oidbin_exists {
+ my ($self, $oidbin) = @_;
if (wantarray) {
my $sth = $self->dbh->prepare_cached(<<'', undef, 1);
SELECT docid FROM xref3 WHERE oidbin = ? ORDER BY docid ASC
- $sth->bind_param(1, pack('H*', $oidhex), SQL_BLOB);
+ $sth->bind_param(1, $oidbin, SQL_BLOB);
$sth->execute;
my $tmp = $sth->fetchall_arrayref;
map { $_->[0] } @$tmp;
my $sth = $self->dbh->prepare_cached(<<'', undef, 1);
SELECT COUNT(*) FROM xref3 WHERE oidbin = ?
- $sth->bind_param(1, pack('H*', $oidhex), SQL_BLOB);
+ $sth->bind_param(1, $oidbin, SQL_BLOB);
$sth->execute;
$sth->fetchrow_array;
}
}
+sub blob_exists { oidbin_exists($_[0], pack('H*', $_[1])) } # hex-OID wrapper; caller's context passes through
+
1;