diff --git a/include/zephyr/rtio/rtio.h b/include/zephyr/rtio/rtio.h
index a19c8d704f4d..cc11706b1616 100644
--- a/include/zephyr/rtio/rtio.h
+++ b/include/zephyr/rtio/rtio.h
@@ -31,6 +31,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -344,6 +345,12 @@ struct rtio_sqe {
 			uint8_t *rx_buf; /**< Buffer to read into */
 		} txrx;
 
+		/** OP_DELAY */
+		struct {
+			k_timeout_t timeout; /**< Delay timeout. */
+			struct _timeout to; /**< Timeout struct. Used internally. */
+		} delay;
+
 		/** OP_I2C_CONFIGURE */
 		uint32_t i2c_config;
 
@@ -555,8 +562,11 @@ struct rtio_iodev {
 /** An operation that transceives (reads and writes simultaneously) */
 #define RTIO_OP_TXRX (RTIO_OP_CALLBACK+1)
 
+/** An operation that takes a specified amount of time (asynchronously) before completing */
+#define RTIO_OP_DELAY (RTIO_OP_TXRX+1)
+
 /** An operation to recover I2C buses */
-#define RTIO_OP_I2C_RECOVER (RTIO_OP_TXRX+1)
+#define RTIO_OP_I2C_RECOVER (RTIO_OP_DELAY+1)
 
 /** An operation to configure I2C buses */
 #define RTIO_OP_I2C_CONFIGURE (RTIO_OP_I2C_RECOVER+1)
@@ -747,6 +757,18 @@ static inline void rtio_sqe_prep_await(struct rtio_sqe *sqe,
 	sqe->userdata = userdata;
 }
 
+static inline void rtio_sqe_prep_delay(struct rtio_sqe *sqe,
+				       k_timeout_t timeout,
+				       void *userdata)
+{
+	memset(sqe, 0, sizeof(struct rtio_sqe));
+	sqe->op = RTIO_OP_DELAY;
+	sqe->prio = 0;
+	sqe->iodev = NULL;
+	sqe->delay.timeout = timeout;
+	sqe->userdata = userdata;
+}
+
 static inline struct rtio_iodev_sqe *rtio_sqe_pool_alloc(struct rtio_sqe_pool *pool)
 {
 	struct mpsc_node *node = mpsc_pop(&pool->free_q);
diff --git a/subsys/rtio/CMakeLists.txt b/subsys/rtio/CMakeLists.txt
index 421ca1cdba46..f75eaa7912af 100644
--- a/subsys/rtio/CMakeLists.txt
+++ b/subsys/rtio/CMakeLists.txt
@@ -10,6 +10,7 @@ if(CONFIG_RTIO)
 zephyr_library_sources(rtio_executor.c)
 zephyr_library_sources(rtio_init.c)
+zephyr_library_sources(rtio_sched.c)
 
 zephyr_library_sources_ifdef(CONFIG_USERSPACE rtio_handlers.c)
 
 endif()
diff --git a/subsys/rtio/rtio_executor.c b/subsys/rtio/rtio_executor.c
index e4ad8fe516af..5fa0d548bc73 100644
--- a/subsys/rtio/rtio_executor.c
+++ b/subsys/rtio/rtio_executor.c
@@ -7,6 +7,8 @@
 #include
 #include
 
+#include "rtio_sched.h"
+
 #include
 LOG_MODULE_REGISTER(rtio_executor, CONFIG_RTIO_LOG_LEVEL);
 
@@ -22,6 +24,9 @@ static void rtio_executor_op(struct rtio_iodev_sqe *iodev_sqe)
 		sqe->callback.callback(iodev_sqe->r, sqe, sqe->callback.arg0);
 		rtio_iodev_sqe_ok(iodev_sqe, 0);
 		break;
+	case RTIO_OP_DELAY:
+		rtio_sched_alarm(iodev_sqe, sqe->delay.timeout);
+		break;
 	default:
 		rtio_iodev_sqe_err(iodev_sqe, -EINVAL);
 	}
diff --git a/subsys/rtio/rtio_sched.c b/subsys/rtio/rtio_sched.c
new file mode 100644
index 000000000000..e117b31d4589
--- /dev/null
+++ b/subsys/rtio/rtio_sched.c
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2025 Croxel Inc.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include
+#include
+
+/** Required to access the Timeout Queue APIs, which are used instead of the
+ * Timer APIs because of concerns about the size of rtio_sqe (k_timer is more
+ * than double the size of _timeout). Users have to instantiate a pool of
+ * SQE objects, so the size of rtio_sqe directly impacts the memory footprint
+ * of RTIO applications.
+ */
+#include <../kernel/include/timeout_q.h>
+
+#include "rtio_sched.h"
+
+static void rtio_sched_alarm_expired(struct _timeout *t)
+{
+	struct rtio_sqe *sqe = CONTAINER_OF(t, struct rtio_sqe, delay.to);
+	struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(sqe, struct rtio_iodev_sqe, sqe);
+
+	rtio_iodev_sqe_ok(iodev_sqe, 0);
+}
+
+void rtio_sched_alarm(struct rtio_iodev_sqe *iodev_sqe, k_timeout_t timeout)
+{
+	struct rtio_sqe *sqe = &iodev_sqe->sqe;
+
+	z_init_timeout(&sqe->delay.to);
+	z_add_timeout(&sqe->delay.to, rtio_sched_alarm_expired, timeout);
+}
diff --git a/subsys/rtio/rtio_sched.h b/subsys/rtio/rtio_sched.h
new file mode 100644
index 000000000000..1d5639caed76
--- /dev/null
+++ b/subsys/rtio/rtio_sched.h
@@ -0,0 +1,14 @@
+/*
+ * Copyright (c) 2025 Croxel Inc.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include
+
+#ifndef ZEPHYR_SUBSYS_RTIO_SCHED_H_
+#define ZEPHYR_SUBSYS_RTIO_SCHED_H_
+
+void rtio_sched_alarm(struct rtio_iodev_sqe *iodev_sqe, k_timeout_t timeout);
+
+#endif /* ZEPHYR_SUBSYS_RTIO_SCHED_H_ */
diff --git a/tests/subsys/rtio/rtio_api/src/test_rtio_api.c b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c
index f78c0b0e6424..b32868368fc8 100644
--- a/tests/subsys/rtio/rtio_api/src/test_rtio_api.c
+++ b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c
@@ -649,6 +649,62 @@ ZTEST(rtio_api, test_rtio_cqe_count_overflow)
 	}
 }
 
+#define RTIO_DELAY_NUM_ELEMS 10
+
+RTIO_DEFINE(r_delay, RTIO_DELAY_NUM_ELEMS, RTIO_DELAY_NUM_ELEMS);
+
+ZTEST(rtio_api, test_rtio_delay)
+{
+	int res;
+	struct rtio *r = &r_delay;
+	struct rtio_sqe *sqe;
+	struct rtio_cqe *cqe;
+
+	uint8_t expected_expiration_order[RTIO_DELAY_NUM_ELEMS] = {4, 3, 2, 1, 0, 5, 6, 7, 8, 9};
+
+	for (size_t i = 0; i < RTIO_DELAY_NUM_ELEMS; i++) {
+		sqe = rtio_sqe_acquire(r);
+		zassert_not_null(sqe, "Expected a valid sqe");
+
+		/** The first half of the delays each expire earlier than the
+		 * previously submitted one; the second half expire later.
+		 */
+		if (i < (RTIO_DELAY_NUM_ELEMS / 2)) {
+			rtio_sqe_prep_delay(sqe, K_SECONDS(10 - i), (void *)i);
+		} else {
+			rtio_sqe_prep_delay(sqe, K_SECONDS(10 - 4 + i), (void *)i);
+		}
+	}
+
+	res = rtio_submit(r, 0);
+	zassert_ok(res, "Should return ok from rtio_submit");
+
+	cqe = rtio_cqe_consume(r);
+	zassert_is_null(cqe, "There should not be a cqe since no delay has expired yet");
+
+	/** Wait until we expect the delays to start expiring */
+	k_sleep(K_SECONDS(10 - (RTIO_DELAY_NUM_ELEMS / 2)));
+
+	for (int i = 0; i < RTIO_DELAY_NUM_ELEMS; i++) {
+		k_sleep(K_SECONDS(1));
+
+		TC_PRINT("consume %d\n", i);
+		cqe = rtio_cqe_consume(r);
+		zassert_not_null(cqe, "Expected a valid cqe");
+		zassert_ok(cqe->result, "Result should be ok");
+
+		size_t expired_id = (size_t)(cqe->userdata);
+
+		zassert_equal(expected_expiration_order[i], expired_id,
+			      "Expected order not valid. Obtained: %d, expected: %d",
+			      (int)expired_id, (int)expected_expiration_order[i]);
+
+		rtio_cqe_release(r, cqe);
+
+		cqe = rtio_cqe_consume(r);
+		zassert_is_null(cqe, "There should not be a cqe since the next delay has not expired");
+	}
+}
 
 #define THROUGHPUT_ITERS 100000
 RTIO_DEFINE(r_throughput, SQE_POOL_SIZE, CQE_POOL_SIZE);
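
Usage sketch (not part of the patch above): the snippet below illustrates how an application could chain the new delay op between two transfers so the second one starts only after the delay expires. It relies only on APIs visible in the diff or already present in rtio.h (RTIO_DEFINE, rtio_sqe_acquire, rtio_sqe_prep_write/read, RTIO_SQE_CHAINED, rtio_submit, rtio_cqe_consume/release); the iodev handle my_iodev, the buffers, and the 5 ms settling time are hypothetical placeholders.

/* Hypothetical example: write a command, let the device settle for 5 ms
 * without blocking a thread, then read the result. "my_iodev", "cmd" and
 * "data" are placeholders; the RTIO calls themselves come from rtio.h.
 */
#include <errno.h>
#include <zephyr/kernel.h>
#include <zephyr/rtio/rtio.h>

RTIO_DEFINE(r_example, 4, 4);

extern const struct rtio_iodev *my_iodev;	/* hypothetical iodev */

static uint8_t cmd[2] = {0x01, 0x02};
static uint8_t data[8];

int read_after_settling(void)
{
	struct rtio_sqe *write_sqe = rtio_sqe_acquire(&r_example);
	struct rtio_sqe *delay_sqe = rtio_sqe_acquire(&r_example);
	struct rtio_sqe *read_sqe = rtio_sqe_acquire(&r_example);
	int ret;

	if (write_sqe == NULL || delay_sqe == NULL || read_sqe == NULL) {
		return -ENOMEM;
	}

	rtio_sqe_prep_write(write_sqe, my_iodev, RTIO_PRIO_NORM, cmd, sizeof(cmd), NULL);
	write_sqe->flags |= RTIO_SQE_CHAINED;

	/* The delay op needs no iodev; the executor completes it from a kernel timeout. */
	rtio_sqe_prep_delay(delay_sqe, K_MSEC(5), NULL);
	delay_sqe->flags |= RTIO_SQE_CHAINED;

	rtio_sqe_prep_read(read_sqe, my_iodev, RTIO_PRIO_NORM, data, sizeof(data), NULL);

	/* Block here until all three completions are available. */
	ret = rtio_submit(&r_example, 3);

	/* Reclaim the three completions so the CQ pool does not run dry. */
	for (int i = 0; i < 3; i++) {
		struct rtio_cqe *cqe = rtio_cqe_consume(&r_example);

		if (cqe != NULL) {
			rtio_cqe_release(&r_example, cqe);
		}
	}

	return ret;
}

Submitting with a wait count of 3 keeps the sketch compact; an application that wants the whole sequence to run asynchronously would submit with 0 and drain the CQEs later, as the test in the diff does.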