]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiToMail.pm
pkt_op: make pkt_do an OO method
[public-inbox.git] / lib / PublicInbox / LeiToMail.pm
index ad6b94398a83f1cc6b6f62e7b9ad5706325f9cc6..b9405c0c68047165f5bfcb80cded011ce274ae81 100644 (file)
@@ -9,7 +9,6 @@ use parent qw(PublicInbox::IPC);
 use PublicInbox::Eml;
 use PublicInbox::ProcessPipe;
 use PublicInbox::Spawn qw(spawn);
-use PublicInbox::PktOp qw(pkt_do);
 use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
@@ -289,7 +288,7 @@ sub _maildir_write_cb ($$) {
        my $dst = $lei->{ovv}->{dst};
        my $lse = $lei->{lse}; # may be undef
        my $sto = $lei->{opt}->{'mail-sync'} ? $lei->{sto} : undef;
-       my $out = $sto ? 'maildir:'.$lei->rel2abs($dst) : undef;
+       my $out = $sto ? 'maildir:'.$lei->abs_path($dst) : undef;
 
        # Favor cur/ and only write to new/ when augmenting.  This
        # saves MUAs from having to do a mass rename when the initial
@@ -353,16 +352,28 @@ sub _text_write_cb ($$) {
        sub { # for git_to_mail
                my ($bref, $smsg, $eml) = @_;
                $lse->xsmsg_vmd($smsg) if $lse;
-               $eml //= PublicInbox::Eml->new($bref); # copy bref
+               $eml //= PublicInbox::Eml->new($bref);
                return if $dedupe && $dedupe->is_dup($eml, $smsg);
                my $lk = $ovv->lock_for_scope;
                $lei->out(${$lvt->eml_to_text($smsg, $eml)}, "\n");
        }
 }
 
+sub _v2_write_cb ($$) {
+       my ($self, $lei) = @_;
+       my $dedupe = $lei->{dedupe};
+       $dedupe->prepare_dedupe if $dedupe;
+       sub { # for git_to_mail
+               my ($bref, $smsg, $eml) = @_;
+               $eml //= PublicInbox::Eml->new($bref);
+               return if $dedupe && $dedupe->is_dup($eml, $smsg);
+               $lei->{v2w}->ipc_do('add', $eml); # V2Writable->add
+       }
+}
+
 sub write_cb { # returns a callback for git_to_mail
        my ($self, $lei) = @_;
-       # _mbox_write_cb, _maildir_write_cb or _imap_write_cb
+       # _mbox_write_cb, _maildir_write_cb, _imap_write_cb, _v2_write_cb
        my $m = "_$self->{base_type}_write_cb";
        $self->$m($lei);
 }
@@ -372,6 +383,7 @@ sub new {
        my $fmt = $lei->{ovv}->{fmt};
        my $dst = $lei->{ovv}->{dst};
        my $self = bless {}, $cls;
+       my @conflict;
        if ($fmt eq 'maildir') {
                require PublicInbox::MdirReader;
                $self->{base_type} = 'maildir';
@@ -400,6 +412,14 @@ sub new {
                require PublicInbox::LeiViewText;
                $lei->{lvt} = PublicInbox::LeiViewText->new($lei);
                $self->{base_type} = 'text';
+               @conflict = qw(mua save);
+       } elsif ($fmt eq 'v2') {
+               die "--dedupe=oid and v2 are incompatible\n" if
+                       ($lei->{opt}->{dedupe}//'') eq 'oid';
+               $self->{base_type} = 'v2';
+               $lei->{opt}->{save} = \1;
+               $dst = $lei->{ovv}->{dst} = $lei->abs_path($dst);
+               @conflict = qw(mua sort);
        } else {
                die "bad mail --format=$fmt\n";
        }
@@ -407,12 +427,8 @@ sub new {
                (-d $dst || (-e _ && !-w _)) and die
                        "$dst exists and is not a writable file\n";
        }
-       if ($self->{base_type} eq 'text') {
-               my @err = map {
-                       defined($lei->{opt}->{$_}) ? "--$_" : ();
-               } (qw(mua save));
-               die "@err incompatible with $fmt\n" if @err;
-       }
+       my @err = map { defined($lei->{opt}->{$_}) ? "--$_" : () } @conflict;
+       die "@err incompatible with $fmt\n" if @err;
        $self->{dst} = $dst;
        $lei->{dedupe} = $lei->{lss} // do {
                my $dd_cls = 'PublicInbox::'.
@@ -599,9 +615,43 @@ sub _do_augment_mbox {
        $dedupe->pause_dedupe if $dedupe;
 }
 
+sub _pre_augment_v2 {
+       my ($self, $lei) = @_;
+       my $dir = $self->{dst};
+       require PublicInbox::InboxWritable;
+       my ($ibx, @creat);
+       if (-d $dir) {
+               my $opt = { -min_inbox_version => 2 };
+               require PublicInbox::Admin;
+               my @ibx = PublicInbox::Admin::resolve_inboxes([ $dir ], $opt);
+               $ibx = $ibx[0] or die "$dir is not a v2 inbox\n";
+       } else {
+               $creat[0] = {};
+               $ibx = PublicInbox::Inbox->new({
+                       name => 'lei-result', # XXX configurable
+                       inboxdir => $dir,
+                       version => 2,
+                       address => [ 'lei@example.com' ],
+               });
+       }
+       PublicInbox::InboxWritable->new($ibx, @creat);
+       $ibx->init_inbox if @creat;
+       my $v2w = $lei->{v2w} = $ibx->importer;
+       $v2w->ipc_lock_init("$dir/ipc.lock");
+       $v2w->ipc_worker_spawn("lei/v2w $dir", $lei->oldset, { lei => $lei });
+       return if !$lei->{opt}->{shared};
+       my $d = "$lei->{ale}->{git}->{git_dir}/objects";
+       my $al = "$dir/git/0.git/objects/info/alternates";
+       open my $fh, '+>>', $al or die "open($al): $!";
+       seek($fh, 0, SEEK_SET) or die "seek($al): $!";
+       grep(/\A\Q$d\E\n/, <$fh>) and return;
+       print $fh "$d\n" or die "print($al): $!";
+       close $fh or die "close($al): $!";
+}
+
 sub pre_augment { # fast (1 disk seek), runs in same process as post_augment
        my ($self, $lei) = @_;
-       # _pre_augment_maildir, _pre_augment_mbox
+       # _pre_augment_maildir, _pre_augment_mbox, _pre_augment_v2
        my $m = $self->can("_pre_augment_$self->{base_type}") or return;
        $m->($self, $lei);
 }
@@ -628,7 +678,7 @@ sub do_post_auth {
        my ($self) = @_;
        my $lei = $self->{lei};
        # lei_xsearch can start as soon as all l2m workers get here
-       pkt_do($lei->{pkt_op_p}, 'incr_start_query') or
+       $lei->{pkt_op_p}->pkt_do('incr_start_query') or
                die "incr_start_query: $!";
        my $aug;
        if (lock_free($self)) { # all workers do_augment
@@ -647,7 +697,7 @@ sub do_post_auth {
                local $0 = 'do_augment';
                eval { do_augment($self, $lei) };
                $lei->fail($@) if $@;
-               pkt_do($lei->{pkt_op_p}, $aug) == 1 or
+               $lei->{pkt_op_p}->pkt_do($aug) == 1 or
                                die "do_post_augment trigger: $!";
        }
        # done augmenting, connect the compressor pipe for each worker
@@ -702,13 +752,12 @@ sub write_mail { # via ->wq_io_do
 
 sub wq_atexit_child {
        my ($self) = @_;
-       delete $self->{wcb};
        my $lei = $self->{lei};
+       delete $self->{wcb};
        $lei->{ale}->git->async_wait_all;
        my $nr = delete($lei->{-nr_write}) or return;
        return if $lei->{early_mua} || !$lei->{-progress} || !$lei->{pkt_op_p};
-       require PublicInbox::PktOp;
-       PublicInbox::PktOp::pkt_do($lei->{pkt_op_p}, 'l2m_progress', $nr);
+       $lei->{pkt_op_p}->pkt_do('l2m_progress', $nr);
 }
 
 # called in top-level lei-daemon when LeiAuth is done