Skip to content

rtio: Introduce OP_DELAY as a valid SQE operation #88808

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion include/zephyr/rtio/rtio.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <zephyr/app_memory/app_memdomain.h>
#include <zephyr/device.h>
#include <zephyr/kernel.h>
#include <zephyr/kernel_structs.h>
#include <zephyr/sys/__assert.h>
#include <zephyr/sys/atomic.h>
#include <zephyr/sys/mem_blocks.h>
Expand Down Expand Up @@ -344,6 +345,12 @@ struct rtio_sqe {
uint8_t *rx_buf; /**< Buffer to read into */
} txrx;

/** OP_DELAY */
struct {
k_timeout_t timeout; /**< Delay timeout. */
struct _timeout to; /**< Timeout struct. Used internally. */
} delay;

/** OP_I2C_CONFIGURE */
uint32_t i2c_config;

Expand Down Expand Up @@ -555,8 +562,11 @@ struct rtio_iodev {
/** An operation that transceives (reads and writes simultaneously) */
#define RTIO_OP_TXRX (RTIO_OP_CALLBACK+1)

/** An operation that takes a specified amount of time (asynchronously) before completing */
#define RTIO_OP_DELAY (RTIO_OP_TXRX+1)

/** An operation to recover I2C buses */
#define RTIO_OP_I2C_RECOVER (RTIO_OP_TXRX+1)
#define RTIO_OP_I2C_RECOVER (RTIO_OP_DELAY+1)

/** An operation to configure I2C buses */
#define RTIO_OP_I2C_CONFIGURE (RTIO_OP_I2C_RECOVER+1)
Expand Down Expand Up @@ -747,6 +757,18 @@ static inline void rtio_sqe_prep_await(struct rtio_sqe *sqe,
sqe->userdata = userdata;
}

static inline void rtio_sqe_prep_delay(struct rtio_sqe *sqe,
k_timeout_t timeout,
void *userdata)
{
memset(sqe, 0, sizeof(struct rtio_sqe));
sqe->op = RTIO_OP_DELAY;
sqe->prio = 0;
sqe->iodev = NULL;
sqe->delay.timeout = timeout;
sqe->userdata = userdata;
}

static inline struct rtio_iodev_sqe *rtio_sqe_pool_alloc(struct rtio_sqe_pool *pool)
{
struct mpsc_node *node = mpsc_pop(&pool->free_q);
Expand Down
1 change: 1 addition & 0 deletions subsys/rtio/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ if(CONFIG_RTIO)

zephyr_library_sources(rtio_executor.c)
zephyr_library_sources(rtio_init.c)
zephyr_library_sources(rtio_sched.c)
zephyr_library_sources_ifdef(CONFIG_USERSPACE rtio_handlers.c)
endif()

Expand Down
5 changes: 5 additions & 0 deletions subsys/rtio/rtio_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <zephyr/rtio/rtio.h>
#include <zephyr/kernel.h>

#include "rtio_sched.h"

#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(rtio_executor, CONFIG_RTIO_LOG_LEVEL);

Expand All @@ -22,6 +24,9 @@ static void rtio_executor_op(struct rtio_iodev_sqe *iodev_sqe)
sqe->callback.callback(iodev_sqe->r, sqe, sqe->callback.arg0);
rtio_iodev_sqe_ok(iodev_sqe, 0);
break;
case RTIO_OP_DELAY:
rtio_sched_alarm(iodev_sqe, sqe->delay.timeout);
break;
default:
rtio_iodev_sqe_err(iodev_sqe, -EINVAL);
}
Expand Down
34 changes: 34 additions & 0 deletions subsys/rtio/rtio_sched.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2025 Croxel Inc.
*
* SPDX-License-Identifier: Apache-2.0
*/

#include <zephyr/kernel.h>
#include <zephyr/rtio/rtio.h>

/** Required to access Timeout Queue APIs, which are used instead of the
* Timer APIs because of concerns on size on rtio_sqe (k_timer is more
* than double the size of _timeout). Users will have to instantiate a
* pool of SQE objects, thus its size directly impacts memory footprint
* of RTIO applications.
*/
#include <../kernel/include/timeout_q.h>

#include "rtio_sched.h"

static void rtio_sched_alarm_expired(struct _timeout *t)
{
struct rtio_sqe *sqe = CONTAINER_OF(t, struct rtio_sqe, delay.to);
struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(sqe, struct rtio_iodev_sqe, sqe);

rtio_iodev_sqe_ok(iodev_sqe, 0);
}

void rtio_sched_alarm(struct rtio_iodev_sqe *iodev_sqe, k_timeout_t timeout)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to pass the timeout as a param, just read the data directly

{
struct rtio_sqe *sqe = &iodev_sqe->sqe;

z_init_timeout(&sqe->delay.to);
z_add_timeout(&sqe->delay.to, rtio_sched_alarm_expired, timeout);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removing the timeout if SQE is cancelled should be added somewhere :)

Copy link
Member Author

@ubieda ubieda Apr 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into this, it seems we don't need special handling for canceled OP_DELAY SQEs:

  • If it was canceled before the SQE is processed, it won't get to this point.
  • If it's canceled after the Timeout is scheduled, the timer expiration will not generate a CQE due to the item simply being canceled already.

I had added an additional test-case but it already passes with no code changes added:

ZTEST(rtio_api, test_rtio_delay_canceled)
{
	int res;
	struct rtio *r = &r_delay;
	struct rtio_sqe *sqe;
	struct rtio_cqe *cqe;

	sqe = rtio_sqe_acquire(r);
	zassert_not_null(sqe, "Expected a valid sqe");

	rtio_sqe_prep_delay(sqe, K_SECONDS(10), NULL);

	res = rtio_submit(r, 0);
	zassert_ok(res, "Should return ok from rtio_execute");

	rtio_sqe_cancel(sqe);
	k_sleep(K_SECONDS(10));

	cqe = rtio_cqe_consume(r);
	zassert_null(cqe, "Canceled SQEs should not generate a CQE");
	rtio_cqe_release(r, cqe);
}

}
14 changes: 14 additions & 0 deletions subsys/rtio/rtio_sched.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright (c) 2025 Croxel Inc.
*
* SPDX-License-Identifier: Apache-2.0
*/

