From 58bde443169843799d7d4f622dae3602b13548df Mon Sep 17 00:00:00 2001 From: Jordan Yates Date: Thu, 30 Jan 2025 22:11:40 +1000 Subject: [PATCH 1/4] zbus: new observer type, `ZBUS_OBSERVER_WAITER_TYPE` Add a new type of observer that simply gives a semaphore when a channel is published to. This can simplify code that wants to deal with zbus channel data sequentially. Signed-off-by: Jordan Yates --- include/zephyr/zbus/zbus.h | 41 ++++++++++++++++++++++++++++++++++++++ subsys/zbus/zbus.c | 4 ++++ 2 files changed, 45 insertions(+) diff --git a/include/zephyr/zbus/zbus.h b/include/zephyr/zbus/zbus.h index c13e77082e73..47d28ac73629 100644 --- a/include/zephyr/zbus/zbus.h +++ b/include/zephyr/zbus/zbus.h @@ -118,6 +118,7 @@ enum __packed zbus_observer_type { ZBUS_OBSERVER_LISTENER_TYPE, ZBUS_OBSERVER_SUBSCRIBER_TYPE, ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE, + ZBUS_OBSERVER_WAITER_TYPE, }; struct zbus_observer_data { @@ -163,6 +164,9 @@ struct zbus_observer { /** Observer callback function. It turns the observer into a listener. */ void (*callback)(const struct zbus_channel *chan); + /** Observer semaphore. It turns the observer into a waiter. */ + struct k_sem *sem; + #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) || defined(__DOXYGEN__) /** Observer message FIFO. It turns the observer into a message subscriber. It only * exists if the @kconfig{CONFIG_ZBUS_MSG_SUBSCRIBER} is enabled. @@ -512,6 +516,43 @@ struct zbus_channel_observation { /* clang-format off */ +/** + * @brief Define and initialize a waiter. + * + * This macro defines an observer of waiter type. + * + * @param[in] _name The waiter's name. + * @param[in] _sem The semaphore given on channel publish. + * @param[in] _enable The waiter initial enable state. + */ +#define ZBUS_WAITER_DEFINE_WITH_ENABLE(_name, _sem, _enable) \ + static struct zbus_observer_data _CONCAT(_zbus_obs_data_, _name) = { \ + .enabled = _enable, \ + IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, ( \ + .priority = ZBUS_MIN_THREAD_PRIORITY, \ + )) \ + }; \ + const STRUCT_SECTION_ITERABLE(zbus_observer, _name) = { \ + ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \ + .type = ZBUS_OBSERVER_WAITER_TYPE, \ + .data = &_CONCAT(_zbus_obs_data_, _name), \ + .sem = (_sem) \ + } +/* clang-format on */ + +/** + * @brief Define and initialize a waiter. + * + * This macro defines an observer of waiter type. The waiter is defined in the enabled state + * with this macro. + * + * @param[in] _name The waiter's name. + * @param[in] _sem The semaphore given on channel publish. + */ +#define ZBUS_WAITER_DEFINE(_name, _sem) ZBUS_WAITER_DEFINE_WITH_ENABLE(_name, _sem, true) + +/* clang-format off */ + /** * @brief Define and initialize a message subscriber. * diff --git a/subsys/zbus/zbus.c b/subsys/zbus/zbus.c index 0fff534c2481..a37db89d71c3 100644 --- a/subsys/zbus/zbus.c +++ b/subsys/zbus/zbus.c @@ -134,6 +134,10 @@ static inline int _zbus_notify_observer(const struct zbus_channel *chan, case ZBUS_OBSERVER_SUBSCRIBER_TYPE: { return k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time)); } + case ZBUS_OBSERVER_WAITER_TYPE: { + k_sem_give(obs->sem); + break; + } #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) case ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE: { struct net_buf *cloned_buf = net_buf_clone(buf, sys_timepoint_timeout(end_time)); From 3d70457db65dea32d6bff1f59a57fb1360766213 Mon Sep 17 00:00:00 2001 From: Jordan Yates Date: Fri, 31 Jan 2025 12:13:29 +1000 Subject: [PATCH 2/4] zbus: helper macro for stack waiter observer Add a helper macro to declare a waiter observer on the stack for use with `zbus_chan_add_obs`. Signed-off-by: Jordan Yates --- include/zephyr/zbus/zbus.h | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/include/zephyr/zbus/zbus.h b/include/zephyr/zbus/zbus.h index 47d28ac73629..ca1e108229ca 100644 --- a/include/zephyr/zbus/zbus.h +++ b/include/zephyr/zbus/zbus.h @@ -538,6 +538,30 @@ struct zbus_channel_observation { .data = &_CONCAT(_zbus_obs_data_, _name), \ .sem = (_sem) \ } + +/** + * @brief Define and initialize a waiter on the stack. + * + * This macro defines an observer of waiter type that can exist on the stack and attached to + * a channel with @ref zbus_chan_add_obs. + * + * @param[in] _name The waiter's name. + * @param[in] _sem The semaphore given on channel publish. + */ +#define ZBUS_RUNTIME_WAITER_DEFINE(_name, _sem) \ + struct zbus_observer_data _CONCAT(_zbus_obs_data_, _name) = { \ + .enabled = true, \ + IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, ( \ + .priority = ZBUS_MIN_THREAD_PRIORITY, \ + )) \ + }; \ + const struct zbus_observer _name = { \ + ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \ + .type = ZBUS_OBSERVER_WAITER_TYPE, \ + .data = &_CONCAT(_zbus_obs_data_, _name), \ + .sem = (_sem) \ + } + /* clang-format on */ /** From 7501e9f87aef6984884a16920a8b51d9045194f5 Mon Sep 17 00:00:00 2001 From: Jordan Yates Date: Fri, 31 Jan 2025 09:59:59 +1000 Subject: [PATCH 3/4] tests: zbus: unittests: test `ZBUS_OBSERVER_WAITER_TYPE` Add tests for observers of type `ZBUS_OBSERVER_WAITER_TYPE`. Signed-off-by: Jordan Yates --- tests/subsys/zbus/unittests/src/main.c | 68 ++++++++++++++++++++++++-- 1 file changed, 65 insertions(+), 3 deletions(-) diff --git a/tests/subsys/zbus/unittests/src/main.c b/tests/subsys/zbus/unittests/src/main.c index a40a70082f1f..d39e18c4ca8f 100644 --- a/tests/subsys/zbus/unittests/src/main.c +++ b/tests/subsys/zbus/unittests/src/main.c @@ -86,6 +86,14 @@ ZBUS_CHAN_DEFINE(stuck_chan, /* Name */ ZBUS_MSG_INIT(0) /* Initial value major 0, minor 1, build 1023 */ ); +ZBUS_CHAN_DEFINE(msg_obs_sem, /* Name */ + int, /* Message type */ + NULL, /* Validator */ + NULL, /* User data */ + ZBUS_OBSERVERS(waiter1, waiter2), /* observers */ + ZBUS_MSG_INIT(0) /* Initial value 0 */ +); + ZBUS_CHAN_DEFINE(msg_sub_fail_chan, /* Name */ int, /* Message type */ NULL, /* Validator */ @@ -225,6 +233,11 @@ ZBUS_SUBSCRIBER_DEFINE(sub1, 1); ZBUS_MSG_SUBSCRIBER_DEFINE_WITH_ENABLE(foo_msg_sub, false); ZBUS_MSG_SUBSCRIBER_DEFINE_WITH_ENABLE(foo2_msg_sub, false); +static K_SEM_DEFINE(sem1, 0, 2); +static K_SEM_DEFINE(sem2, 0, 1); +ZBUS_WAITER_DEFINE(waiter1, &sem1); +ZBUS_WAITER_DEFINE_WITH_ENABLE(waiter2, &sem2, false); + static K_FIFO_DEFINE(_zbus_observer_fifo_invalid_obs); /* clang-format off */ @@ -413,17 +426,21 @@ static bool check_chan_iterator(const struct zbus_channel *chan, void *user_data zassert_mem_equal__(zbus_chan_name(chan), "hard_chan", 9, "Must be equal"); break; case 5: + zassert_mem_equal__(zbus_chan_name(chan), "msg_obs_sem", sizeof("msg_obs_sem"), + "Must be equal"); + break; + case 6: zassert_mem_equal__(zbus_chan_name(chan), "msg_sub_fail_chan", sizeof("msg_sub_fail_chan"), "Must be equal"); break; - case 6: + case 7: zassert_mem_equal__(zbus_chan_name(chan), "msg_sub_no_pool_chan", sizeof("msg_sub_no_pool_chan"), "Must be equal"); break; - case 7: + case 8: zassert_mem_equal__(zbus_chan_name(chan), "stuck_chan", 10, "Must be equal"); break; - case 8: + case 9: zassert_mem_equal__(zbus_chan_name(chan), "version_chan", 12, "Must be equal"); break; default: @@ -475,6 +492,12 @@ static bool check_obs_iterator(const struct zbus_observer *obs, void *user_data) case 9: zassert_mem_equal__(zbus_obs_name(obs), "sub1", 4, "Must be equal"); break; + case 10: + zassert_mem_equal__(zbus_obs_name(obs), "waiter1", 7, "Must be equal"); + break; + case 11: + zassert_mem_equal__(zbus_obs_name(obs), "waiter2", 7, "Must be equal"); + break; default: zassert_unreachable(NULL); } @@ -789,6 +812,45 @@ ZTEST(basic, test_specification_based__zbus_sub_wait_msg) irq_offload(isr_sub_wait_msg, NULL); } +ZTEST(basic, test_specification_based__zbus_obs_waiter) +{ + int data = 0; + + /* Semaphores not given by default */ + zassert_equal(-EBUSY, k_sem_take(&sem1, K_NO_WAIT)); + zassert_equal(-EBUSY, k_sem_take(&sem2, K_NO_WAIT)); + + /* Publish to channel */ + zassert_equal(0, zbus_chan_pub(&msg_obs_sem, &data, K_FOREVER)); + + /* First semaphore available, second waiter is not enabled */ + zassert_equal(0, k_sem_take(&sem1, K_NO_WAIT)); + zassert_equal(-EBUSY, k_sem_take(&sem2, K_NO_WAIT)); + + /* Enable second waiter */ + zbus_obs_set_enable(&waiter2, true); + + /* Publish again to channel */ + zassert_equal(0, zbus_chan_pub(&msg_obs_sem, &data, K_FOREVER)); + + /* Both semaphores given */ + zassert_equal(0, k_sem_take(&sem1, K_NO_WAIT)); + zassert_equal(0, k_sem_take(&sem2, K_NO_WAIT)); + zassert_equal(-EBUSY, k_sem_take(&sem1, K_NO_WAIT)); + zassert_equal(-EBUSY, k_sem_take(&sem2, K_NO_WAIT)); + + /* Publish twice */ + zassert_equal(0, zbus_chan_pub(&msg_obs_sem, &data, K_FOREVER)); + zassert_equal(0, zbus_chan_pub(&msg_obs_sem, &data, K_FOREVER)); + + /* First semaphore should have 2 takes available due to its max_count value */ + zassert_equal(0, k_sem_take(&sem1, K_NO_WAIT)); + zassert_equal(0, k_sem_take(&sem1, K_NO_WAIT)); + zassert_equal(0, k_sem_take(&sem2, K_NO_WAIT)); + zassert_equal(-EBUSY, k_sem_take(&sem1, K_NO_WAIT)); + zassert_equal(-EBUSY, k_sem_take(&sem2, K_NO_WAIT)); +} + #if defined(CONFIG_ZBUS_PRIORITY_BOOST) static void isr_obs_attach_detach(const void *operation) { From 361d0eb161fd700821a4ca1b28e98935ab775ca6 Mon Sep 17 00:00:00 2001 From: Jordan Yates Date: Fri, 31 Jan 2025 12:14:23 +1000 Subject: [PATCH 4/4] tests: zbus: test `ZBUS_RUNTIME_WAITER_DEFINE` Validate that an observer created with `ZBUS_RUNTIME_WAITER_DEFINE` operates correctly. Signed-off-by: Jordan Yates --- .../runtime_observers_registration/src/main.c | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/tests/subsys/zbus/runtime_observers_registration/src/main.c b/tests/subsys/zbus/runtime_observers_registration/src/main.c index b72586aff27f..9f828b095237 100644 --- a/tests/subsys/zbus/runtime_observers_registration/src/main.c +++ b/tests/subsys/zbus/runtime_observers_registration/src/main.c @@ -126,6 +126,50 @@ ZTEST(basic, test_specification_based__zbus_obs_add_rm_obs) zassert_equal(0, zbus_chan_rm_obs(&chan2, &sub2, K_MSEC(200)), NULL); } +static void chan1_publisher(struct k_work *work) +{ + struct k_work_delayable *dwork = k_work_delayable_from_work(work); + struct sensor_data_msg sd = {.a = 10, .b = 100}; + + zassert_equal(0, zbus_chan_pub(&chan1, &sd, K_MSEC(5))); + + k_work_reschedule(dwork, K_MSEC(100)); +} + +ZTEST(basic, test_specification_based__zbus_obs_stack_waiter) +{ + static struct zbus_observer_node node; + struct k_work_delayable publisher; + struct k_work_sync sync; + struct k_sem pub_sem; + + ZBUS_RUNTIME_WAITER_DEFINE(waiter, &pub_sem); + + /* Start the channel publisher */ + k_work_init_delayable(&publisher, chan1_publisher); + k_work_schedule(&publisher, K_NO_WAIT); + k_sleep(K_MSEC(2)); + + /* Setup semaphore and add waiter to channel */ + zassert_equal(0, k_sem_init(&pub_sem, 0, 1)); + zassert_equal(0, zbus_chan_add_obs(&chan1, &waiter, &node, K_MSEC(10)), NULL); + + /* Wait for channel to be published multiple times */ + for (int i = 0; i < 5; i++) { + zassert_equal(-EAGAIN, k_sem_take(&pub_sem, K_MSEC(80))); + zassert_equal(0, k_sem_take(&pub_sem, K_MSEC(30))); + } + + /* Cleanup the waiter */ + zassert_equal(0, zbus_chan_rm_obs(&chan1, &waiter, K_MSEC(10))); + + /* No more semaphore handling */ + zassert_equal(-EAGAIN, k_sem_take(&pub_sem, K_MSEC(120))); + + /* Cancel the channel publisher */ + zassert_true(k_work_cancel_delayable_sync(&publisher, &sync)); +} + struct aux2_wq_data { struct k_work work; };