ioq: Use io_uring
author    Tavian Barnes <tavianator@tavianator.com>
          Tue, 11 Jul 2023 18:04:40 +0000 (14:04 -0400)
committer Tavian Barnes <tavianator@tavianator.com>
          Mon, 2 Oct 2023 16:19:06 +0000 (12:19 -0400)
Closes #65.
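
For context: the patch replaces one-syscall-at-a-time worker threads with
per-thread io_uring instances that batch opendir()/close() requests. Below
is a minimal, self-contained sketch of the liburing prep/submit/reap cycle
the new worker loop is built on. It is not part of the patch; it assumes
liburing is installed (build with `cc demo.c -luring`), and the queue depth
of 8 and the "." path are arbitrary:

    #include <liburing.h>
    #include <fcntl.h>
    #include <stdio.h>

    int main(void) {
            struct io_uring ring;
            if (io_uring_queue_init(8, &ring, 0) < 0) {
                    return 1;
            }

            // Prep one openat() SQE, tagged so the completion can be matched
            struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
            io_uring_prep_openat(sqe, AT_FDCWD, ".", O_RDONLY | O_DIRECTORY, 0);
            io_uring_sqe_set_data(sqe, (void *)".");

            // Submit the batch and block until at least one completion arrives
            io_uring_submit_and_wait(&ring, 1);

            // Reap the CQE; res is an fd on success, -errno on failure
            struct io_uring_cqe *cqe;
            if (io_uring_wait_cqe(&ring, &cqe) == 0) {
                    printf("openat(%s) = %d\n",
                           (const char *)io_uring_cqe_get_data(cqe), cqe->res);
                    io_uring_cqe_seen(&ring, cqe);
            }

            io_uring_queue_exit(&ring);
            return 0;
    }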

src/bftw.c
src/ioq.c

index 5e5f4a53c49ed8bfcff75442caf3790f1969c60b..902a3fa770744caccb4c18142c2b52e983e069ba 100644
--- a/src/bftw.c
+++ b/src/bftw.c
@@ -470,21 +470,34 @@ static int bftw_state_init(struct bftw_state *state, const struct bftw_args *arg
 
        state->error = 0;
 
-       if (args->nopenfd < 1) {
+       if (args->nopenfd < 2) {
                errno = EMFILE;
                return -1;
        }
-       bftw_cache_init(&state->cache, args->nopenfd);
 
-       state->nthreads = args->nthreads;
-       if (state->nthreads > 0) {
-               state->ioq = ioq_create(4096, state->nthreads);
+       size_t nopenfd = args->nopenfd;
+       size_t qdepth = 4096;
+       size_t nthreads = args->nthreads;
+
+#if BFS_USE_LIBURING
+       // io_uring uses one fd per ring, ioq uses one ring per thread
+       if (nthreads >= nopenfd - 1) {
+               nthreads = nopenfd - 2;
+       }
+       nopenfd -= nthreads;
+#endif
+
+       bftw_cache_init(&state->cache, nopenfd);
+
+       if (nthreads > 0) {
+               state->ioq = ioq_create(qdepth, nthreads);
                if (!state->ioq) {
                        return -1;
                }
        } else {
                state->ioq = NULL;
        }
+       state->nthreads = nthreads;
 
        SLIST_INIT(&state->to_open);
        SLIST_INIT(&state->to_read);
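
(The fd accounting above exists because each io_uring instance holds one
file descriptor, one ring per thread, so rings must be paid for out of the
same nopenfd budget as the directory cache. As a worked example: with
nopenfd == 1024 and nthreads == 8, the cache keeps 1016 fds; with a tiny
budget like nopenfd == 4, nthreads is clamped down to 2. A hypothetical
standalone fd_budget() helper, not in the patch, showing the same clamp:)

    // Returns the fds left for the cache; clamps *nthreads so that at
    // least 2 fds remain (matching the nopenfd < 2 check above).
    static size_t fd_budget(size_t nopenfd, size_t *nthreads) {
            if (*nthreads >= nopenfd - 1) {
                    *nthreads = nopenfd - 2;
            }
            return nopenfd - *nthreads;
    }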
index d3ba2dee0e23a7f3dc7be466147bbb2a01a778a0..04b9c0d2c38587c2397c5b65a18f59deb8ade321 100644
--- a/src/ioq.c
+++ b/src/ioq.c
 #include <pthread.h>
 #include <stdlib.h>
 
+#if BFS_USE_LIBURING
+#  include <liburing.h>
+#endif
+
 /**
  * A monitor for an I/O queue slot.
  */
@@ -280,6 +284,21 @@ static struct ioq_ent *ioqq_trypop(struct ioqq *ioqq) {
 /** Sentinel stop command. */
 static struct ioq_ent IOQ_STOP;
 
+/** I/O queue thread-specific data. */
+struct ioq_thread {
+       /** The thread handle. */
+       pthread_t id;
+       /** Pointer back to the I/O queue. */
+       struct ioq *parent;
+
+#if BFS_USE_LIBURING
+       /** io_uring instance. */
+       struct io_uring ring;
+       /** Any error that occurred initializing the ring. */
+       int ring_err;
+#endif
+};
+
 struct ioq {
        /** The depth of the queue. */
        size_t depth;
@@ -299,60 +318,247 @@ struct ioq {
        /** The number of background threads. */
        size_t nthreads;
        /** The background threads themselves. */
-       pthread_t threads[];
+       struct ioq_thread threads[];
 };
 
-/** Background thread entry point. */
-static void *ioq_work(void *ptr) {
-       struct ioq *ioq = ptr;
+/** Cancel a request if we need to. */
+static bool ioq_check_cancel(struct ioq *ioq, struct ioq_ent *ent) {
+       if (!load(&ioq->cancel, relaxed)) {
+               return false;
+       }
 
-       while (true) {
-               struct ioq_ent *ent = ioqq_pop(ioq->pending);
-               if (ent == &IOQ_STOP) {
-                       break;
+       // Always close(), even if we're cancelled, just like a real EINTR
+       if (ent->op == IOQ_CLOSE || ent->op == IOQ_CLOSEDIR) {
+               return false;
+       }
+
+       ent->ret = -1;
+       ent->error = EINTR;
+       ioqq_push(ioq->ready, ent);
+       return true;
+}
+
+/** Handle a single request synchronously. */
+static void ioq_handle(struct ioq *ioq, struct ioq_ent *ent) {
+       int ret;
+
+       switch (ent->op) {
+       case IOQ_CLOSE:
+               ret = xclose(ent->close.fd);
+               break;
+
+       case IOQ_OPENDIR:
+               ret = bfs_opendir(ent->opendir.dir, ent->opendir.dfd, ent->opendir.path);
+               if (ret == 0) {
+                       bfs_polldir(ent->opendir.dir);
                }
+               break;
+
+       case IOQ_CLOSEDIR:
+               ret = bfs_closedir(ent->closedir.dir);
+               break;
+
+       default:
+               bfs_bug("Unknown ioq_op %d", (int)ent->op);
+               ret = -1;
+               errno = ENOSYS;
+               break;
+       }
+
+       ent->ret = ret;
+       ent->error = ret == 0 ? 0 : errno;
+
+       ioqq_push(ioq->ready, ent);
+}
 
-               bool cancel = load(&ioq->cancel, relaxed);
+#if BFS_USE_LIBURING
+/** io_uring worker state. */
+struct ioq_ring_state {
+       /** The I/O queue. */
+       struct ioq *ioq;
+       /** The io_uring. */
+       struct io_uring *ring;
+       /** The current ioq->pending slot. */
+       ioq_slot *slot;
+       /** Number of prepped, unsubmitted SQEs. */
+       size_t prepped;
+       /** Number of submitted, unreaped SQEs. */
+       size_t submitted;
+       /** Whether to stop the loop. */
+       bool stop;
+};
+
+/** Pop a request for ioq_ring_prep(). */
+static struct ioq_ent *ioq_ring_pop(struct ioq_ring_state *state) {
+       if (state->stop) {
+               return NULL;
+       }
+
+       // Advance to the next slot if necessary
+       struct ioq *ioq = state->ioq;
+       if (!state->slot) {
+               state->slot = ioqq_read(ioq->pending);
+       }
+
+       // Block if we have nothing else to do
+       bool block = !state->prepped && !state->submitted;
+       struct ioq_ent *ret = ioq_slot_pop(ioq->pending, state->slot, block);
+
+       if (ret) {
+               // Got an entry, move to the next slot next time
+               state->slot = NULL;
+       }
+
+       if (ret == &IOQ_STOP) {
+               state->stop = true;
+               ret = NULL;
+       }
+
+       return ret;
+}
 
-               ent->ret = -1;
+/** Prep a single SQE. */
+static void ioq_prep_sqe(struct io_uring_sqe *sqe, struct ioq_ent *ent) {
+       switch (ent->op) {
+       case IOQ_CLOSE:
+               io_uring_prep_close(sqe, ent->close.fd);
+               break;
+
+       case IOQ_OPENDIR:
+               io_uring_prep_openat(sqe, ent->opendir.dfd, ent->opendir.path, O_RDONLY | O_CLOEXEC | O_DIRECTORY, 0);
+               break;
+
+#if BFS_USE_UNWRAPDIR
+       case IOQ_CLOSEDIR:
+               io_uring_prep_close(sqe, bfs_unwrapdir(ent->closedir.dir));
+               break;
+#endif
+
+       default:
+               bfs_bug("Unknown ioq_op %d", (int)ent->op);
+               io_uring_prep_nop(sqe);
+               break;
+       }
 
-               switch (ent->op) {
-               case IOQ_CLOSE:
-                       // Always close(), even if we're cancelled, just like a real EINTR
-                       ent->ret = xclose(ent->close.fd);
+       io_uring_sqe_set_data(sqe, ent);
+}
+
+/** Prep a batch of SQEs. */
+static bool ioq_ring_prep(struct ioq_ring_state *state) {
+       struct ioq *ioq = state->ioq;
+       struct io_uring *ring = state->ring;
+
+       while (io_uring_sq_space_left(ring)) {
+               struct ioq_ent *ent = ioq_ring_pop(state);
+               if (!ent) {
                        break;
+               }
+
+               if (ioq_check_cancel(ioq, ent)) {
+                       continue;
+               }
 
-               case IOQ_OPENDIR:
-                       if (!cancel) {
-                               struct ioq_opendir *args = &ent->opendir;
-                               ent->ret = bfs_opendir(args->dir, args->dfd, args->path);
-                               if (ent->ret == 0) {
-                                       bfs_polldir(args->dir);
-                               }
+#if !BFS_USE_UNWRAPDIR
+               if (ent->op == IOQ_CLOSEDIR) {
+                       ioq_handle(ioq, ent);
+                       continue;
+               }
+#endif
+
+               struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
+               ioq_prep_sqe(sqe, ent);
+               ++state->prepped;
+       }
+
+       return state->prepped || state->submitted;
+}
+
+/** Reap a batch of CQEs. */
+static void ioq_ring_reap(struct ioq_ring_state *state) {
+       struct ioq *ioq = state->ioq;
+       struct io_uring *ring = state->ring;
+
+       while (state->prepped) {
+               int ret = io_uring_submit_and_wait(ring, 1);
+               if (ret > 0) {
+                       state->prepped -= ret;
+                       state->submitted += ret;
+               }
+       }
+
+       while (state->submitted) {
+               struct io_uring_cqe *cqe;
+               if (io_uring_wait_cqe(ring, &cqe) < 0) {
+                       continue;
+               }
+
+               struct ioq_ent *ent = io_uring_cqe_get_data(cqe);
+               ent->ret = cqe->res >= 0 ? cqe->res : -1;
+               ent->error = cqe->res < 0 ? -cqe->res : 0;
+               io_uring_cqe_seen(ring, cqe);
+               --state->submitted;
+
+               if (ent->op == IOQ_OPENDIR && ent->ret >= 0) {
+                       int fd = ent->ret;
+                       if (ioq_check_cancel(ioq, ent)) {
+                               xclose(fd);
+                               continue;
                        }
-                       break;
 
-               case IOQ_CLOSEDIR:
-                       ent->ret = bfs_closedir(ent->closedir.dir);
-                       break;
+                       ent->ret = bfs_opendir(ent->opendir.dir, fd, NULL);
+                       if (ent->ret == 0) {
+                               // TODO: io_uring_prep_getdents()
+                               bfs_polldir(ent->opendir.dir);
+                       } else {
+                               ent->error = errno;
+                       }
+               }
+
+               ioqq_push(ioq->ready, ent);
+       }
+}
+
+/** io_uring worker loop. */
+static void ioq_ring_work(struct ioq_thread *thread) {
+       struct ioq_ring_state state = {
+               .ioq = thread->parent,
+               .ring = &thread->ring,
+       };
 
-               default:
-                       bfs_bug("Unknown ioq_op %d", (int)ent->op);
-                       errno = ENOSYS;
+       while (ioq_ring_prep(&state)) {
+               ioq_ring_reap(&state);
+       }
+}
+#endif
+
+/** Synchronous syscall loop. */
+static void ioq_sync_work(struct ioq_thread *thread) {
+       struct ioq *ioq = thread->parent;
+
+       while (true) {
+               struct ioq_ent *ent = ioqq_pop(ioq->pending);
+               if (ent == &IOQ_STOP) {
                        break;
                }
 
-               if (cancel) {
-                       ent->error = EINTR;
-               } else if (ent->ret < 0) {
-                       ent->error = errno;
-               } else {
-                       ent->error = 0;
+               if (!ioq_check_cancel(ioq, ent)) {
+                       ioq_handle(ioq, ent);
                }
+       }
+}
 
-               ioqq_push(ioq->ready, ent);
+/** Background thread entry point. */
+static void *ioq_work(void *ptr) {
+       struct ioq_thread *thread = ptr;
+
+#if BFS_USE_LIBURING
+       if (thread->ring_err == 0) {
+               ioq_ring_work(thread);
+               return NULL;
        }
+#endif
 
+       ioq_sync_work(thread);
        return NULL;
 }
 
@@ -376,7 +582,30 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) {
        }
 
        for (size_t i = 0; i < nthreads; ++i) {
-               if (thread_create(&ioq->threads[i], NULL, ioq_work, ioq) != 0) {
+               struct ioq_thread *thread = &ioq->threads[i];
+               thread->parent = ioq;
+
+#if BFS_USE_LIBURING
+               struct ioq_thread *prev = i ? &ioq->threads[i - 1] : NULL;
+               if (prev && prev->ring_err) {
+                       thread->ring_err = prev->ring_err;
+               } else {
+                       // Share io-wq workers between rings
+                       struct io_uring_params params = {0};
+                       if (prev) {
+                               params.flags |= IORING_SETUP_ATTACH_WQ;
+                               params.wq_fd = prev->ring.ring_fd;
+                       }
+
+                       size_t entries = depth / nthreads;
+                       if (entries < 16) {
+                               entries = 16;
+                       }
+                       thread->ring_err = -io_uring_queue_init_params(entries, &thread->ring, &params);
+               }
+#endif
+
+               if (thread_create(&thread->id, NULL, ioq_work, thread) != 0) {
                        goto fail;
                }
                ++ioq->nthreads;
@@ -496,7 +725,11 @@ void ioq_destroy(struct ioq *ioq) {
        ioq_cancel(ioq);
 
        for (size_t i = 0; i < ioq->nthreads; ++i) {
-               thread_join(ioq->threads[i], NULL);
+               struct ioq_thread *thread = &ioq->threads[i];
+               thread_join(thread->id, NULL);
+#if BFS_USE_LIBURING
+               io_uring_queue_exit(&thread->ring);
+#endif
        }
 
        ioqq_destroy(ioq->ready);
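
(A closing note on the "Share io-wq workers between rings" trick used in
ioq_create() above: IORING_SETUP_ATTACH_WQ makes each new ring reuse the
kernel's io-wq worker pool from an existing ring instead of spawning its
own. Here is that pattern sketched in isolation, assuming a kernel new
enough to support the flag; create_attached_rings() is a hypothetical
helper, not part of bfs:)

    #include <liburing.h>

    static int create_attached_rings(struct io_uring *rings, size_t n,
                                     unsigned entries) {
            for (size_t i = 0; i < n; ++i) {
                    struct io_uring_params params = {0};
                    if (i > 0) {
                            // Attach to the previous ring's io-wq worker
                            // pool rather than creating a fresh one
                            params.flags |= IORING_SETUP_ATTACH_WQ;
                            params.wq_fd = rings[i - 1].ring_fd;
                    }

                    int ret = io_uring_queue_init_params(entries, &rings[i], &params);
                    if (ret < 0) {
                            // Unwind the rings created so far
                            while (i--) {
                                    io_uring_queue_exit(&rings[i]);
                            }
                            return ret; // negative errno, like liburing
                    }
            }
            return 0;
    }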