#include <zephyr/kernel.h>

#ifndef ZEPHYR_SUBSYS_RTIO_SCHED_H_
#define ZEPHYR_SUBSYS_RTIO_SCHED_H_

void rtio_sched_alarm(struct rtio_iodev_sqe *iodev_sqe, k_timeout_t timeout);

#endif /* ZEPHYR_SUBSYS_RTIO_SCHED_H_ */
56 changes: 56 additions & 0 deletions tests/subsys/rtio/rtio_api/src/test_rtio_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,62 @@ ZTEST(rtio_api, test_rtio_cqe_count_overflow)
}
}

#define RTIO_DELAY_NUM_ELEMS 10

RTIO_DEFINE(r_delay, RTIO_DELAY_NUM_ELEMS, RTIO_DELAY_NUM_ELEMS);

ZTEST(rtio_api, test_rtio_delay)
{
int res;
struct rtio *r = &r_delay;
struct rtio_sqe *sqe;
struct rtio_cqe *cqe;

uint8_t expected_expiration_order[RTIO_DELAY_NUM_ELEMS] = {4, 3, 2, 1, 0, 5, 6, 7, 8, 9};

for (size_t i = 0; i < RTIO_DELAY_NUM_ELEMS; i++) {
sqe = rtio_sqe_acquire(r);
zassert_not_null(sqe, "Expected a valid sqe");

/** Half of the delays will be earlier than the previous one submitted.
* The other half will be later.
*/
if (i < (RTIO_DELAY_NUM_ELEMS / 2)) {
rtio_sqe_prep_delay(sqe, K_SECONDS(10 - i), (void *)i);
} else {
rtio_sqe_prep_delay(sqe, K_SECONDS(10 - 4 + i), (void *)i);
}
}

res = rtio_submit(r, 0);
zassert_ok(res, "Should return ok from rtio_execute");

cqe = rtio_cqe_consume(r);
zassert_is_null(cqe, "There should not be a cqe since delay has not expired");

/** Wait until we expect delays start expiring */
k_sleep(K_SECONDS(10 - (RTIO_DELAY_NUM_ELEMS / 2)));

for (int i = 0; i < RTIO_DELAY_NUM_ELEMS; i++) {
k_sleep(K_SECONDS(1));

TC_PRINT("consume %d\n", i);
cqe = rtio_cqe_consume(r);
zassert_not_null(cqe, "Expected a valid cqe");
zassert_ok(cqe->result, "Result should be ok");

size_t expired_id = (size_t)(cqe->userdata);

zassert_equal(expected_expiration_order[i], expired_id,
"Expected order not valid. Obtained: %d, expected: %d",
(int)expired_id, (int)expected_expiration_order[i]);

rtio_cqe_release(r, cqe);

cqe = rtio_cqe_consume(r);
zassert_is_null(cqe, "There should not be a cqe since next delay has not expired");
}
}

#define THROUGHPUT_ITERS 100000
RTIO_DEFINE(r_throughput, SQE_POOL_SIZE, CQE_POOL_SIZE);
Expand Down