From e7506b052613bd5a2e1e374690b6caa510262c64 Mon Sep 17 00:00:00 2001 From: Luis Ubieda Date: Fri, 18 Apr 2025 17:19:11 -0400 Subject: [PATCH 1/2] rtio: Introduce OP_DELAY as a valid SQE operation SQE items with this OP will take the specified amount of time (asynchronously) before completing. This allows to serve as an asynchronous delay in between SQE items (e.g: A sensor measurement requested, which requires 50-ms before having the result available). Signed-off-by: Luis Ubieda --- include/zephyr/rtio/rtio.h | 24 +++++++++++++++++++++++- subsys/rtio/CMakeLists.txt | 1 + subsys/rtio/rtio_executor.c | 5 +++++ subsys/rtio/rtio_sched.c | 34 ++++++++++++++++++++++++++++++++++ subsys/rtio/rtio_sched.h | 14 ++++++++++++++ 5 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 subsys/rtio/rtio_sched.c create mode 100644 subsys/rtio/rtio_sched.h diff --git a/include/zephyr/rtio/rtio.h b/include/zephyr/rtio/rtio.h index a19c8d704f4d..cc11706b1616 100644 --- a/include/zephyr/rtio/rtio.h +++ b/include/zephyr/rtio/rtio.h @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -344,6 +345,12 @@ struct rtio_sqe { uint8_t *rx_buf; /**< Buffer to read into */ } txrx; + /** OP_DELAY */ + struct { + k_timeout_t timeout; /**< Delay timeout. */ + struct _timeout to; /**< Timeout struct. Used internally. */ + } delay; + /** OP_I2C_CONFIGURE */ uint32_t i2c_config; @@ -555,8 +562,11 @@ struct rtio_iodev { /** An operation that transceives (reads and writes simultaneously) */ #define RTIO_OP_TXRX (RTIO_OP_CALLBACK+1) +/** An operation that takes a specified amount of time (asynchronously) before completing */ +#define RTIO_OP_DELAY (RTIO_OP_TXRX+1) + /** An operation to recover I2C buses */ -#define RTIO_OP_I2C_RECOVER (RTIO_OP_TXRX+1) +#define RTIO_OP_I2C_RECOVER (RTIO_OP_DELAY+1) /** An operation to configure I2C buses */ #define RTIO_OP_I2C_CONFIGURE (RTIO_OP_I2C_RECOVER+1) @@ -747,6 +757,18 @@ static inline void rtio_sqe_prep_await(struct rtio_sqe *sqe, sqe->userdata = userdata; } +static inline void rtio_sqe_prep_delay(struct rtio_sqe *sqe, + k_timeout_t timeout, + void *userdata) +{ + memset(sqe, 0, sizeof(struct rtio_sqe)); + sqe->op = RTIO_OP_DELAY; + sqe->prio = 0; + sqe->iodev = NULL; + sqe->delay.timeout = timeout; + sqe->userdata = userdata; +} + static inline struct rtio_iodev_sqe *rtio_sqe_pool_alloc(struct rtio_sqe_pool *pool) { struct mpsc_node *node = mpsc_pop(&pool->free_q); diff --git a/subsys/rtio/CMakeLists.txt b/subsys/rtio/CMakeLists.txt index 421ca1cdba46..f75eaa7912af 100644 --- a/subsys/rtio/CMakeLists.txt +++ b/subsys/rtio/CMakeLists.txt @@ -10,6 +10,7 @@ if(CONFIG_RTIO) zephyr_library_sources(rtio_executor.c) zephyr_library_sources(rtio_init.c) + zephyr_library_sources(rtio_sched.c) zephyr_library_sources_ifdef(CONFIG_USERSPACE rtio_handlers.c) endif() diff --git a/subsys/rtio/rtio_executor.c b/subsys/rtio/rtio_executor.c index e4ad8fe516af..5fa0d548bc73 100644 --- a/subsys/rtio/rtio_executor.c +++ b/subsys/rtio/rtio_executor.c @@ -7,6 +7,8 @@ #include #include +#include "rtio_sched.h" + #include LOG_MODULE_REGISTER(rtio_executor, CONFIG_RTIO_LOG_LEVEL); @@ -22,6 +24,9 @@ static void rtio_executor_op(struct rtio_iodev_sqe *iodev_sqe) sqe->callback.callback(iodev_sqe->r, sqe, sqe->callback.arg0); rtio_iodev_sqe_ok(iodev_sqe, 0); break; + case RTIO_OP_DELAY: + rtio_sched_alarm(iodev_sqe, sqe->delay.timeout); + break; default: rtio_iodev_sqe_err(iodev_sqe, -EINVAL); } diff --git a/subsys/rtio/rtio_sched.c b/subsys/rtio/rtio_sched.c new file mode 100644 index 000000000000..e117b31d4589 --- /dev/null +++ b/subsys/rtio/rtio_sched.c @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2025 Croxel Inc. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include + +/** Required to access Timeout Queue APIs, which are used instead of the + * Timer APIs because of concerns on size on rtio_sqe (k_timer is more + * than double the size of _timeout). Users will have to instantiate a + * pool of SQE objects, thus its size directly impacts memory footprint + * of RTIO applications. + */ +#include <../kernel/include/timeout_q.h> + +#include "rtio_sched.h" + +static void rtio_sched_alarm_expired(struct _timeout *t) +{ + struct rtio_sqe *sqe = CONTAINER_OF(t, struct rtio_sqe, delay.to); + struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(sqe, struct rtio_iodev_sqe, sqe); + + rtio_iodev_sqe_ok(iodev_sqe, 0); +} + +void rtio_sched_alarm(struct rtio_iodev_sqe *iodev_sqe, k_timeout_t timeout) +{ + struct rtio_sqe *sqe = &iodev_sqe->sqe; + + z_init_timeout(&sqe->delay.to); + z_add_timeout(&sqe->delay.to, rtio_sched_alarm_expired, timeout); +} diff --git a/subsys/rtio/rtio_sched.h b/subsys/rtio/rtio_sched.h new file mode 100644 index 000000000000..1d5639caed76 --- /dev/null +++ b/subsys/rtio/rtio_sched.h @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2025 Croxel Inc. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#ifndef ZEPHYR_SUBSYS_RTIO_SCHED_H_ +#define ZEPHYR_SUBSYS_RTIO_SCHED_H_ + +void rtio_sched_alarm(struct rtio_iodev_sqe *iodev_sqe, k_timeout_t timeout); + +#endif /* ZEPHYR_SUBSYS_RTIO_SCHED_H_ */ From 92d803749d335a6f1cdd0b2d2a68c6ec0302993e Mon Sep 17 00:00:00 2001 From: Luis Ubieda Date: Fri, 18 Apr 2025 17:21:20 -0400 Subject: [PATCH 2/2] tests: rtio: Add testcase to validate OP_DELAY Providing a set of SQEs with varying delays in different orders. Test validates each delay is completed at the right amount (and the CQEs are received in the right order). Signed-off-by: Luis Ubieda --- .../subsys/rtio/rtio_api/src/test_rtio_api.c | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/tests/subsys/rtio/rtio_api/src/test_rtio_api.c b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c index f78c0b0e6424..b32868368fc8 100644 --- a/tests/subsys/rtio/rtio_api/src/test_rtio_api.c +++ b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c @@ -649,6 +649,62 @@ ZTEST(rtio_api, test_rtio_cqe_count_overflow) } } +#define RTIO_DELAY_NUM_ELEMS 10 + +RTIO_DEFINE(r_delay, RTIO_DELAY_NUM_ELEMS, RTIO_DELAY_NUM_ELEMS); + +ZTEST(rtio_api, test_rtio_delay) +{ + int res; + struct rtio *r = &r_delay; + struct rtio_sqe *sqe; + struct rtio_cqe *cqe; + + uint8_t expected_expiration_order[RTIO_DELAY_NUM_ELEMS] = {4, 3, 2, 1, 0, 5, 6, 7, 8, 9}; + + for (size_t i = 0; i < RTIO_DELAY_NUM_ELEMS; i++) { + sqe = rtio_sqe_acquire(r); + zassert_not_null(sqe, "Expected a valid sqe"); + + /** Half of the delays will be earlier than the previous one submitted. + * The other half will be later. + */ + if (i < (RTIO_DELAY_NUM_ELEMS / 2)) { + rtio_sqe_prep_delay(sqe, K_SECONDS(10 - i), (void *)i); + } else { + rtio_sqe_prep_delay(sqe, K_SECONDS(10 - 4 + i), (void *)i); + } + } + + res = rtio_submit(r, 0); + zassert_ok(res, "Should return ok from rtio_execute"); + + cqe = rtio_cqe_consume(r); + zassert_is_null(cqe, "There should not be a cqe since delay has not expired"); + + /** Wait until we expect delays start expiring */ + k_sleep(K_SECONDS(10 - (RTIO_DELAY_NUM_ELEMS / 2))); + + for (int i = 0; i < RTIO_DELAY_NUM_ELEMS; i++) { + k_sleep(K_SECONDS(1)); + + TC_PRINT("consume %d\n", i); + cqe = rtio_cqe_consume(r); + zassert_not_null(cqe, "Expected a valid cqe"); + zassert_ok(cqe->result, "Result should be ok"); + + size_t expired_id = (size_t)(cqe->userdata); + + zassert_equal(expected_expiration_order[i], expired_id, + "Expected order not valid. Obtained: %d, expected: %d", + (int)expired_id, (int)expected_expiration_order[i]); + + rtio_cqe_release(r, cqe); + + cqe = rtio_cqe_consume(r); + zassert_is_null(cqe, "There should not be a cqe since next delay has not expired"); + } +} #define THROUGHPUT_ITERS 100000 RTIO_DEFINE(r_throughput, SQE_POOL_SIZE, CQE_POOL_SIZE);