]> Sergey Matveev's repositories - bfs.git/blob - src/ioq.c
Formatting fixes
[bfs.git] / src / ioq.c
1 // Copyright © Tavian Barnes <tavianator@tavianator.com>
2 // SPDX-License-Identifier: 0BSD
3
4 #include "ioq.h"
5 #include "alloc.h"
6 #include "atomic.h"
7 #include "bfstd.h"
8 #include "bit.h"
9 #include "config.h"
10 #include "diag.h"
11 #include "dir.h"
12 #include "sanity.h"
13 #include "thread.h"
14 #include <assert.h>
15 #include <errno.h>
16 #include <pthread.h>
17 #include <stdlib.h>
18
19 /**
20  * A monitor for an I/O queue slot.
21  */
22 struct ioq_monitor {
23         cache_align pthread_mutex_t mutex;
24         pthread_cond_t cond;
25 };
26
27 /** Initialize an ioq_monitor. */
28 static int ioq_monitor_init(struct ioq_monitor *monitor) {
29         if (mutex_init(&monitor->mutex, NULL) != 0) {
30                 return -1;
31         }
32
33         if (cond_init(&monitor->cond, NULL) != 0) {
34                 mutex_destroy(&monitor->mutex);
35                 return -1;
36         }
37
38         return 0;
39 }
40
41 /** Destroy an ioq_monitor. */
42 static void ioq_monitor_destroy(struct ioq_monitor *monitor) {
43         cond_destroy(&monitor->cond);
44         mutex_destroy(&monitor->mutex);
45 }
46
47 /** A single entry in a command queue. */
48 typedef atomic uintptr_t ioq_slot;
49
50 /** Slot flag bit to indicate waiters. */
51 #define IOQ_BLOCKED ((uintptr_t)1)
52 bfs_static_assert(alignof(struct ioq_ent) > 1);
53
54 /** Check if a slot has waiters. */
55 static bool ioq_slot_blocked(uintptr_t value) {
56         return value & IOQ_BLOCKED;
57 }
58
59 /** Extract the pointer from a slot. */
60 static struct ioq_ent *ioq_slot_ptr(uintptr_t value) {
61         return (struct ioq_ent *)(value & ~IOQ_BLOCKED);
62 }
63
64 /** Check if a slot is empty. */
65 static bool ioq_slot_empty(uintptr_t value) {
66         return !ioq_slot_ptr(value);
67 }
68
69 /**
70  * An MPMC queue of I/O commands.
71  */
72 struct ioqq {
73         /** Circular buffer index mask. */
74         size_t slot_mask;
75
76         /** Monitor index mask. */
77         size_t monitor_mask;
78         /** Array of monitors used by the slots. */
79         struct ioq_monitor *monitors;
80
81         /** Index of next writer. */
82         cache_align atomic size_t head;
83         /** Index of next reader. */
84         cache_align atomic size_t tail;
85
86         /** The circular buffer itself. */
87         cache_align ioq_slot slots[];
88 };
89
90 // If we assign slots sequentially, threads will likely be operating on
91 // consecutive slots.  If these slots are in the same cache line, that will
92 // result in false sharing.  We can mitigate this by assigning slots with a
93 // stride larger than a cache line e.g. 0, 9, 18, ..., 1, 10, 19, ...
94 // As long as the stride is relatively prime to circular buffer length, we'll
95 // still use every available slot.  Since the length is a power of two, that
96 // means the stride must be odd.
97
98 #define IOQ_STRIDE ((FALSE_SHARING_SIZE / sizeof(ioq_slot)) | 1)
99 bfs_static_assert(IOQ_STRIDE % 2 == 1);
100
101 /** Destroy an I/O command queue. */
102 static void ioqq_destroy(struct ioqq *ioqq) {
103         for (size_t i = 0; i < ioqq->monitor_mask + 1; ++i) {
104                 ioq_monitor_destroy(&ioqq->monitors[i]);
105         }
106         free(ioqq->monitors);
107         free(ioqq);
108 }
109
110 /** Create an I/O command queue. */
111 static struct ioqq *ioqq_create(size_t size) {
112         // Circular buffer size must be a power of two
113         size = bit_ceil(size);
114
115         struct ioqq *ioqq = ALLOC_FLEX(struct ioqq, slots, size);
116         if (!ioqq) {
117                 return NULL;
118         }
119
120         ioqq->slot_mask = size - 1;
121         ioqq->monitor_mask = -1;
122
123         // Use a pool of monitors
124         size_t nmonitors = size < 64 ? size : 64;
125         ioqq->monitors = ALLOC_ARRAY(struct ioq_monitor, nmonitors);
126         if (!ioqq->monitors) {
127                 ioqq_destroy(ioqq);
128                 return NULL;
129         }
130
131         for (size_t i = 0; i < nmonitors; ++i) {
132                 if (ioq_monitor_init(&ioqq->monitors[i]) != 0) {
133                         ioqq_destroy(ioqq);
134                         return NULL;
135                 }
136                 ++ioqq->monitor_mask;
137         }
138
139         atomic_init(&ioqq->head, 0);
140         atomic_init(&ioqq->tail, 0);
141
142         for (size_t i = 0; i < size; ++i) {
143                 atomic_init(&ioqq->slots[i], 0);
144         }
145
146         return ioqq;
147 }
148
149 /** Get the monitor associated with a slot. */
150 static struct ioq_monitor *ioq_slot_monitor(struct ioqq *ioqq, ioq_slot *slot) {
151         size_t i = slot - ioqq->slots;
152         return &ioqq->monitors[i & ioqq->monitor_mask];
153 }
154
155 /** Atomically wait for a slot to change. */
156 static uintptr_t ioq_slot_wait(struct ioqq *ioqq, ioq_slot *slot, uintptr_t value) {
157         struct ioq_monitor *monitor = ioq_slot_monitor(ioqq, slot);
158         mutex_lock(&monitor->mutex);
159
160         uintptr_t ret = load(slot, relaxed);
161         if (ret != value) {
162                 goto done;
163         }
164
165         if (!(value & IOQ_BLOCKED)) {
166                 value |= IOQ_BLOCKED;
167                 if (!compare_exchange_strong(slot, &ret, value, relaxed, relaxed)) {
168                         goto done;
169                 }
170         }
171
172         do {
173                 // To avoid missed wakeups, it is important that
174                 // cond_broadcast() is not called right here
175                 cond_wait(&monitor->cond, &monitor->mutex);
176                 ret = load(slot, relaxed);
177         } while (ret == value);
178
179 done:
180         mutex_unlock(&monitor->mutex);
181         return ret;
182 }
183
184 /** Wake up any threads waiting on a slot. */
185 static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) {
186         struct ioq_monitor *monitor = ioq_slot_monitor(ioqq, slot);
187
188         // The following implementation would clearly avoid the missed wakeup
189         // issue mentioned above in ioq_slot_wait():
190         //
191         //     mutex_lock(&monitor->mutex);
192         //     cond_broadcast(&monitor->cond);
193         //     mutex_unlock(&monitor->mutex);
194         //
195         // As a minor optimization, we move the broadcast outside of the lock.
196         // This optimization is correct, even though it leads to a seemingly-
197         // useless empty critical section.
198
199         mutex_lock(&monitor->mutex);
200         mutex_unlock(&monitor->mutex);
201         cond_broadcast(&monitor->cond);
202 }
203
204 /** Get the next slot for writing. */
205 static ioq_slot *ioqq_write(struct ioqq *ioqq) {
206         size_t i = fetch_add(&ioqq->head, IOQ_STRIDE, relaxed);
207         return &ioqq->slots[i & ioqq->slot_mask];
208 }
209
210 /** Push an entry into a slot. */
211 static void ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent) {
212         uintptr_t addr = (uintptr_t)ent;
213         bfs_assert(!ioq_slot_blocked(addr));
214
215         uintptr_t prev = load(slot, relaxed);
216         do {
217                 while (!ioq_slot_empty(prev)) {
218                         prev = ioq_slot_wait(ioqq, slot, prev);
219                 }
220         } while (!compare_exchange_weak(slot, &prev, addr, release, relaxed));
221
222         if (ioq_slot_blocked(prev)) {
223                 ioq_slot_wake(ioqq, slot);
224         }
225 }
226
227 /** Push an entry onto the queue. */
228 static void ioqq_push(struct ioqq *ioqq, struct ioq_ent *ent) {
229         ioq_slot *slot = ioqq_write(ioqq);
230         ioq_slot_push(ioqq, slot, ent);
231 }
232
233 /** Get the next slot for reading. */
234 static ioq_slot *ioqq_read(struct ioqq *ioqq) {
235         size_t i = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed);
236         return &ioqq->slots[i & ioqq->slot_mask];
237 }
238
239 /** (Try to) pop an entry from a slot. */
240 static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool block) {
241         uintptr_t prev = load(slot, relaxed);
242         do {
243                 while (ioq_slot_empty(prev)) {
244                         if (block) {
245                                 prev = ioq_slot_wait(ioqq, slot, prev);
246                         } else {
247                                 return NULL;
248                         }
249                 }
250         } while (!compare_exchange_weak(slot, &prev, 0, acquire, relaxed));
251
252         if (ioq_slot_blocked(prev)) {
253                 ioq_slot_wake(ioqq, slot);
254         }
255
256         return ioq_slot_ptr(prev);
257 }
258
259 /** Pop an entry from the queue. */
260 static struct ioq_ent *ioqq_pop(struct ioqq *ioqq) {
261         ioq_slot *slot = ioqq_read(ioqq);
262         return ioq_slot_pop(ioqq, slot, true);
263 }
264
265 /** Pop an entry from the queue if one is available. */
266 static struct ioq_ent *ioqq_trypop(struct ioqq *ioqq) {
267         size_t i = load(&ioqq->tail, relaxed);
268         ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask];
269
270         struct ioq_ent *ret = ioq_slot_pop(ioqq, slot, false);
271         if (ret) {
272                 size_t j = exchange(&ioqq->tail, i + IOQ_STRIDE, relaxed);
273                 bfs_assert(j == i, "Detected multiple consumers");
274                 (void)j;
275         }
276
277         return ret;
278 }
279
280 /** Sentinel stop command. */
281 static struct ioq_ent IOQ_STOP;
282
283 struct ioq {
284         /** The depth of the queue. */
285         size_t depth;
286         /** The current size of the queue. */
287         size_t size;
288         /** Cancellation flag. */
289         atomic bool cancel;
290
291         /** ioq_ent arena. */
292         struct arena ents;
293
294         /** Pending I/O requests. */
295         struct ioqq *pending;
296         /** Ready I/O responses. */
297         struct ioqq *ready;
298
299         /** The number of background threads. */
300         size_t nthreads;
301         /** The background threads themselves. */
302         pthread_t threads[];
303 };
304
305 /** Background thread entry point. */
306 static void *ioq_work(void *ptr) {
307         struct ioq *ioq = ptr;
308
309         while (true) {
310                 struct ioq_ent *ent = ioqq_pop(ioq->pending);
311                 if (ent == &IOQ_STOP) {
312                         break;
313                 }
314
315                 bool cancel = load(&ioq->cancel, relaxed);
316
317                 ent->ret = -1;
318
319                 switch (ent->op) {
320                 case IOQ_CLOSE:
321                         // Always close(), even if we're cancelled, just like a real EINTR
322                         ent->ret = xclose(ent->close.fd);
323                         break;
324
325                 case IOQ_OPENDIR:
326                         if (!cancel) {
327                                 struct ioq_opendir *args = &ent->opendir;
328                                 ent->ret = bfs_opendir(args->dir, args->dfd, args->path);
329                                 if (ent->ret == 0) {
330                                         bfs_polldir(args->dir);
331                                 }
332                         }
333                         break;
334
335                 case IOQ_CLOSEDIR:
336                         ent->ret = bfs_closedir(ent->closedir.dir);
337                         break;
338
339                 default:
340                         bfs_bug("Unknown ioq_op %d", (int)ent->op);
341                         errno = ENOSYS;
342                         break;
343                 }
344
345                 if (cancel) {
346                         ent->error = EINTR;
347                 } else if (ent->ret < 0) {
348                         ent->error = errno;
349                 } else {
350                         ent->error = 0;
351                 }
352
353                 ioqq_push(ioq->ready, ent);
354         }
355
356         return NULL;
357 }
358
359 struct ioq *ioq_create(size_t depth, size_t nthreads) {
360         struct ioq *ioq = ZALLOC_FLEX(struct ioq, threads, nthreads);
361         if (!ioq) {
362                 goto fail;
363         }
364
365         ioq->depth = depth;
366         ARENA_INIT(&ioq->ents, struct ioq_ent);
367
368         ioq->pending = ioqq_create(depth);
369         if (!ioq->pending) {
370                 goto fail;
371         }
372
373         ioq->ready = ioqq_create(depth);
374         if (!ioq->ready) {
375                 goto fail;
376         }
377
378         for (size_t i = 0; i < nthreads; ++i) {
379                 if (thread_create(&ioq->threads[i], NULL, ioq_work, ioq) != 0) {
380                         goto fail;
381                 }
382                 ++ioq->nthreads;
383         }
384
385         return ioq;
386
387         int err;
388 fail:
389         err = errno;
390         ioq_destroy(ioq);
391         errno = err;
392         return NULL;
393 }
394
395 size_t ioq_capacity(const struct ioq *ioq) {
396         return ioq->depth - ioq->size;
397 }
398
399 static struct ioq_ent *ioq_request(struct ioq *ioq, enum ioq_op op, void *ptr) {
400         if (load(&ioq->cancel, relaxed)) {
401                 errno = EINTR;
402                 return NULL;
403         }
404
405         if (ioq->size >= ioq->depth) {
406                 errno = EAGAIN;
407                 return NULL;
408         }
409
410         struct ioq_ent *ent = arena_alloc(&ioq->ents);
411         if (!ent) {
412                 return NULL;
413         }
414
415         ent->op = op;
416         ent->ptr = ptr;
417         ++ioq->size;
418         return ent;
419 }
420
421 int ioq_close(struct ioq *ioq, int fd, void *ptr) {
422         struct ioq_ent *ent = ioq_request(ioq, IOQ_CLOSE, ptr);
423         if (!ent) {
424                 return -1;
425         }
426
427         ent->close.fd = fd;
428
429         ioqq_push(ioq->pending, ent);
430         return 0;
431 }
432
433 int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr) {
434         struct ioq_ent *ent = ioq_request(ioq, IOQ_OPENDIR, ptr);
435         if (!ent) {
436                 return -1;
437         }
438
439         struct ioq_opendir *args = &ent->opendir;
440         args->dir = dir;
441         args->dfd = dfd;
442         args->path = path;
443
444         ioqq_push(ioq->pending, ent);
445         return 0;
446 }
447
448 int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr) {
449         struct ioq_ent *ent = ioq_request(ioq, IOQ_CLOSEDIR, ptr);
450         if (!ent) {
451                 return -1;
452         }
453
454         ent->closedir.dir = dir;
455
456         ioqq_push(ioq->pending, ent);
457         return 0;
458 }
459
460 struct ioq_ent *ioq_pop(struct ioq *ioq) {
461         if (ioq->size == 0) {
462                 return NULL;
463         }
464
465         return ioqq_pop(ioq->ready);
466 }
467
468 struct ioq_ent *ioq_trypop(struct ioq *ioq) {
469         if (ioq->size == 0) {
470                 return NULL;
471         }
472
473         return ioqq_trypop(ioq->ready);
474 }
475
476 void ioq_free(struct ioq *ioq, struct ioq_ent *ent) {
477         bfs_assert(ioq->size > 0);
478         --ioq->size;
479
480         arena_free(&ioq->ents, ent);
481 }
482
483 void ioq_cancel(struct ioq *ioq) {
484         if (!exchange(&ioq->cancel, true, relaxed)) {
485                 for (size_t i = 0; i < ioq->nthreads; ++i) {
486                         ioqq_push(ioq->pending, &IOQ_STOP);
487                 }
488         }
489 }
490
491 void ioq_destroy(struct ioq *ioq) {
492         if (!ioq) {
493                 return;
494         }
495
496         ioq_cancel(ioq);
497
498         for (size_t i = 0; i < ioq->nthreads; ++i) {
499                 thread_join(ioq->threads[i], NULL);
500         }
501
502         ioqq_destroy(ioq->ready);
503         ioqq_destroy(ioq->pending);
504
505         arena_destroy(&ioq->ents);
506
507         free(ioq);
508 }