Skip to content

zbus: ZBUS_WAITER_DEFINE #88690

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions include/zephyr/zbus/zbus.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ enum __packed zbus_observer_type {
ZBUS_OBSERVER_LISTENER_TYPE,
ZBUS_OBSERVER_SUBSCRIBER_TYPE,
ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE,
ZBUS_OBSERVER_WAITER_TYPE,
};

struct zbus_observer_data {
Expand Down Expand Up @@ -163,6 +164,9 @@ struct zbus_observer {
/** Observer callback function. It turns the observer into a listener. */
void (*callback)(const struct zbus_channel *chan);

/** Observer semaphore. It turns the observer into a waiter. */
struct k_sem *sem;

#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) || defined(__DOXYGEN__)
/** Observer message FIFO. It turns the observer into a message subscriber. It only
* exists if the @kconfig{CONFIG_ZBUS_MSG_SUBSCRIBER} is enabled.
Expand Down Expand Up @@ -512,6 +516,67 @@ struct zbus_channel_observation {

/* clang-format off */

/**
* @brief Define and initialize a waiter.
*
* This macro defines an observer of waiter type.
*
* @param[in] _name The waiter's name.
* @param[in] _sem The semaphore given on channel publish.
* @param[in] _enable The waiter initial enable state.
*/
#define ZBUS_WAITER_DEFINE_WITH_ENABLE(_name, _sem, _enable) \
static struct zbus_observer_data _CONCAT(_zbus_obs_data_, _name) = { \
.enabled = _enable, \
IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, ( \
.priority = ZBUS_MIN_THREAD_PRIORITY, \
)) \
}; \
const STRUCT_SECTION_ITERABLE(zbus_observer, _name) = { \
ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \
.type = ZBUS_OBSERVER_WAITER_TYPE, \
.data = &_CONCAT(_zbus_obs_data_, _name), \
.sem = (_sem) \
}

/**
* @brief Define and initialize a waiter on the stack.
*
* This macro defines an observer of waiter type that can exist on the stack and attached to
* a channel with @ref zbus_chan_add_obs.
*
* @param[in] _name The waiter's name.
* @param[in] _sem The semaphore given on channel publish.
*/
#define ZBUS_RUNTIME_WAITER_DEFINE(_name, _sem) \
struct zbus_observer_data _CONCAT(_zbus_obs_data_, _name) = { \
.enabled = true, \
IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, ( \
.priority = ZBUS_MIN_THREAD_PRIORITY, \
)) \
}; \
const struct zbus_observer _name = { \
ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \
.type = ZBUS_OBSERVER_WAITER_TYPE, \
.data = &_CONCAT(_zbus_obs_data_, _name), \
.sem = (_sem) \
}

/* clang-format on */

/**
* @brief Define and initialize a waiter.
*
* This macro defines an observer of waiter type. The waiter is defined in the enabled state
* with this macro.
*
* @param[in] _name The waiter's name.
* @param[in] _sem The semaphore given on channel publish.
*/
#define ZBUS_WAITER_DEFINE(_name, _sem) ZBUS_WAITER_DEFINE_WITH_ENABLE(_name, _sem, true)

/* clang-format off */

/**
* @brief Define and initialize a message subscriber.
*
Expand Down
4 changes: 4 additions & 0 deletions subsys/zbus/zbus.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ static inline int _zbus_notify_observer(const struct zbus_channel *chan,
case ZBUS_OBSERVER_SUBSCRIBER_TYPE: {
return k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time));
}
case ZBUS_OBSERVER_WAITER_TYPE: {
k_sem_give(obs->sem);
break;
}
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
case ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE: {
struct net_buf *cloned_buf = net_buf_clone(buf, sys_timepoint_timeout(end_time));
Expand Down
44 changes: 44 additions & 0 deletions tests/subsys/zbus/runtime_observers_registration/src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,50 @@ ZTEST(basic, test_specification_based__zbus_obs_add_rm_obs)
zassert_equal(0, zbus_chan_rm_obs(&chan2, &sub2, K_MSEC(200)), NULL);
}

static void chan1_publisher(struct k_work *work)
{
struct k_work_delayable *dwork = k_work_delayable_from_work(work);
struct sensor_data_msg sd = {.a = 10, .b = 100};

zassert_equal(0, zbus_chan_pub(&chan1, &sd, K_MSEC(5)));

k_work_reschedule(dwork, K_MSEC(100));
}

ZTEST(basic, test_specification_based__zbus_obs_stack_waiter)
{
static struct zbus_observer_node node;
struct k_work_delayable publisher;
struct k_work_sync sync;
struct k_sem pub_sem;

ZBUS_RUNTIME_WAITER_DEFINE(waiter, &pub_sem);

/* Start the channel publisher */
k_work_init_delayable(&publisher, chan1_publisher);
k_work_schedule(&publisher, K_NO_WAIT);
k_sleep(K_MSEC(2));

/* Setup semaphore and add waiter to channel */
zassert_equal(0, k_sem_init(&pub_sem, 0, 1));
zassert_equal(0, zbus_chan_add_obs(&chan1, &waiter, &node, K_MSEC(10)), NULL);

/* Wait for channel to be published multiple times */
for (int i = 0; i < 5; i++) {
zassert_equal(-EAGAIN, k_sem_take(&pub_sem, K_MSEC(80)));
zassert_equal(0, k_sem_take(&pub_sem, K_MSEC(30)));
}

/* Cleanup the waiter */
zassert_equal(0, zbus_chan_rm_obs(&chan1, &waiter, K_MSEC(10)));

/* No more semaphore handling */
zassert_equal(-EAGAIN, k_sem_take(&pub_sem, K_MSEC(120)));

