]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei: simplify PktOp callers
authorEric Wong <e@80x24.org>
Sun, 28 Mar 2021 09:01:13 +0000 (09:01 +0000)
committerEric Wong <e@80x24.org>
Sun, 28 Mar 2021 23:01:36 +0000 (23:01 +0000)
Provide a consistent ->op_wait_event method instead of
forcing callers to loop (or not) at each callsite.
This also avoid a leak possibility by avoiding circular
references.

lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiBlob.pm
lib/PublicInbox/LeiConvert.pm
lib/PublicInbox/LeiImport.pm
lib/PublicInbox/LeiMark.pm
lib/PublicInbox/LeiMirror.pm
lib/PublicInbox/LeiP2q.pm
lib/PublicInbox/LeiXSearch.pm
lib/PublicInbox/PktOp.pm

index 478912cd7fd2948a36995752128c7990f64f19a9..9cacb1426f38eef7ad4eb3eccc6d1fff8a7cacfd 100644 (file)
@@ -494,11 +494,11 @@ sub _delete_pkt_op { # OnDestroy callback to prevent leaks on die
 }
 
 sub pkt_op_pair {
-       my ($self, $ops) = @_;
+       my ($self) = @_;
        require PublicInbox::OnDestroy;
        require PublicInbox::PktOp;
        my $end = PublicInbox::OnDestroy->new($$, \&_delete_pkt_op, $self);
-       @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair($ops);
+       @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair;
        $end;
 }
 
@@ -512,14 +512,13 @@ sub workers_start {
                ($ops ? %$ops : ()),
        };
        $ops->{''} //= [ \&dclose, $lei ];
-       my $end = $lei->pkt_op_pair($ops);
+       my $end = $lei->pkt_op_pair;
        $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
        delete $lei->{pkt_op_p};
-       my $op = delete $lei->{pkt_op_c};
+       my $op_c = delete $lei->{pkt_op_c};
        @$end = ();
        $lei->event_step_init;
