Skip to content

zbus: Add runtime observers pool #88834

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions doc/releases/migration-guide-4.2.rst
Original file line number Diff line number Diff line change
Expand Up @@ -305,13 +305,6 @@ SPI
Other subsystems
****************

ZBus
====

* The function :c:func:`zbus_chan_add_obs` now requires a :c:struct:`zbus_observer_node` as an argument,
which was previously allocated through :c:func:`k_malloc` internally. The structure must remain valid
in memory until :c:func:`zbus_chan_rem_obs` is called.

Modules
*******

Expand Down
10 changes: 10 additions & 0 deletions doc/releases/release-notes-4.2.rst
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,16 @@ New APIs and options
* :c:func:`video_query_ctrl`
* :c:func:`video_print_ctrl`

* ZBus

* Runtime observers can work without heap. Now it is possible to choose between static and dynamic
allocation for the runtime observers nodes.

* :kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC`
* :kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC`
* :kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_POOL_SIZE`


New Boards
**********

Expand Down
32 changes: 20 additions & 12 deletions doc/services/zbus/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -848,28 +848,30 @@ The following code has the exact behavior of the code in :ref:`reading from a ch
Runtime observer registration
-----------------------------

It is possible to add observers to channels at runtime if
:kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS` is enabled. In addition to the channel and observer
references, :c:func:`zbus_chan_add_obs` also requires a :c:struct:`zbus_observer_node` to link the two
together, which must remain valid in memory for the duration that the observer is attached to the
channel. The simplest way to achieve this is to make the structure ``static``.
It is possible to add observers to channels in runtime. Set the
:kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS` to enable the feature. This feature uses the heap to
allocate the nodes dynamically or a memory slab to allocate the nodes statically. It depends on the
:kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC` which can be
:kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC` and
:kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC`. When the heap is enabled, the
dynamic one is automatically chosen. When
:kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC` is enabled, you need to set the
number of runtime observers you are going to use by setting the
:kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_POOL_SIZE` configuration. The following example
illustrates the runtime registration usage.


.. code-block:: c

ZBUS_LISTENER_DEFINE(my_listener, callback);
// ...
void thread_entry(void) {
static struct zbus_observer_node obs_node;
// ...
/* Adding the observer to channel chan1 */
zbus_chan_add_obs(&chan1, &my_listener, &obs_node, K_NO_WAIT);
zbus_chan_add_obs(&chan1, &my_listener, K_NO_WAIT);
/* Removing the observer from channel chan1 */
zbus_chan_rm_obs(&chan1, &my_listener, K_NO_WAIT);

.. warning::

The :c:struct:`zbus_observer_node` can only be re-used in :c:func:`zbus_chan_add_obs` after removing
the channel observer it was first associated with through :c:func:`zbus_chan_rm_obs`.

Samples
*******
Expand Down Expand Up @@ -936,7 +938,13 @@ Related configuration options:
a pool for the message subscriber for a set of channels;
* :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE` the biggest message of zbus
channels to be transported into a message buffer;
* :kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS` enables the runtime observer registration.
* :kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS` enables the runtime observer registration;
* :kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC` allocate the runtime observers
statically using a memory slab;
* :kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC` allocate the runtime observers
dynamically using the heap;
* :kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_POOL_SIZE` the amount of enabled runtime
observers to statically allocate.

API Reference
*************
Expand Down
31 changes: 17 additions & 14 deletions include/zephyr/zbus/zbus.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ struct zbus_channel_observation_mask {
bool enabled;
};

/**
* @brief Structure for linking observers to chanels
*/
struct zbus_channel_observation {
const struct zbus_channel *chan;
const struct zbus_observer *obs;
Expand Down Expand Up @@ -861,32 +864,25 @@ static inline void zbus_chan_pub_stats_update(const struct zbus_channel *chan)

#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) || defined(__DOXYGEN__)

/**
* @brief Structure for linking observers to chanels
*/
struct zbus_observer_node {
sys_snode_t node;
const struct zbus_observer *obs;
};

/**
* @brief Add an observer to a channel.
*
* This routine adds an observer to the channel.
*
* @param chan The channel's reference.
* @param obs The observer's reference to be added.
* @param node Persistent structure to link the channel to the observer
* @param timeout Waiting period to add an observer,
* or one of the special values K_NO_WAIT and K_FOREVER.
*
* @retval 0 Observer added to the channel.
* @retval -EALREADY The observer is already present in the channel's runtime observers list.
* @retval -EBUSY Returned without waiting.
* @retval -EAGAIN Waiting period timed out.
* @retval -EINVAL Some parameter is invalid.
* @retval -EEXIST The observer is already present in the channel's observers list.
* @retval -EALREADY The observer is already present in the channel's runtime observers list.
* @retval -ENOMEM No memory available for a new runtime observer node.
*/
int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observer *obs,
struct zbus_observer_node *node, k_timeout_t timeout);
k_timeout_t timeout);

/**
* @brief Remove an observer from a channel.
Expand All @@ -899,15 +895,22 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe
* or one of the special values K_NO_WAIT and K_FOREVER.
*
* @retval 0 Observer removed to the channel.
* @retval -EINVAL Invalid data supplied.
* @retval -EBUSY Returned without waiting.
* @retval -EAGAIN Waiting period timed out.
* @retval -ENODATA no observer found in channel's runtime observer list.
* @retval -ENOMEM Returned without waiting.
*/
int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer *obs,
k_timeout_t timeout);

/** @cond INTERNAL_HIDDEN */

struct zbus_observer_node {
sys_snode_t node;
const struct zbus_observer *obs;
};

/** @endcond */

#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */

/**
Expand Down
60 changes: 45 additions & 15 deletions samples/subsys/zbus/runtime_obs_registration/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,49 @@ tests:
type: multi_line
ordered: false
regex:
- "I: System started"
- "I: Activating filter"
- "I: Deactivating filter"
- "I: Bypass filter"
- "I: Disable bypass filter"
- "I: >-- Raw data fetched"
- "I: -|- Filtering data"
- "I: --> Consuming data: Acc x=0, y=0, z=0"
- "I: --> Consuming data: Acc x=2, y=2, z=2"
- "I: --> Consuming data: Acc x=4, y=4, z=4"
- "I: --> Consuming data: Acc x=6, y=6, z=6"
- "I: --> Consuming data: Acc x=7, y=7, z=7"
- "I: --> Consuming data: Acc x=8, y=8, z=8"
- "I: --> Consuming data: Acc x=9, y=9, z=9"
- "I: --> Consuming data: Acc x=10, y=10, z=10"
- ".*I: System started, runtime observers statically allocated"
- ".*I: Activating filter"
- ".*I: Deactivating filter"
- ".*I: Bypass filter"
- ".*I: Disable bypass filter"
- ".*I: >-- Raw data fetched"
- ".*I: -|- Filtering data"
- ".*I: --> Consuming data: Acc x=0, y=0, z=0"
- ".*I: --> Consuming data: Acc x=2, y=2, z=2"
- ".*I: --> Consuming data: Acc x=4, y=4, z=4"
- ".*I: --> Consuming data: Acc x=6, y=6, z=6"
- ".*I: --> Consuming data: Acc x=7, y=7, z=7"
- ".*I: --> Consuming data: Acc x=8, y=8, z=8"
- ".*I: --> Consuming data: Acc x=9, y=9, z=9"
- ".*I: --> Consuming data: Acc x=10, y=10, z=10"
tags: zbus
extra_configs:
- CONFIG_ZBUS_PREFER_DYNAMIC_ALLOCATION=n
sample.zbus.runtime_os_registration_dynamic_alloc:
min_ram: 16
integration_platforms:
- qemu_x86
arch_exclude: nios2
harness: console
harness_config:
type: multi_line
ordered: false
regex:
- ".*I: System started, runtime observers dynamically allocated"
- ".*I: Activating filter"
- ".*I: Deactivating filter"
- ".*I: Bypass filter"
- ".*I: Disable bypass filter"
- ".*I: >-- Raw data fetched"
- ".*I: -|- Filtering data"
- ".*I: --> Consuming data: Acc x=0, y=0, z=0"
- ".*I: --> Consuming data: Acc x=2, y=2, z=2"
- ".*I: --> Consuming data: Acc x=4, y=4, z=4"
- ".*I: --> Consuming data: Acc x=6, y=6, z=6"
- ".*I: --> Consuming data: Acc x=7, y=7, z=7"
- ".*I: --> Consuming data: Acc x=8, y=8, z=8"
- ".*I: --> Consuming data: Acc x=9, y=9, z=9"
- ".*I: --> Consuming data: Acc x=10, y=10, z=10"
tags: zbus
extra_configs:
- CONFIG_HEAP_MEM_POOL_SIZE=1024
9 changes: 5 additions & 4 deletions samples/subsys/zbus/runtime_obs_registration/src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,23 @@ ZBUS_SUBSCRIBER_DEFINE(state_change_sub, 5);

int main(void)
{
LOG_INF("System started");
LOG_INF("System started, runtime observers %s allocated",
IS_ENABLED(CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC) ? "dynamically"
: "statically");

const struct zbus_channel *chan;
struct zbus_observer_node obs_node;

while (1) {
LOG_INF("Activating filter");
zbus_chan_add_obs(&raw_data_chan, &filter_lis, &obs_node, K_MSEC(200));
zbus_chan_add_obs(&raw_data_chan, &filter_lis, K_MSEC(200));

zbus_sub_wait(&state_change_sub, &chan, K_FOREVER);

LOG_INF("Deactivating filter");
zbus_chan_rm_obs(&raw_data_chan, &filter_lis, K_MSEC(200));

LOG_INF("Bypass filter");
zbus_chan_add_obs(&raw_data_chan, &consumer_sub, &obs_node, K_MSEC(200));
zbus_chan_add_obs(&raw_data_chan, &consumer_sub, K_MSEC(200));

zbus_sub_wait(&state_change_sub, &chan, K_FOREVER);

Expand Down
35 changes: 31 additions & 4 deletions subsys/zbus/Kconfig
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ menuconfig ZBUS

if ZBUS

config ZBUS_PREFER_DYNAMIC_ALLOCATION
bool "Set zbus to work with dynamic allocation using the system heap"
default y

config ZBUS_CHANNELS_SYS_INIT_PRIORITY
default 5
int "The priority used during the SYS_INIT procedure."
Expand All @@ -33,7 +37,8 @@ if ZBUS_MSG_SUBSCRIBER

choice ZBUS_MSG_SUBSCRIBER_BUF_ALLOC
prompt "ZBus msg_subscribers buffer allocation"
default ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC
default ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC if ZBUS_PREFER_DYNAMIC_ALLOCATION
default ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_STATIC

config ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC
bool "Use heap to allocate msg_subscriber buffers data"
Expand Down Expand Up @@ -63,6 +68,28 @@ endif # ZBUS_MSG_SUBSCRIBER
config ZBUS_RUNTIME_OBSERVERS
bool "Runtime observers support."

if ZBUS_RUNTIME_OBSERVERS

choice ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC
prompt "ZBus runtime observers node allocation"
default ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC if ZBUS_PREFER_DYNAMIC_ALLOCATION
default ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC

config ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC
bool "Use heap to allocate runtime observers node"

config ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC
bool "Use a pool of runtime observers nodes"

endchoice

config ZBUS_RUNTIME_OBSERVERS_NODE_POOL_SIZE
int "Runtime observer pool size"
depends on ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC
default 8

endif # ZBUS_RUNTIME_OBSERVERS

config ZBUS_PRIORITY_BOOST
bool "ZBus priority boost algorithm"
default y
Expand All @@ -80,9 +107,9 @@ config ZBUS_ASSERT_MOCK

config HEAP_MEM_POOL_ADD_SIZE_ZBUS
int
default 2048 if ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && !ZBUS_RUNTIME_OBSERVERS
default 1024 if !ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && ZBUS_RUNTIME_OBSERVERS
default 3072 if ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && ZBUS_RUNTIME_OBSERVERS
default 2048 if ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && !ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC
default 1024 if !ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC
default 3072 if ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC


module = ZBUS
Expand Down
Loading