]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/DS.pm
9e24ed78d10f2ab9891340b8df5fb78d4827c95a
[public-inbox.git] / lib / PublicInbox / DS.pm
1 # This library is free software; you can redistribute it and/or modify
2 # it under the same terms as Perl itself.
3 #
4 # This license differs from the rest of public-inbox
5 #
6 # This is a fork of the (for now) unmaintained Danga::Socket 1.61.
7 # Unused features will be removed, and updates will be made to take
8 # advantage of newer kernels.
9 #
10 # API changes to diverge from Danga::Socket will happen to better
11 # accomodate new features and improve scalability.  Do not expect
12 # this to be a stable API like Danga::Socket.
13 # Bugs encountered (and likely fixed) are reported to
14 # bug-Danga-Socket@rt.cpan.org and visible at:
15 # https://rt.cpan.org/Public/Dist/Display.html?Name=Danga-Socket
16 package PublicInbox::DS;
17 use strict;
18 use bytes;
19 use POSIX ();
20 use Time::HiRes ();
21 use IO::Handle qw();
22 use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD);
23
24 use warnings;
25
26 use PublicInbox::Syscall qw(:epoll);
27
28 use fields ('sock',              # underlying socket
29             'wbuf',              # arrayref of scalars, scalarrefs, or coderefs to write
30             'wbuf_off',  # offset into first element of wbuf to start writing at
31             'event_watch',       # bitmask of events the client is interested in (POLLIN,OUT,etc.)
32             );
33
34 use Errno  qw(EAGAIN EINVAL);
35 use Carp   qw(croak confess);
36
37 use constant POLLIN        => 1;
38 use constant POLLOUT       => 4;
39 use constant POLLERR       => 8;
40 use constant POLLHUP       => 16;
41 use constant POLLNVAL      => 32;
42
43 our $HAVE_KQUEUE = eval { require IO::KQueue; 1 };
44
45 our (
46      $HaveEpoll,                 # Flag -- is epoll available?  initially undefined.
47      $HaveKQueue,
48      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
49      $Epoll,                     # Global epoll fd (for epoll mode only)
50      $KQueue,                    # Global kqueue fd ref (for kqueue mode only)
51      $_io,                       # IO::Handle for Epoll
52      @ToClose,                   # sockets to close when event loop is done
53
54      $PostLoopCallback,          # subref to call at the end of each loop, if defined (global)
55
56      $LoopTimeout,               # timeout of event loop in milliseconds
57      $DoneInit,                  # if we've done the one-time module init yet
58      @Timers,                    # timers
59      );
60
61 # this may be set to zero with old kernels
62 our $EPOLLEXCLUSIVE = EPOLLEXCLUSIVE;
63 Reset();
64
65 #####################################################################
66 ### C L A S S   M E T H O D S
67 #####################################################################
68
69 =head2 C<< CLASS->Reset() >>
70
71 Reset all state
72
73 =cut
74 sub Reset {
75     %DescriptorMap = ();
76     @ToClose = ();
77     $LoopTimeout = -1;  # no timeout by default
78     @Timers = ();
79
80     $PostLoopCallback = undef;
81     $DoneInit = 0;
82
83     # NOTE kqueue is close-on-fork, and we don't account for it, yet
84     # OTOH, we (public-inbox) don't need this sub outside of tests...
85     POSIX::close($$KQueue) if !$_io && $KQueue && $$KQueue >= 0;
86     $KQueue = undef;
87
88     $_io = undef; # close $Epoll
89     $Epoll = undef;
90
91     *EventLoop = *FirstTimeEventLoop;
92 }
93
94 =head2 C<< CLASS->SetLoopTimeout( $timeout ) >>
95
96 Set the loop timeout for the event loop to some value in milliseconds.
97
98 A timeout of 0 (zero) means poll forever. A timeout of -1 means poll and return
99 immediately.
100
101 =cut
102 sub SetLoopTimeout {
103     return $LoopTimeout = $_[1] + 0;
104 }
105
106 =head2 C<< CLASS->AddTimer( $seconds, $coderef ) >>
107
108 Add a timer to occur $seconds from now. $seconds may be fractional, but timers
109 are not guaranteed to fire at the exact time you ask for.
110
111 Returns a timer object which you can call C<< $timer->cancel >> on if you need to.
112
113 =cut
114 sub AddTimer {
115     my $class = shift;
116     my ($secs, $coderef) = @_;
117
118     my $fire_time = Time::HiRes::time() + $secs;
119
120     my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer";
121
122     if (!@Timers || $fire_time >= $Timers[-1][0]) {
123         push @Timers, $timer;
124         return $timer;
125     }
126
127     # Now, where do we insert?  (NOTE: this appears slow, algorithm-wise,
128     # but it was compared against calendar queues, heaps, naive push/sort,
129     # and a bunch of other versions, and found to be fastest with a large
130     # variety of datasets.)
131     for (my $i = 0; $i < @Timers; $i++) {
132         if ($Timers[$i][0] > $fire_time) {
133             splice(@Timers, $i, 0, $timer);
134             return $timer;
135         }
136     }
137
138     die "Shouldn't get here.";
139 }
140
141 # keeping this around in case we support other FD types for now,
142 # epoll_create1(EPOLL_CLOEXEC) requires Linux 2.6.27+...
143 sub set_cloexec ($) {
144     my ($fd) = @_;
145
146     $_io = IO::Handle->new_from_fd($fd, 'r+') or return;
147     defined(my $fl = fcntl($_io, F_GETFD, 0)) or return;
148     fcntl($_io, F_SETFD, $fl | FD_CLOEXEC);
149 }
150
151 sub _InitPoller
152 {
153     return if $DoneInit;
154     $DoneInit = 1;
155
156     if ($HAVE_KQUEUE) {
157         $KQueue = IO::KQueue->new();
158         $HaveKQueue = defined $KQueue;
159         if ($HaveKQueue) {
160             *EventLoop = *KQueueEventLoop;
161         }
162     }
163     elsif (PublicInbox::Syscall::epoll_defined()) {
164         $Epoll = eval { epoll_create(1024); };
165         $HaveEpoll = defined $Epoll && $Epoll >= 0;
166         if ($HaveEpoll) {
167             set_cloexec($Epoll);
168             *EventLoop = *EpollEventLoop;
169         }
170     }
171
172     if (!$HaveEpoll && !$HaveKQueue) {
173         require IO::Poll;
174         *EventLoop = *PollEventLoop;
175     }
176 }
177
178 =head2 C<< CLASS->EventLoop() >>
179
180 Start processing IO events. In most daemon programs this never exits. See
181 C<PostLoopCallback> below for how to exit the loop.
182
183 =cut
184 sub FirstTimeEventLoop {
185     my $class = shift;
186
187     _InitPoller();
188
189     if ($HaveEpoll) {
190         EpollEventLoop($class);
191     } elsif ($HaveKQueue) {
192         KQueueEventLoop($class);
193     } else {
194         PollEventLoop($class);
195     }
196 }
197
198 # runs timers and returns milliseconds for next one, or next event loop
199 sub RunTimers {
200     return $LoopTimeout unless @Timers;
201
202     my $now = Time::HiRes::time();
203
204     # Run expired timers
205     while (@Timers && $Timers[0][0] <= $now) {
206         my $to_run = shift(@Timers);
207         $to_run->[1]->($now) if $to_run->[1];
208     }
209
210     return $LoopTimeout unless @Timers;
211
212     # convert time to an even number of milliseconds, adding 1
213     # extra, otherwise floating point fun can occur and we'll
214     # call RunTimers like 20-30 times, each returning a timeout
215     # of 0.0000212 seconds
216     my $timeout = int(($Timers[0][0] - $now) * 1000) + 1;
217
218     # -1 is an infinite timeout, so prefer a real timeout
219     return $timeout     if $LoopTimeout == -1;
220
221     # otherwise pick the lower of our regular timeout and time until
222     # the next timer
223     return $LoopTimeout if $LoopTimeout < $timeout;
224     return $timeout;
225 }
226
227 ### The epoll-based event loop. Gets installed as EventLoop if IO::Epoll loads
228 ### okay.
229 sub EpollEventLoop {
230     my $class = shift;
231
232     while (1) {
233         my @events;
234         my $i;
235         my $timeout = RunTimers();
236
237         # get up to 1000 events
238         my $evcount = epoll_wait($Epoll, 1000, $timeout, \@events);
239         for ($i=0; $i<$evcount; $i++) {
240             # it's possible epoll_wait returned many events, including some at the end
241             # that ones in the front triggered unregister-interest actions.  if we
242             # can't find the %sock entry, it's because we're no longer interested
243             # in that event.
244             $DescriptorMap{$events[$i]->[0]}->event_step;
245         }
246         return unless PostEventLoop();
247     }
248     exit 0;
249 }
250
251 ### The fallback IO::Poll-based event loop. Gets installed as EventLoop if
252 ### IO::Epoll fails to load.
253 sub PollEventLoop {
254     my $class = shift;
255
256     my PublicInbox::DS $pob;
257
258     while (1) {
259         my $timeout = RunTimers();
260
261         # the following sets up @poll as a series of ($poll,$event_mask)
262         # items, then uses IO::Poll::_poll, implemented in XS, which
263         # modifies the array in place with the even elements being
264         # replaced with the event masks that occured.
265         my @poll;
266         while ( my ($fd, $sock) = each %DescriptorMap ) {
267             push @poll, $fd, $sock->{event_watch};
268         }
269
270         # if nothing to poll, either end immediately (if no timeout)
271         # or just keep calling the callback
272         unless (@poll) {
273             select undef, undef, undef, ($timeout / 1000);
274             return unless PostEventLoop();
275             next;
276         }
277
278         my $count = IO::Poll::_poll($timeout, @poll);
279         unless ($count >= 0) {
280             return unless PostEventLoop();
281             next;
282         }
283
284         # Fetch handles with read events
285         while (@poll) {
286             my ($fd, $state) = splice(@poll, 0, 2);
287             $DescriptorMap{$fd}->event_step if $state;
288         }
289
290         return unless PostEventLoop();
291     }
292
293     exit 0;
294 }
295
296 ### The kqueue-based event loop. Gets installed as EventLoop if IO::KQueue works
297 ### okay.
298 sub KQueueEventLoop {
299     my $class = shift;
300
301     while (1) {
302         my $timeout = RunTimers();
303         my @ret = eval { $KQueue->kevent($timeout) };
304         if (my $err = $@) {
305             # workaround https://rt.cpan.org/Ticket/Display.html?id=116615
306             if ($err =~ /Interrupted system call/) {
307                 @ret = ();
308             } else {
309                 die $err;
310             }
311         }
312
313         foreach my $kev (@ret) {
314             $DescriptorMap{$kev->[0]}->event_step;
315         }
316         return unless PostEventLoop();
317     }
318
319     exit(0);
320 }
321
322 =head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >>
323
324 Sets post loop callback function.  Pass a subref and it will be
325 called every time the event loop finishes.
326
327 Return 1 (or any true value) from the sub to make the loop continue, 0 or false
328 and it will exit.
329
330 The callback function will be passed two parameters: \%DescriptorMap
331
332 =cut
333 sub SetPostLoopCallback {
334     my ($class, $ref) = @_;
335
336     # global callback
337     $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef;
338 }
339
340 # Internal function: run the post-event callback, send read events
341 # for pushed-back data, and close pending connections.  returns 1
342 # if event loop should continue, or 0 to shut it all down.
343 sub PostEventLoop {
344     # now we can close sockets that wanted to close during our event processing.
345     # (we didn't want to close them during the loop, as we didn't want fd numbers
346     #  being reused and confused during the event loop)
347     while (my $sock = shift @ToClose) {
348         my $fd = fileno($sock);
349
350         # close the socket.  (not a PublicInbox::DS close)
351         $sock->close;
352
353         # and now we can finally remove the fd from the map.  see
354         # comment above in ->close.
355         delete $DescriptorMap{$fd};
356     }
357
358
359     # by default we keep running, unless a postloop callback (either per-object
360     # or global) cancels it
361     my $keep_running = 1;
362
363     # now we're at the very end, call callback if defined
364     if (defined $PostLoopCallback) {
365         $keep_running &&= $PostLoopCallback->(\%DescriptorMap);
366     }
367
368     return $keep_running;
369 }
370
371 #####################################################################
372 ### PublicInbox::DS-the-object code
373 #####################################################################
374
375 =head2 OBJECT METHODS
376
377 =head2 C<< CLASS->new( $socket ) >>
378
379 Create a new PublicInbox::DS subclass object for the given I<socket> which will
380 react to events on it during the C<EventLoop>.
381
382 This is normally (always?) called from your subclass via:
383
384   $class->SUPER::new($socket);
385
386 =cut
387 sub new {
388     my ($self, $sock, $exclusive) = @_;
389     $self = fields::new($self) unless ref $self;
390
391     $self->{sock} = $sock;
392     my $fd = fileno($sock);
393
394     Carp::cluck("undef sock and/or fd in PublicInbox::DS->new.  sock=" . ($sock || "") . ", fd=" . ($fd || ""))
395         unless $sock && $fd;
396
397     $self->{wbuf} = [];
398     $self->{wbuf_off} = 0;
399
400     my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
401
402     _InitPoller();
403
404     if ($HaveEpoll) {
405         if ($exclusive) {
406             $ev = $self->{event_watch} = EPOLLIN|EPOLLERR|EPOLLHUP|$EPOLLEXCLUSIVE;
407         }
408 retry:
409         if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
410             if ($! == EINVAL && ($ev & $EPOLLEXCLUSIVE)) {
411                 $EPOLLEXCLUSIVE = 0; # old kernel
412                 $ev = $self->{event_watch} = EPOLLIN|EPOLLERR|EPOLLHUP;
413                 goto retry;
414             }
415             die "couldn't add epoll watch for $fd: $!\n";
416         }
417     }
418     elsif ($HaveKQueue) {
419         # Add them to the queue but disabled for now
420         $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(),
421                         IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE());
422         $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(),
423                         IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE());
424     }
425
426     Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
427         if $DescriptorMap{$fd};
428
429     $DescriptorMap{$fd} = $self;
430     return $self;
431 }
432
433
434 #####################################################################
435 ### I N S T A N C E   M E T H O D S
436 #####################################################################
437
438 =head2 C<< $obj->close >>
439
440 Close the socket.
441
442 =cut
443 sub close {
444     my ($self) = @_;
445     my $sock = delete $self->{sock} or return;
446
447     # we need to flush our write buffer, as there may
448     # be self-referential closures (sub { $client->close })
449     # preventing the object from being destroyed
450     @{$self->{wbuf}} = ();
451
452     # if we're using epoll, we have to remove this from our epoll fd so we stop getting
453     # notifications about it
454     if ($HaveEpoll) {
455         my $fd = fileno($sock);
456         epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, $self->{event_watch}) and
457             confess("EPOLL_CTL_DEL: $!");
458     }
459
460     # we explicitly don't delete from DescriptorMap here until we
461     # actually close the socket, as we might be in the middle of
462     # processing an epoll_wait/etc that returned hundreds of fds, one
463     # of which is not yet processed and is what we're closing.  if we
464     # keep it in DescriptorMap, then the event harnesses can just
465     # looked at $pob->{sock} == undef and ignore it.  but if it's an
466     # un-accounted for fd, then it (understandably) freak out a bit
467     # and emit warnings, thinking their state got off.
468
469     # defer closing the actual socket until the event loop is done
470     # processing this round of events.  (otherwise we might reuse fds)
471     push @ToClose, $sock;
472
473     return 0;
474 }
475
476 =head2 C<< $obj->write( $data ) >>
477
478 Write the specified data to the underlying handle.  I<data> may be scalar,
479 scalar ref, code ref (to run when there), or undef just to kick-start.
480 Returns 1 if writes all went through, or 0 if there are writes in queue. If
481 it returns 1, caller should stop waiting for 'writable' events)
482
483 =cut
484 sub write {
485     my PublicInbox::DS $self;
486     my $data;
487     ($self, $data) = @_;
488
489     # nobody should be writing to closed sockets, but caller code can
490     # do two writes within an event, have the first fail and
491     # disconnect the other side (whose destructor then closes the
492     # calling object, but it's still in a method), and then the
493     # now-dead object does its second write.  that is this case.  we
494     # just lie and say it worked.  it'll be dead soon and won't be
495     # hurt by this lie.
496     return 1 unless $self->{sock};
497
498     my $bref;
499
500     # just queue data if there's already a wait
501     my $need_queue;
502     my $wbuf = $self->{wbuf};
503
504     if (defined $data) {
505         $bref = ref $data ? $data : \$data;
506         if (scalar @$wbuf) {
507             push @$wbuf, $bref;
508             return 0;
509         }
510
511         # this flag says we're bypassing the queue system, knowing we're the
512         # only outstanding write, and hoping we don't ever need to use it.
513         # if so later, though, we'll need to queue
514         $need_queue = 1;
515     }
516
517   WRITE:
518     while (1) {
519         return 1 unless $bref ||= $wbuf->[0];
520
521         my $len;
522         eval {
523             $len = length($$bref); # this will die if $bref is a code ref, caught below
524         };
525         if ($@) {
526             if (UNIVERSAL::isa($bref, "CODE")) {
527                 unless ($need_queue) {
528                     shift @$wbuf;
529                 }
530                 $bref->();
531
532                 # code refs are just run and never get reenqueued
533                 # (they're one-shot), so turn off the flag indicating the
534                 # outstanding data needs queueing.
535                 $need_queue = 0;
536
537                 undef $bref;
538                 next WRITE;
539             }
540             die "Write error: $@ <$bref>";
541         }
542
543         my $to_write = $len - $self->{wbuf_off};
544         my $written = syswrite($self->{sock}, $$bref, $to_write,
545                                $self->{wbuf_off});
546
547         if (! defined $written) {
548             if ($! == EAGAIN) {
549                 # since connection has stuff to write, it should now be
550                 # interested in pending writes:
551                 if ($need_queue) {
552                     push @$wbuf, $bref;
553                 }
554                 $self->watch_write(1);
555                 return 0;
556             }
557
558             return $self->close;
559         } elsif ($written != $to_write) {
560             if ($need_queue) {
561                 push @$wbuf, $bref;
562             }
563             # since connection has stuff to write, it should now be
564             # interested in pending writes:
565             $self->{wbuf_off} += $written;
566             $self->on_incomplete_write;
567             return 0;
568         } elsif ($written == $to_write) {
569             $self->{wbuf_off} = 0;
570             $self->watch_write(0);
571
572             # this was our only write, so we can return immediately
573             # since we avoided incrementing the buffer size or
574             # putting it in the buffer.  we also know there
575             # can't be anything else to write.
576             return 1 if $need_queue;
577
578             shift @$wbuf;
579             undef $bref;
580             next WRITE;
581         }
582     }
583 }
584
585 sub on_incomplete_write {
586     my PublicInbox::DS $self = shift;
587     $self->watch_write(1);
588 }
589
590 =head2 C<< $obj->watch_read( $boolean ) >>
591
592 Turn 'readable' event notification on or off.
593
594 =cut
595 sub watch_read {
596     my PublicInbox::DS $self = shift;
597     my $sock = $self->{sock} or return;
598
599     my $val = shift;
600     my $event = $self->{event_watch};
601
602     $event &= ~POLLIN if ! $val;
603     $event |=  POLLIN if   $val;
604
605     my $fd = fileno($sock);
606     # If it changed, set it
607     if ($event != $self->{event_watch}) {
608         if ($HaveKQueue) {
609             $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(),
610                             $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE());
611         }
612         elsif ($HaveEpoll) {
613             epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and
614                 confess("EPOLL_CTL_MOD: $!");
615         }
616         $self->{event_watch} = $event;
617     }
618 }
619
620 =head2 C<< $obj->watch_write( $boolean ) >>
621
622 Turn 'writable' event notification on or off.
623
624 =cut
625 sub watch_write {
626     my PublicInbox::DS $self = shift;
627     my $sock = $self->{sock} or return;
628
629     my $val = shift;
630     my $event = $self->{event_watch};
631
632     $event &= ~POLLOUT if ! $val;
633     $event |=  POLLOUT if   $val;
634     my $fd = fileno($sock);
635
636     # If it changed, set it
637     if ($event != $self->{event_watch}) {
638         if ($HaveKQueue) {
639             $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(),
640                             $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE());
641         }
642         elsif ($HaveEpoll) {
643             epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and
644                     confess "EPOLL_CTL_MOD: $!";
645         }
646         $self->{event_watch} = $event;
647     }
648 }
649
650 package PublicInbox::DS::Timer;
651 # [$abs_float_firetime, $coderef];
652 sub cancel {
653     $_[0][1] = undef;
654 }
655
656 1;
657
658 =head1 AUTHORS (Danga::Socket)
659
660 Brad Fitzpatrick <brad@danga.com> - author
661
662 Michael Granger <ged@danga.com> - docs, testing
663
664 Mark Smith <junior@danga.com> - contributor, heavy user, testing
665
666 Matt Sergeant <matt@sergeant.org> - kqueue support, docs, timers, other bits