diff --git a/doc/services/portability/posix/option_groups/index.rst b/doc/services/portability/posix/option_groups/index.rst index 2a1eefdc0a1b..40871f998209 100644 --- a/doc/services/portability/posix/option_groups/index.rst +++ b/doc/services/portability/posix/option_groups/index.rst @@ -359,7 +359,7 @@ _POSIX_MESSAGE_PASSING mq_close(),yes mq_getattr(),yes - mq_notify(), + mq_notify(),yes mq_open(),yes mq_receive(),yes mq_send(),yes diff --git a/include/zephyr/posix/mqueue.h b/include/zephyr/posix/mqueue.h index 29f67607e2c6..dbaa6d42a537 100644 --- a/include/zephyr/posix/mqueue.h +++ b/include/zephyr/posix/mqueue.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include "posix_types.h" @@ -40,6 +41,7 @@ int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio, const struct timespec *abstime); int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec *abstime); +int mq_notify(mqd_t mqdes, const struct sigevent *notification); #ifdef __cplusplus } diff --git a/lib/posix/mqueue.c b/lib/posix/mqueue.c index 47e660111bb1..042b40910a8a 100644 --- a/lib/posix/mqueue.c +++ b/lib/posix/mqueue.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2018 Intel Corporation + * Copyright (c) 2024 BayLibre, SAS * * SPDX-License-Identifier: Apache-2.0 */ @@ -7,8 +8,10 @@ #include #include #include -#include #include +#include + +#define SIGEV_MASK (SIGEV_NONE | SIGEV_SIGNAL | SIGEV_THREAD) typedef struct mqueue_object { sys_snode_t snode; @@ -17,6 +20,7 @@ typedef struct mqueue_object { struct k_msgq queue; atomic_t ref_count; char *name; + struct sigevent not; } mqueue_object; typedef struct mqueue_desc { @@ -34,9 +38,11 @@ int64_t timespec_to_timeoutms(const struct timespec *abstime); static mqueue_object *find_in_list(const char *name); static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len, k_timeout_t timeout); -static int receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len, +static int32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len, k_timeout_t timeout); +static void remove_notification(mqueue_object *msg_queue); static void remove_mq(mqueue_object *msg_queue); +static void *mq_notify_thread(void *arg); /** * @brief Open a message queue. @@ -341,6 +347,74 @@ int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, return 0; } +/** + * @brief Notify process that a message is available. + * + * See IEEE 1003.1 + */ +int mq_notify(mqd_t mqdes, const struct sigevent *notification) +{ + mqueue_desc *mqd = (mqueue_desc *)mqdes; + + if (mqd == NULL) { + errno = EBADF; + return -1; + } + + mqueue_object *msg_queue = mqd->mqueue; + + if (notification == NULL) { + if ((msg_queue->not.sigev_notify & SIGEV_MASK) == 0) { + errno = EINVAL; + return -1; + } + remove_notification(msg_queue); + return 0; + } + + if ((msg_queue->not.sigev_notify & SIGEV_MASK) != 0) { + errno = EBUSY; + return -1; + } + if (notification->sigev_notify == SIGEV_SIGNAL) { + errno = ENOSYS; + return -1; + } + if (notification->sigev_notify_attributes != NULL) { + int ret = pthread_attr_setdetachstate(notification->sigev_notify_attributes, + PTHREAD_CREATE_DETACHED); + if (ret != 0) { + errno = ret; + return -1; + } + } + + k_sem_take(&mq_sem, K_FOREVER); + memcpy(&msg_queue->not, notification, sizeof(struct sigevent)); + k_sem_give(&mq_sem); + + return 0; +} + +static void *mq_notify_thread(void *arg) +{ + mqueue_object *mqueue = (mqueue_object *)arg; + struct sigevent *sevp = &mqueue->not; + + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); + + if (sevp->sigev_notify_attributes == NULL) { + pthread_detach(pthread_self()); + } + + sevp->sigev_notify_function(sevp->sigev_value); + + remove_notification(mqueue); + + pthread_exit(NULL); + return NULL; +} + /* Internal functions */ static mqueue_object *find_in_list(const char *name) { @@ -380,11 +454,28 @@ static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_le return ret; } + uint32_t msgq_num = k_msgq_num_used_get(&mqd->mqueue->queue); + if (k_msgq_put(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) { errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT; return ret; } + if (k_msgq_num_used_get(&mqd->mqueue->queue) - msgq_num > 0) { + struct sigevent *sevp = &mqd->mqueue->not; + + if (sevp->sigev_notify == SIGEV_NONE) { + sevp->sigev_notify_function(sevp->sigev_value); + } else if (sevp->sigev_notify == SIGEV_THREAD) { + pthread_t th; + + ret = pthread_create(&th, + sevp->sigev_notify_attributes, + mq_notify_thread, + mqd->mqueue); + } + } + return 0; } @@ -428,3 +519,10 @@ static void remove_mq(mqueue_object *msg_queue) k_free(msg_queue->mem_obj); } } + +static void remove_notification(mqueue_object *msg_queue) +{ + k_sem_take(&mq_sem, K_FOREVER); + memset(&msg_queue->not, 0, sizeof(struct sigevent)); + k_sem_give(&mq_sem); +} diff --git a/tests/posix/common/src/mqueue.c b/tests/posix/common/src/mqueue.c index 1b0a82d66f57..45229ba9d13b 100644 --- a/tests/posix/common/src/mqueue.c +++ b/tests/posix/common/src/mqueue.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2018 Intel Corporation + * Copyright (c) 2024 BayLibre, SAS * * SPDX-License-Identifier: Apache-2.0 */ @@ -12,8 +13,6 @@ #include #define N_THR 2 -#define SENDER_THREAD 0 -#define RECEIVER_THREAD 1 #define MESSAGE_SIZE 16 #define MESG_COUNT_PERMQ 4 @@ -97,4 +96,184 @@ ZTEST(mqueue, test_mqueue) zassert_false(mq_unlink(queue), "Not able to unlink Queue"); } +static bool notification_executed; + +void notify_function_basic(union sigval val) +{ + mqd_t mqd; + bool *executed = (bool *)val.sival_ptr; + + mqd = mq_open(queue, O_RDONLY); + + mq_receive(mqd, rec_data, MESSAGE_SIZE, 0); + zassert_ok(strcmp(rec_data, send_data), + "Error in data reception. exp: %s act: %s", send_data, rec_data); + + zassert_ok(mq_close(mqd), "Unable to close message queue descriptor."); + + *executed = true; +} + +ZTEST(mqueue, test_mqueue_notify_basic) +{ + mqd_t mqd; + struct mq_attr attrs = { + .mq_msgsize = MESSAGE_SIZE, + .mq_maxmsg = MESG_COUNT_PERMQ, + }; + struct sigevent not = { + .sigev_notify = SIGEV_NONE, + .sigev_value.sival_ptr = (void *)¬ification_executed, + .sigev_notify_function = notify_function_basic, + }; + int32_t mode = 0777; + int flags = O_RDWR | O_CREAT; + + notification_executed = false; + memset(rec_data, 0, MESSAGE_SIZE); + + mqd = mq_open(queue, flags, mode, &attrs); + + zassert_ok(mq_notify(mqd, ¬), "Unable to set notification."); + + zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message"); + + zassert_true(notification_executed, "Notification not triggered."); + + zassert_ok(mq_close(mqd), "Unable to close message queue descriptor."); + zassert_ok(mq_unlink(queue), "Unable to unlink queue"); +} + +void notify_function_thread(union sigval val) +{ + mqd_t mqd; + pthread_t sender = (pthread_t)val.sival_int; + + zassert_not_equal(sender, pthread_self(), + "Notification function should be executed from different thread."); + + mqd = mq_open(queue, O_RDONLY); + + mq_receive(mqd, rec_data, MESSAGE_SIZE, 0); + zassert_ok(strcmp(rec_data, send_data), + "Error in data reception. exp: %s act: %s", send_data, rec_data); + + zassert_ok(mq_close(mqd), "Unable to close message queue descriptor."); + + notification_executed = true; +} + +ZTEST(mqueue, test_mqueue_notify_thread) +{ + mqd_t mqd; + struct mq_attr attrs = { + .mq_msgsize = MESSAGE_SIZE, + .mq_maxmsg = MESG_COUNT_PERMQ, + }; + struct sigevent not = { + .sigev_notify = SIGEV_THREAD, + .sigev_value.sival_int = (int)pthread_self(), + .sigev_notify_function = notify_function_thread, + }; + int32_t mode = 0777; + int flags = O_RDWR | O_CREAT; + + notification_executed = false; + memset(rec_data, 0, MESSAGE_SIZE); + + mqd = mq_open(queue, flags, mode, &attrs); + + zassert_ok(mq_notify(mqd, ¬), "Unable to set notification."); + + zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message"); + + usleep(USEC_PER_MSEC * 10U); + + zassert_true(notification_executed, "Notification not triggered."); + + zassert_ok(mq_close(mqd), "Unable to close message queue descriptor."); + zassert_ok(mq_unlink(queue), "Unable to unlink queue"); +} + +ZTEST(mqueue, test_mqueue_notify_non_empty_queue) +{ + mqd_t mqd; + struct mq_attr attrs = { + .mq_msgsize = MESSAGE_SIZE, + .mq_maxmsg = MESG_COUNT_PERMQ, + }; + struct sigevent not = { + .sigev_notify = SIGEV_NONE, + .sigev_value.sival_ptr = (void *)¬ification_executed, + .sigev_notify_function = notify_function_basic, + }; + int32_t mode = 0777; + int flags = O_RDWR | O_CREAT; + + notification_executed = false; + memset(rec_data, 0, MESSAGE_SIZE); + + mqd = mq_open(queue, flags, mode, &attrs); + + zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message"); + + zassert_ok(mq_notify(mqd, ¬), "Unable to set notification."); + + zassert_false(notification_executed, "Notification shouldn't be processed."); + + mq_receive(mqd, rec_data, MESSAGE_SIZE, 0); + zassert_false(strcmp(rec_data, send_data), + "Error in data reception. exp: %s act: %s", send_data, rec_data); + + memset(rec_data, 0, MESSAGE_SIZE); + + zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message"); + + zassert_true(notification_executed, "Notification not triggered."); + + zassert_ok(mq_close(mqd), "Unable to close message queue descriptor."); + zassert_ok(mq_unlink(queue), "Unable to unlink queue"); +} + +ZTEST(mqueue, test_mqueue_notify_errors) +{ + mqd_t mqd; + struct mq_attr attrs = { + .mq_msgsize = MESSAGE_SIZE, + .mq_maxmsg = MESG_COUNT_PERMQ, + }; + struct sigevent not = { + .sigev_notify = SIGEV_SIGNAL, + .sigev_value.sival_ptr = (void *)¬ification_executed, + .sigev_notify_function = notify_function_basic, + }; + int32_t mode = 0777; + int flags = O_RDWR | O_CREAT; + + zassert_not_ok(mq_notify(NULL, NULL), "Should return -1 and set errno to EBADF."); + zassert_equal(errno, EBADF); + + mqd = mq_open(queue, flags, mode, &attrs); + + zassert_not_ok(mq_notify(mqd, NULL), "Should return -1 and set errno to EINVAL."); + zassert_equal(errno, EINVAL); + + zassert_not_ok(mq_notify(mqd, ¬), "SIGEV_SIGNAL not supported should return -1."); + zassert_equal(errno, ENOSYS); + + not.sigev_notify = SIGEV_NONE; + + zassert_ok(mq_notify(mqd, ¬), + "Unexpected error while asigning notification to the queue."); + + zassert_not_ok(mq_notify(mqd, ¬), + "Can't assign notification when there is another assigned."); + zassert_equal(errno, EBUSY); + + zassert_ok(mq_notify(mqd, NULL), "Unable to remove notification from the message queue."); + + zassert_ok(mq_close(mqd), "Unable to close message queue descriptor."); + zassert_ok(mq_unlink(queue), "Unable to unlink queue"); +} + ZTEST_SUITE(mqueue, NULL, NULL, NULL, NULL, NULL); diff --git a/tests/posix/headers/src/mqueue_h.c b/tests/posix/headers/src/mqueue_h.c index 4a1209fe539f..68fa4b2c9c08 100644 --- a/tests/posix/headers/src/mqueue_h.c +++ b/tests/posix/headers/src/mqueue_h.c @@ -29,7 +29,7 @@ ZTEST(posix_headers, test_mqueue_h) if (IS_ENABLED(CONFIG_POSIX_API)) { zassert_not_null(mq_close); zassert_not_null(mq_getattr); - /* zassert_not_null(mq_notify); */ /* not implemented */ + zassert_not_null(mq_notify); zassert_not_null(mq_open); zassert_not_null(mq_receive); zassert_not_null(mq_send);