// Copyright © Tavian Barnes <tavianator@tavianator.com>
// SPDX-License-Identifier: 0BSD

#include "ioq.h"
#include "alloc.h"
#include "atomic.h"
#include "bfstd.h"
#include "bit.h"
#include "config.h"
#include "diag.h"
#include "dir.h"
#include "thread.h"
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>

/**
 * A monitor for an I/O queue slot.
 */
struct ioq_monitor {
	cache_align pthread_mutex_t mutex;
	pthread_cond_t cond;
};

/** Initialize an ioq_monitor. */
static int ioq_monitor_init(struct ioq_monitor *monitor) {
	if (mutex_init(&monitor->mutex, NULL) != 0) {
		return -1;
	}

	if (cond_init(&monitor->cond, NULL) != 0) {
		mutex_destroy(&monitor->mutex);
		return -1;
	}

	return 0;
}

/** Destroy an ioq_monitor. */
static void ioq_monitor_destroy(struct ioq_monitor *monitor) {
	cond_destroy(&monitor->cond);
	mutex_destroy(&monitor->mutex);
}

/** A single entry in a command queue. */
typedef atomic uintptr_t ioq_slot;

/** Slot flag bit to indicate waiters. */
#define IOQ_BLOCKED ((uintptr_t)1)
bfs_static_assert(alignof(struct ioq_ent) > 1);
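
// The assertion above guarantees that ioq_ent pointers are aligned to at
// least 2 bytes, so the low bit of a valid pointer is always zero and can
// carry the IOQ_BLOCKED flag in the same atomic word as the pointer.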

/** Check if a slot has waiters. */
static bool ioq_slot_blocked(uintptr_t value) {
	return value & IOQ_BLOCKED;
}

/** Extract the pointer from a slot. */
static struct ioq_ent *ioq_slot_ptr(uintptr_t value) {
	return (struct ioq_ent *)(value & ~IOQ_BLOCKED);
}

/** Check if a slot is empty. */
static bool ioq_slot_empty(uintptr_t value) {
	return !ioq_slot_ptr(value);
}

/**
 * An MPMC queue of I/O commands.
 */
struct ioqq {
	/** Circular buffer index mask. */
	size_t slot_mask;

	/** Monitor index mask. */
	size_t monitor_mask;
	/** Array of monitors used by the slots. */
	struct ioq_monitor *monitors;

	/** Index of next writer. */
	cache_align atomic size_t head;
	/** Index of next reader. */
	cache_align atomic size_t tail;

	/** The circular buffer itself. */
	cache_align ioq_slot slots[];
};

// If we assign slots sequentially, threads will likely be operating on
// consecutive slots.  If these slots are in the same cache line, that will
// result in false sharing.  We can mitigate this by assigning slots with a
// stride larger than a cache line, e.g. 0, 9, 18, ..., 1, 10, 19, ....
// As long as the stride is relatively prime to the circular buffer length,
// we'll still use every available slot.  Since the length is a power of two,
// that means the stride must be odd.

#define IOQ_STRIDE ((FALSE_SHARING_SIZE / sizeof(ioq_slot)) | 1)
bfs_static_assert(IOQ_STRIDE % 2 == 1);
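
// For example, with 64-byte cache lines and 8-byte slots, IOQ_STRIDE is
// (64/8) | 1 == 9.  On a 16-slot buffer, successive operations then claim
// slots 0, 9, 2, 11, 4, 13, 6, 15, 8, 1, 10, 3, 12, 5, 14, 7: every slot
// exactly once per wrap, with consecutive claims usually landing in
// different cache lines.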

/** Destroy an I/O command queue. */
static void ioqq_destroy(struct ioqq *ioqq) {
	if (!ioqq) {
		return;
	}

	for (size_t i = 0; i < ioqq->monitor_mask + 1; ++i) {
		ioq_monitor_destroy(&ioqq->monitors[i]);
	}

	free(ioqq->monitors);
	free(ioqq);
}

/** Create an I/O command queue. */
static struct ioqq *ioqq_create(size_t size) {
	// Circular buffer size must be a power of two
	size = bit_ceil(size);

	struct ioqq *ioqq = ALLOC_FLEX(struct ioqq, slots, size);
	if (!ioqq) {
		return NULL;
	}

	ioqq->slot_mask = size - 1;
	ioqq->monitor_mask = -1;

	// Use a pool of monitors
	size_t nmonitors = size < 64 ? size : 64;
	ioqq->monitors = ALLOC_ARRAY(struct ioq_monitor, nmonitors);
	if (!ioqq->monitors) {
		ioqq_destroy(ioqq);
		return NULL;
	}

	// nmonitors is a power of two, so monitor_mask works as an index mask.
	// It tracks the number of initialized monitors (as count - 1), keeping
	// ioqq_destroy() safe after a partial failure.
	for (size_t i = 0; i < nmonitors; ++i) {
		if (ioq_monitor_init(&ioqq->monitors[i]) != 0) {
			ioqq_destroy(ioqq);
			return NULL;
		}
		++ioqq->monitor_mask;
	}

	atomic_init(&ioqq->head, 0);
	atomic_init(&ioqq->tail, 0);

	for (size_t i = 0; i < size; ++i) {
		atomic_init(&ioqq->slots[i], 0);
	}

	return ioqq;
}

/** Get the monitor associated with a slot. */
static struct ioq_monitor *ioq_slot_monitor(struct ioqq *ioqq, ioq_slot *slot) {
	size_t i = slot - ioqq->slots;
	return &ioqq->monitors[i & ioqq->monitor_mask];
}
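
// Note that multiple slots share a monitor, so a wakeup may be triggered by
// activity on a different slot.  This is why ioq_slot_wait() rechecks the
// slot's value in a loop after every cond_wait().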

/** Atomically wait for a slot to change. */
static uintptr_t ioq_slot_wait(struct ioqq *ioqq, ioq_slot *slot, uintptr_t value) {
	struct ioq_monitor *monitor = ioq_slot_monitor(ioqq, slot);
	mutex_lock(&monitor->mutex);

	uintptr_t ret = load(slot, relaxed);
	if (ret != value) {
		goto done;
	}

	if (!(value & IOQ_BLOCKED)) {
		value |= IOQ_BLOCKED;
		if (!compare_exchange_strong(slot, &ret, value, relaxed, relaxed)) {
			goto done;
		}
	}

	do {
		// To avoid missed wakeups, it is important that
		// cond_broadcast() is not called right here
		cond_wait(&monitor->cond, &monitor->mutex);
		ret = load(slot, relaxed);
	} while (ret == value);

done:
	mutex_unlock(&monitor->mutex);
	return ret;
}

/** Wake up any threads waiting on a slot. */
static void ioq_slot_wake(struct ioqq *ioqq, ioq_slot *slot) {
	struct ioq_monitor *monitor = ioq_slot_monitor(ioqq, slot);

	// The following implementation would clearly avoid the missed wakeup
	// issue mentioned above in ioq_slot_wait():
	//
	//     mutex_lock(&monitor->mutex);
	//     cond_broadcast(&monitor->cond);
	//     mutex_unlock(&monitor->mutex);
	//
	// As a minor optimization, we move the broadcast outside of the lock.
	// This optimization is correct, even though it leads to a seemingly-
	// useless empty critical section.
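	//
	// Here is one interleaving it rules out (a sketch, not a formal
	// argument).  Without the empty critical section, the broadcast could
	// land between a waiter's check of the slot and its cond_wait():
	//
	//     Waiter                              Waker
	//     mutex_lock(&monitor->mutex)
	//     load(slot, relaxed) == value
	//                                         <slot is updated>
	//                                         cond_broadcast(&monitor->cond)
	//     cond_wait(&monitor->cond, ...)      // wakeup already missed
	//
	// Taking and releasing the mutex first forces the waker to wait until
	// the waiter is actually blocked in cond_wait() (which releases the
	// mutex), so the broadcast cannot fall in that window.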

	mutex_lock(&monitor->mutex);
	mutex_unlock(&monitor->mutex);
	cond_broadcast(&monitor->cond);
}

/** Get the next slot for writing. */
static ioq_slot *ioqq_write(struct ioqq *ioqq) {
	size_t i = fetch_add(&ioqq->head, IOQ_STRIDE, relaxed);
	return &ioqq->slots[i & ioqq->slot_mask];
}

/** Push an entry into a slot. */
static void ioq_slot_push(struct ioqq *ioqq, ioq_slot *slot, struct ioq_ent *ent) {
	uintptr_t addr = (uintptr_t)ent;
	bfs_assert(!ioq_slot_blocked(addr));

	// Publish the entry with a release store, paired with the acquire
	// exchange in ioq_slot_pop()
	uintptr_t prev = load(slot, relaxed);
	do {
		while (!ioq_slot_empty(prev)) {
			prev = ioq_slot_wait(ioqq, slot, prev);
		}
	} while (!compare_exchange_weak(slot, &prev, addr, release, relaxed));

	if (ioq_slot_blocked(prev)) {
		ioq_slot_wake(ioqq, slot);
	}
}

/** Push an entry onto the queue. */
static void ioqq_push(struct ioqq *ioqq, struct ioq_ent *ent) {
	ioq_slot *slot = ioqq_write(ioqq);
	ioq_slot_push(ioqq, slot, ent);
}

/** Get the next slot for reading. */
static ioq_slot *ioqq_read(struct ioqq *ioqq) {
	size_t i = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed);
	return &ioqq->slots[i & ioqq->slot_mask];
}

/** (Try to) pop an entry from a slot. */
static struct ioq_ent *ioq_slot_pop(struct ioqq *ioqq, ioq_slot *slot, bool block) {
	// The acquire exchange pairs with the release in ioq_slot_push()
	uintptr_t prev = load(slot, relaxed);
	do {
		while (ioq_slot_empty(prev)) {
			if (!block) {
				return NULL;
			}
			prev = ioq_slot_wait(ioqq, slot, prev);
		}
	} while (!compare_exchange_weak(slot, &prev, 0, acquire, relaxed));

	if (ioq_slot_blocked(prev)) {
		ioq_slot_wake(ioqq, slot);
	}

	return ioq_slot_ptr(prev);
}

/** Pop an entry from the queue. */
static struct ioq_ent *ioqq_pop(struct ioqq *ioqq) {
	ioq_slot *slot = ioqq_read(ioqq);
	return ioq_slot_pop(ioqq, slot, true);
}

/** Pop an entry from the queue if one is available. */
static struct ioq_ent *ioqq_trypop(struct ioqq *ioqq) {
	size_t i = load(&ioqq->tail, relaxed);
	ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask];

	struct ioq_ent *ret = ioq_slot_pop(ioqq, slot, false);
	if (ret) {
		size_t j = exchange(&ioqq->tail, i + IOQ_STRIDE, relaxed);
		bfs_assert(j == i, "Detected multiple consumers");
		(void)j;
	}

	return ret;
}

/** Sentinel stop command. */
static struct ioq_ent IOQ_STOP;
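
// Because IOQ_STOP is a static object that is never allocated from the
// ioq_ent arena, its address can't collide with a real command; worker
// threads recognize it by address alone.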

/**
 * An asynchronous I/O queue.
 */
struct ioq {
	/** The depth of the queue. */
	size_t depth;
	/** The current size of the queue. */
	size_t size;
	/** Cancellation flag. */
	atomic bool cancel;

	/** ioq_ent arena. */
	struct arena ents;

	/** Pending I/O requests. */
	struct ioqq *pending;
	/** Ready I/O responses. */
	struct ioqq *ready;

	/** The number of background threads. */
	size_t nthreads;
	/** The background threads themselves. */
	pthread_t threads[];
};

/** Background thread entry point. */
static void *ioq_work(void *ptr) {
	struct ioq *ioq = ptr;

	while (true) {
		struct ioq_ent *ent = ioqq_pop(ioq->pending);
		if (ent == &IOQ_STOP) {
			break;
		}

		bool cancel = load(&ioq->cancel, relaxed);

		ent->ret = -1;

		switch (ent->op) {
		case IOQ_CLOSE:
			// Always close(), even if we're cancelled, just like a real EINTR
			ent->ret = xclose(ent->close.fd);
			break;

		case IOQ_OPENDIR:
			if (!cancel) {
				struct ioq_opendir *args = &ent->opendir;
				ent->ret = bfs_opendir(args->dir, args->dfd, args->path);
				if (ent->ret == 0) {
					bfs_polldir(args->dir);
				}
			}
			break;

		case IOQ_CLOSEDIR:
			ent->ret = bfs_closedir(ent->closedir.dir);
			break;

		default:
			bfs_bug("Unknown ioq_op %d", (int)ent->op);
			errno = ENOSYS;
			break;
		}

		if (cancel) {
			ent->error = EINTR;
		} else if (ent->ret < 0) {
			ent->error = errno;
		} else {
			ent->error = 0;
		}

		ioqq_push(ioq->ready, ent);
	}

	return NULL;
}

struct ioq *ioq_create(size_t depth, size_t nthreads) {
	struct ioq *ioq = ZALLOC_FLEX(struct ioq, threads, nthreads);
	if (!ioq) {
		goto fail;
	}

	ioq->depth = depth;

	ARENA_INIT(&ioq->ents, struct ioq_ent);

	ioq->pending = ioqq_create(depth);
	if (!ioq->pending) {
		goto fail;
	}

	ioq->ready = ioqq_create(depth);
	if (!ioq->ready) {
		goto fail;
	}

	for (size_t i = 0; i < nthreads; ++i) {
		if (thread_create(&ioq->threads[i], NULL, ioq_work, ioq) != 0) {
			goto fail;
		}
		++ioq->nthreads;
	}

	return ioq;

fail:
	ioq_destroy(ioq);
	return NULL;
}

size_t ioq_capacity(const struct ioq *ioq) {
	return ioq->depth - ioq->size;
}

/** Allocate a new command entry. */
static struct ioq_ent *ioq_request(struct ioq *ioq, enum ioq_op op, void *ptr) {
	if (load(&ioq->cancel, relaxed)) {
		errno = EINTR;
		return NULL;
	}

	if (ioq->size >= ioq->depth) {
		errno = EAGAIN;
		return NULL;
	}

	struct ioq_ent *ent = arena_alloc(&ioq->ents);
	if (!ent) {
		return NULL;
	}

	ent->op = op;
	ent->ptr = ptr;
	++ioq->size;
	return ent;
}

int ioq_close(struct ioq *ioq, int fd, void *ptr) {
	struct ioq_ent *ent = ioq_request(ioq, IOQ_CLOSE, ptr);
	if (!ent) {
		return -1;
	}

	ent->close.fd = fd;

	ioqq_push(ioq->pending, ent);
	return 0;
}

int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, void *ptr) {
	struct ioq_ent *ent = ioq_request(ioq, IOQ_OPENDIR, ptr);
	if (!ent) {
		return -1;
	}

	struct ioq_opendir *args = &ent->opendir;
	args->dir = dir;
	args->dfd = dfd;
	args->path = path;

	ioqq_push(ioq->pending, ent);
	return 0;
}

int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr) {
	struct ioq_ent *ent = ioq_request(ioq, IOQ_CLOSEDIR, ptr);
	if (!ent) {
		return -1;
	}

	ent->closedir.dir = dir;

	ioqq_push(ioq->pending, ent);
	return 0;
}

struct ioq_ent *ioq_pop(struct ioq *ioq) {
	if (ioq->size == 0) {
		return NULL;
	}

	return ioqq_pop(ioq->ready);
}

struct ioq_ent *ioq_trypop(struct ioq *ioq) {
	if (ioq->size == 0) {
		return NULL;
	}

	return ioqq_trypop(ioq->ready);
}

void ioq_free(struct ioq *ioq, struct ioq_ent *ent) {
	bfs_assert(ioq->size > 0);
	--ioq->size;

	arena_free(&ioq->ents, ent);
}

void ioq_cancel(struct ioq *ioq) {
	// Only the first cancellation pushes the stop commands
	if (!exchange(&ioq->cancel, true, relaxed)) {
		for (size_t i = 0; i < ioq->nthreads; ++i) {
			ioqq_push(ioq->pending, &IOQ_STOP);
		}
	}
}

void ioq_destroy(struct ioq *ioq) {
	if (!ioq) {
		return;
	}

	ioq_cancel(ioq);

	for (size_t i = 0; i < ioq->nthreads; ++i) {
		thread_join(ioq->threads[i], NULL);
	}

	ioqq_destroy(ioq->ready);
	ioqq_destroy(ioq->pending);

	arena_destroy(&ioq->ents);

	free(ioq);
}
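
#if 0
// A minimal usage sketch, excluded from the build (ioq_example() is
// illustrative only, not part of bfs): submit one asynchronous
// bfs_opendir() and wait for its response.
static int ioq_example(struct bfs_dir *dir, int dfd, const char *path) {
	struct ioq *ioq = ioq_create(64, 2);
	if (!ioq) {
		return -1;
	}

	// Queue the request; a background thread will run bfs_opendir()
	int ret = ioq_opendir(ioq, dir, dfd, path, NULL);
	if (ret == 0) {
		// Block until the response arrives, then reclaim the entry
		struct ioq_ent *ent = ioq_pop(ioq);
		ret = ent->ret;
		ioq_free(ioq, ent);
	}

	ioq_destroy(ioq);
	return ret;
}
#endif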