-       # oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op
-       $lei->{oneshot} ? $op : undef;
+       ($op_c, $ops);
 }
 
 sub _help {
index 2facbad3ada42a7206e9719589361e2745fc7e32..97747220a684cffdb520442669397d2ae64206ab 100644 (file)
@@ -103,12 +103,12 @@ sub lei_blob {
        my $lxs = $lei->lxs_prepare or return;
        require PublicInbox::SolverGit;
        my $self = bless { lxs => $lxs, oid_b => $blob }, __PACKAGE__;
-       my $op = $lei->workers_start($self, 'lei_solve', 1,
+       my ($op_c, $ops) = $lei->workers_start($self, 'lei_solve', 1,
                { '' => [ \&sol_done, $lei ] });
        $lei->{sol} = $self;
        $self->wq_io_do('do_solve_blob', []);
        $self->wq_close(1);
-       while ($op && $op->{sock}) { $op->event_step }
+       $op_c->op_wait_event($ops);
 }
 
 sub ipc_atfork_child {
index 083ecc33bc5bf9fd488f8690a727f4532d7c4982..5d0adb14ecc5ec467c35de3c3a79d55bcdecf05b 100644 (file)
@@ -53,11 +53,11 @@ sub lei_convert { # the main "lei convert" method
        my $devfd = $lei->path_to_fd($ovv->{dst}) // return;
        $lei->{opt}->{augment} = 1 if $devfd < 0;
        $self->prepare_inputs($lei, \@inputs) or return;
-       my $op = $lei->workers_start($self, 'lei_convert', 1);
+       my ($op_c, $ops) = $lei->workers_start($self, 'lei_convert', 1);
        $lei->{cnv} = $self;
        $self->wq_io_do('do_convert', []);
        $self->wq_close(1);
-       while ($op && $op->{sock}) { $op->event_step }
+       $op_c->op_wait_event($ops);
 }
 
 sub ipc_atfork_child {
index 7c5b7d0905eebfbb3354101f5f262df422f1a039..803b5cdad89879d563f0e2b5d625536e67c3fbdd 100644 (file)
@@ -76,11 +76,11 @@ sub lei_import { # the main "lei import" method
        my $ops = { '' => [ \&import_done, $lei ] };
        $lei->{auth}->op_merge($ops, $self) if $lei->{auth};
        $self->{-wq_nr_workers} = $j // 1; # locked
-       my $op = $lei->workers_start($self, 'lei_import', undef, $ops);
+       my ($op_c, undef) = $lei->workers_start($self, 'lei_import', $j, $ops);
        $lei->{imp} = $self;
        $self->wq_io_do('input_stdin', []) if $self->{0};
        net_merge_complete($self) unless $lei->{auth};
-       while ($op && $op->{sock}) { $op->event_step }
+       $op_c->op_wait_event($ops);
 }
 
 no warnings 'once';
index 34846b842a5aff01164dd0662b8a210dd98310e4..6e611318b3ff8510cda42329d977db28917ce6da 100644 (file)
@@ -116,11 +116,11 @@ sub lei_mark { # the "lei mark" method
        my $ops = { '' => [ \&mark_done, $lei ] };
        $lei->{auth}->op_merge($ops, $self) if $lei->{auth};
        $self->{vmd_mod} = $vmd_mod;
-       my $op = $lei->workers_start($self, 'lei_mark', 1, $ops);
+       my ($op_c, undef) = $lei->workers_start($self, 'lei_mark', 1, $ops);
        $lei->{mark} = $self;
        $self->wq_io_do('input_stdin', []) if $self->{0};
        net_merge_complete($self) unless $lei->{auth};
-       while ($op && $op->{sock}) { $op->event_step }
+       $op_c->op_wait_event($ops);
 }
 
 sub note_missing {
index c83386c61ff4717b9f90b63607818d859639a8e2..89574d28f5b3f9f12ecc859c831290917183bb57 100644 (file)
@@ -282,13 +282,13 @@ sub start {
        require PublicInbox::Inbox;
        require PublicInbox::Admin;
        require PublicInbox::InboxWritable;
-       my $op = $lei->workers_start($self, 'lei_mirror', 1, {
+       my ($op, $ops) = $lei->workers_start($self, 'lei_mirror', 1, {
                '' => [ \&mirror_done, $lei ]
        });
        $lei->{mrr} = $self;
        $self->wq_io_do('do_mirror', []);
        $self->wq_close(1);
-       while ($op && $op->{sock}) { $op->event_step }
+       $op->op_wait_event($ops);
 }
 
 sub ipc_atfork_child {
index 25f63a10d623c62dbb1abbacbabc786447d9cca1..a8a3dd2ccfa7d0bcae67ba7d9b3ec4a66b122ab0 100644 (file)
@@ -185,11 +185,11 @@ sub lei_p2q { # the "lei patch-to-query" entry point
        } else {
                $self->{input} = $input;
        }
-       my $op = $lei->workers_start($self, 'lei_p2q', 1);
+       my ($op, $ops) = $lei->workers_start($self, 'lei_p2q', 1);
        $lei->{p2q} = $self;
        $self->wq_io_do('do_p2q', []);
        $self->wq_close(1);
-       while ($op && $op->{sock}) { $op->event_step }
+       $op->op_wait_event($ops);
 }
 
 sub ipc_atfork_child {
index b41daffebbdfacf3114426a0a6743dd0c0c44b1d..1a194f1c9dd9d45285e21bba796a0feb33095cc2 100644 (file)
@@ -427,7 +427,7 @@ sub do_query {
                'incr_start_query' => [ \&incr_start_query, $self, $l2m ],
        };
        $lei->{auth}->op_merge($ops, $l2m) if $l2m && $lei->{auth};
-       my $end = $lei->pkt_op_pair($ops);
+       my $end = $lei->pkt_op_pair;
        $lei->{1}->autoflush(1);
        $lei->start_pager if delete $lei->{need_pager};
        $lei->{ovv}->ovv_begin($lei);
@@ -445,7 +445,7 @@ sub do_query {
        }
        $self->wq_workers_start('lei_xsearch', undef,
                                $lei->oldset, { lei => $lei });
-       my $op = delete $lei->{pkt_op_c};
+       my $op_c = delete $lei->{pkt_op_c};
        delete $lei->{pkt_op_p};
        @$end = ();
        $self->{threads} = $lei->{opt}->{threads};
@@ -455,9 +455,7 @@ sub do_query {
                start_query($self);
        }
        $lei->event_step_init; # wait for shutdowns
-       if ($lei->{oneshot}) {
-               while ($op->{sock}) { $op->event_step }
-       }
+       $op_c->op_wait_event($ops);
 }
 
 sub add_uri {
index 5d8e78ea78635211885271cb650f1ed48d76696e..c32217354a024a170f719ce973b3ff885a12df01 100644 (file)
@@ -16,21 +16,23 @@ use PublicInbox::IPC qw(ipc_freeze ipc_thaw);
 our @EXPORT_OK = qw(pkt_do);
 
 sub new {
-       my ($cls, $r, $ops) = @_;
-       my $self = bless { sock => $r, ops => $ops }, $cls;
+       my ($cls, $r) = @_;
+       my $self = bless { sock => $r }, $cls;
        if ($PublicInbox::DS::in_loop) { # iff using DS->EventLoop
                $r->blocking(0);
                $self->SUPER::new($r, EPOLLIN|EPOLLET);
+       } else {
+               $self->{blocking} = 1;
        }
        $self;
 }
 
 # returns a blessed object as the consumer, and a GLOB/IO for the producer
 sub pair {
-       my ($cls, $ops) = @_;
+       my ($cls) = @_;
        my ($c, $p);
        socketpair($c, $p, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
-       (new($cls, $c, $ops), $p);
+       (new($cls, $c), $p);
 }
 
 sub pkt_do { # for the producer to trigger event_step in consumer
@@ -41,7 +43,7 @@ sub pkt_do { # for the producer to trigger event_step in consumer
 sub close {
        my ($self) = @_;
        my $c = $self->{sock} or return;
-       $c->blocking ? delete($self->{sock}) : $self->SUPER::close;
+       $self->{blocking} ? delete($self->{sock}) : $self->SUPER::close;
 }
 
 sub event_step {
@@ -73,4 +75,12 @@ sub event_step {
        }
 }
 
+# call this when we're ready to wait on events,
+# returns immediately if non-blocking
+sub op_wait_event {
+       my ($self, $ops) = @_;
+       $self->{ops} = $ops;
+       while ($self->{blocking} && $self->{sock}) { event_step($self) }
+}
+
 1;