/* Cancel the channel publisher */
zassert_true(k_work_cancel_delayable_sync(&publisher, &sync));
}

struct aux2_wq_data {
struct k_work work;
};
Expand Down
68 changes: 65 additions & 3 deletions tests/subsys/zbus/unittests/src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ ZBUS_CHAN_DEFINE(stuck_chan, /* Name */
ZBUS_MSG_INIT(0) /* Initial value major 0, minor 1, build 1023 */
);

ZBUS_CHAN_DEFINE(msg_obs_sem, /* Name */
int, /* Message type */
NULL, /* Validator */
NULL, /* User data */
ZBUS_OBSERVERS(waiter1, waiter2), /* observers */
ZBUS_MSG_INIT(0) /* Initial value 0 */
);

ZBUS_CHAN_DEFINE(msg_sub_fail_chan, /* Name */
int, /* Message type */
NULL, /* Validator */
Expand Down Expand Up @@ -225,6 +233,11 @@ ZBUS_SUBSCRIBER_DEFINE(sub1, 1);
ZBUS_MSG_SUBSCRIBER_DEFINE_WITH_ENABLE(foo_msg_sub, false);
ZBUS_MSG_SUBSCRIBER_DEFINE_WITH_ENABLE(foo2_msg_sub, false);

static K_SEM_DEFINE(sem1, 0, 2);
static K_SEM_DEFINE(sem2, 0, 1);
ZBUS_WAITER_DEFINE(waiter1, &sem1);
ZBUS_WAITER_DEFINE_WITH_ENABLE(waiter2, &sem2, false);

static K_FIFO_DEFINE(_zbus_observer_fifo_invalid_obs);

/* clang-format off */
Expand Down Expand Up @@ -413,17 +426,21 @@ static bool check_chan_iterator(const struct zbus_channel *chan, void *user_data
zassert_mem_equal__(zbus_chan_name(chan), "hard_chan", 9, "Must be equal");
break;
case 5:
zassert_mem_equal__(zbus_chan_name(chan), "msg_obs_sem", sizeof("msg_obs_sem"),
"Must be equal");
break;
case 6:
zassert_mem_equal__(zbus_chan_name(chan), "msg_sub_fail_chan",
sizeof("msg_sub_fail_chan"), "Must be equal");
break;
case 6:
case 7:
zassert_mem_equal__(zbus_chan_name(chan), "msg_sub_no_pool_chan",
sizeof("msg_sub_no_pool_chan"), "Must be equal");
break;
case 7:
case 8:
zassert_mem_equal__(zbus_chan_name(chan), "stuck_chan", 10, "Must be equal");
break;
case 8:
case 9:
zassert_mem_equal__(zbus_chan_name(chan), "version_chan", 12, "Must be equal");
break;
default:
Expand Down Expand Up @@ -475,6 +492,12 @@ static bool check_obs_iterator(const struct zbus_observer *obs, void *user_data)
case 9:
zassert_mem_equal__(zbus_obs_name(obs), "sub1", 4, "Must be equal");
break;
case 10:
zassert_mem_equal__(zbus_obs_name(obs), "waiter1", 7, "Must be equal");
break;
case 11:
zassert_mem_equal__(zbus_obs_name(obs), "waiter2", 7, "Must be equal");
break;
default:
zassert_unreachable(NULL);
}
Expand Down Expand Up @@ -789,6 +812,45 @@ ZTEST(basic, test_specification_based__zbus_sub_wait_msg)
irq_offload(isr_sub_wait_msg, NULL);
}

ZTEST(basic, test_specification_based__zbus_obs_waiter)
{
int data = 0;

/* Semaphores not given by default */
zassert_equal(-EBUSY, k_sem_take(&sem1, K_NO_WAIT));
zassert_equal(-EBUSY, k_sem_take(&sem2, K_NO_WAIT));

/* Publish to channel */
zassert_equal(0, zbus_chan_pub(&msg_obs_sem, &data, K_FOREVER));

/* First semaphore available, second waiter is not enabled */
zassert_equal(0, k_sem_take(&sem1, K_NO_WAIT));
zassert_equal(-EBUSY, k_sem_take(&sem2, K_NO_WAIT));

/* Enable second waiter */
zbus_obs_set_enable(&waiter2, true);

/* Publish again to channel */
zassert_equal(0, zbus_chan_pub(&msg_obs_sem, &data, K_FOREVER));

/* Both semaphores given */
zassert_equal(0, k_sem_take(&sem1, K_NO_WAIT));
zassert_equal(0, k_sem_take(&sem2, K_NO_WAIT));
zassert_equal(-EBUSY, k_sem_take(&sem1, K_NO_WAIT));
zassert_equal(-EBUSY, k_sem_take(&sem2, K_NO_WAIT));

/* Publish twice */
zassert_equal(0, zbus_chan_pub(&msg_obs_sem, &data, K_FOREVER));
zassert_equal(0, zbus_chan_pub(&msg_obs_sem, &data, K_FOREVER));

/* First semaphore should have 2 takes available due to its max_count value */
zassert_equal(0, k_sem_take(&sem1, K_NO_WAIT));
zassert_equal(0, k_sem_take(&sem1, K_NO_WAIT));
zassert_equal(0, k_sem_take(&sem2, K_NO_WAIT));
zassert_equal(-EBUSY, k_sem_take(&sem1, K_NO_WAIT));
zassert_equal(-EBUSY, k_sem_take(&sem2, K_NO_WAIT));
}

#if defined(CONFIG_ZBUS_PRIORITY_BOOST)
static void isr_obs_attach_detach(const void *operation)
{
Expand Down