Skip to content

posix: Implement mq_notify() #67988

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/services/portability/posix/option_groups/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ _POSIX_MESSAGE_PASSING

mq_close(),yes
mq_getattr(),yes
mq_notify(),
mq_notify(),yes
mq_open(),yes
mq_receive(),yes
mq_send(),yes
Expand Down
2 changes: 2 additions & 0 deletions include/zephyr/posix/mqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <zephyr/kernel.h>
#include <zephyr/posix/time.h>
#include <zephyr/posix/fcntl.h>
#include <zephyr/posix/signal.h>
#include <zephyr/posix/sys/stat.h>
#include "posix_types.h"

Expand Down Expand Up @@ -40,6 +41,7 @@ int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
unsigned int *msg_prio, const struct timespec *abstime);
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
unsigned int msg_prio, const struct timespec *abstime);
int mq_notify(mqd_t mqdes, const struct sigevent *notification);

#ifdef __cplusplus
}
Expand Down
102 changes: 100 additions & 2 deletions lib/posix/mqueue.c
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
/*
* Copyright (c) 2018 Intel Corporation
* Copyright (c) 2024 BayLibre, SAS
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/kernel.h>
#include <errno.h>
#include <string.h>
#include <zephyr/sys/atomic.h>
#include <zephyr/posix/time.h>
#include <zephyr/posix/mqueue.h>
#include <zephyr/posix/pthread.h>

#define SIGEV_MASK (SIGEV_NONE | SIGEV_SIGNAL | SIGEV_THREAD)

typedef struct mqueue_object {
sys_snode_t snode;
Expand All @@ -17,6 +20,7 @@ typedef struct mqueue_object {
struct k_msgq queue;
atomic_t ref_count;
char *name;
struct sigevent not;
} mqueue_object;

typedef struct mqueue_desc {
Expand All @@ -34,9 +38,11 @@ int64_t timespec_to_timeoutms(const struct timespec *abstime);
static mqueue_object *find_in_list(const char *name);
static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
k_timeout_t timeout);
static int receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
static int32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
k_timeout_t timeout);
static void remove_notification(mqueue_object *msg_queue);
static void remove_mq(mqueue_object *msg_queue);
static void *mq_notify_thread(void *arg);

/**
* @brief Open a message queue.
Expand Down Expand Up @@ -341,6 +347,74 @@ int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
return 0;
}

/**
* @brief Notify process that a message is available.
*
* See IEEE 1003.1
*/
int mq_notify(mqd_t mqdes, const struct sigevent *notification)
{
mqueue_desc *mqd = (mqueue_desc *)mqdes;

if (mqd == NULL) {
errno = EBADF;
return -1;
}

mqueue_object *msg_queue = mqd->mqueue;

if (notification == NULL) {
if ((msg_queue->not.sigev_notify & SIGEV_MASK) == 0) {
errno = EINVAL;
return -1;
}
remove_notification(msg_queue);
return 0;
}

if ((msg_queue->not.sigev_notify & SIGEV_MASK) != 0) {
errno = EBUSY;
return -1;
}
if (notification->sigev_notify == SIGEV_SIGNAL) {
errno = ENOSYS;
return -1;
}
if (notification->sigev_notify_attributes != NULL) {
int ret = pthread_attr_setdetachstate(notification->sigev_notify_attributes,
PTHREAD_CREATE_DETACHED);
if (ret != 0) {
errno = ret;
return -1;
}
}

k_sem_take(&mq_sem, K_FOREVER);
memcpy(&msg_queue->not, notification, sizeof(struct sigevent));
k_sem_give(&mq_sem);

return 0;
}

static void *mq_notify_thread(void *arg)
{
mqueue_object *mqueue = (mqueue_object *)arg;
struct sigevent *sevp = &mqueue->not;

pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

if (sevp->sigev_notify_attributes == NULL) {
pthread_detach(pthread_self());
}

sevp->sigev_notify_function(sevp->sigev_value);

remove_notification(mqueue);

pthread_exit(NULL);
return NULL;
}

/* Internal functions */
static mqueue_object *find_in_list(const char *name)
{
Expand Down Expand Up @@ -380,11 +454,28 @@ static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_le
return ret;
}

uint32_t msgq_num = k_msgq_num_used_get(&mqd->mqueue->queue);

if (k_msgq_put(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
return ret;
}

if (k_msgq_num_used_get(&mqd->mqueue->queue) - msgq_num > 0) {
struct sigevent *sevp = &mqd->mqueue->not;

if (sevp->sigev_notify == SIGEV_NONE) {
sevp->sigev_notify_function(sevp->sigev_value);
} else if (sevp->sigev_notify == SIGEV_THREAD) {
pthread_t th;

ret = pthread_create(&th,
sevp->sigev_notify_attributes,
mq_notify_thread,
mqd->mqueue);
}
}

return 0;
}

Expand Down Expand Up @@ -428,3 +519,10 @@ static void remove_mq(mqueue_object *msg_queue)
k_free(msg_queue->mem_obj);
}
}

static void remove_notification(mqueue_object *msg_queue)
{
k_sem_take(&mq_sem, K_FOREVER);
memset(&msg_queue->not, 0, sizeof(struct sigevent));
k_sem_give(&mq_sem);
}
183 changes: 181 additions & 2 deletions tests/posix/common/src/mqueue.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (c) 2018 Intel Corporation
* Copyright (c) 2024 BayLibre, SAS
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand All @@ -12,8 +13,6 @@
#include <zephyr/ztest.h>

#define N_THR 2
#define SENDER_THREAD 0
#define RECEIVER_THREAD 1
#define MESSAGE_SIZE 16
#define MESG_COUNT_PERMQ 4

Expand Down Expand Up @@ -97,4 +96,184 @@ ZTEST(mqueue, test_mqueue)
zassert_false(mq_unlink(queue), "Not able to unlink Queue");
}

static bool notification_executed;

void notify_function_basic(union sigval val)
{
mqd_t mqd;
bool *executed = (bool *)val.sival_ptr;

mqd = mq_open(queue, O_RDONLY);

mq_receive(mqd, rec_data, MESSAGE_SIZE, 0);
zassert_ok(strcmp(rec_data, send_data),
"Error in data reception. exp: %s act: %s", send_data, rec_data);

zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");

*executed = true;
}

ZTEST(mqueue, test_mqueue_notify_basic)
{
mqd_t mqd;
struct mq_attr attrs = {
.mq_msgsize = MESSAGE_SIZE,
.mq_maxmsg = MESG_COUNT_PERMQ,
};
struct sigevent not = {
.sigev_notify = SIGEV_NONE,
.sigev_value.sival_ptr = (void *)&notification_executed,
.sigev_notify_function = notify_function_basic,
};
int32_t mode = 0777;
int flags = O_RDWR | O_CREAT;

notification_executed = false;
memset(rec_data, 0, MESSAGE_SIZE);

mqd = mq_open(queue, flags, mode, &attrs);

zassert_ok(mq_notify(mqd, &not), "Unable to set notification.");

zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message");

zassert_true(notification_executed, "Notification not triggered.");

zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");
zassert_ok(mq_unlink(queue), "Unable to unlink queue");
}

void notify_function_thread(union sigval val)
{
mqd_t mqd;
pthread_t sender = (pthread_t)val.sival_int;

zassert_not_equal(sender, pthread_self(),
"Notification function should be executed from different thread.");

mqd = mq_open(queue, O_RDONLY);

mq_receive(mqd, rec_data, MESSAGE_SIZE, 0);
zassert_ok(strcmp(rec_data, send_data),
"Error in data reception. exp: %s act: %s", send_data, rec_data);

zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");

notification_executed = true;
}

ZTEST(mqueue, test_mqueue_notify_thread)
{
mqd_t mqd;
struct mq_attr attrs = {
.mq_msgsize = MESSAGE_SIZE,
.mq_maxmsg = MESG_COUNT_PERMQ,
};
struct sigevent not = {
.sigev_notify = SIGEV_THREAD,
.sigev_value.sival_int = (int)pthread_self(),
.sigev_notify_function = notify_function_thread,
};
int32_t mode = 0777;
int flags = O_RDWR | O_CREAT;

notification_executed = false;
memset(rec_data, 0, MESSAGE_SIZE);

mqd = mq_open(queue, flags, mode, &attrs);

zassert_ok(mq_notify(mqd, &not), "Unable to set notification.");

zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message");

usleep(USEC_PER_MSEC * 10U);

zassert_true(notification_executed, "Notification not triggered.");

zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");
zassert_ok(mq_unlink(queue), "Unable to unlink queue");
}

ZTEST(mqueue, test_mqueue_notify_non_empty_queue)
{
mqd_t mqd;
struct mq_attr attrs = {
.mq_msgsize = MESSAGE_SIZE,
.mq_maxmsg = MESG_COUNT_PERMQ,
};
struct sigevent not = {
.sigev_notify = SIGEV_NONE,
.sigev_value.sival_ptr = (void *)&notification_executed,
.sigev_notify_function = notify_function_basic,
};
int32_t mode = 0777;
int flags = O_RDWR | O_CREAT;

notification_executed = false;
memset(rec_data, 0, MESSAGE_SIZE);

mqd = mq_open(queue, flags, mode, &attrs);

zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message");

zassert_ok(mq_notify(mqd, &not), "Unable to set notification.");

zassert_false(notification_executed, "Notification shouldn't be processed.");

mq_receive(mqd, rec_data, MESSAGE_SIZE, 0);
zassert_false(strcmp(rec_data, send_data),
"Error in data reception. exp: %s act: %s", send_data, rec_data);

memset(rec_data, 0, MESSAGE_SIZE);

zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message");

zassert_true(notification_executed, "Notification not triggered.");

zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");
zassert_ok(mq_unlink(queue), "Unable to unlink queue");
}

ZTEST(mqueue, test_mqueue_notify_errors)
{
mqd_t mqd;
struct mq_attr attrs = {
.mq_msgsize = MESSAGE_SIZE,
.mq_maxmsg = MESG_COUNT_PERMQ,
};
struct sigevent not = {
.sigev_notify = SIGEV_SIGNAL,
.sigev_value.sival_ptr = (void *)&notification_executed,
.sigev_notify_function = notify_function_basic,
};
int32_t mode = 0777;
int flags = O_RDWR | O_CREAT;

zassert_not_ok(mq_notify(NULL, NULL), "Should return -1 and set errno to EBADF.");
zassert_equal(errno, EBADF);

mqd = mq_open(queue, flags, mode, &attrs);

zassert_not_ok(mq_notify(mqd, NULL), "Should return -1 and set errno to EINVAL.");
zassert_equal(errno, EINVAL);

zassert_not_ok(mq_notify(mqd, &not), "SIGEV_SIGNAL not supported should return -1.");
zassert_equal(errno, ENOSYS);

not.sigev_notify = SIGEV_NONE;

zassert_ok(mq_notify(mqd, &not),
"Unexpected error while asigning notification to the queue.");

zassert_not_ok(mq_notify(mqd, &not),
"Can't assign notification when there is another assigned.");
zassert_equal(errno, EBUSY);

zassert_ok(mq_notify(mqd, NULL), "Unable to remove notification from the message queue.");

zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");
zassert_ok(mq_unlink(queue), "Unable to unlink queue");
}

ZTEST_SUITE(mqueue, NULL, NULL, NULL, NULL, NULL);
2 changes: 1 addition & 1 deletion tests/posix/headers/src/mqueue_h.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ ZTEST(posix_headers, test_mqueue_h)
if (IS_ENABLED(CONFIG_POSIX_API)) {
zassert_not_null(mq_close);
zassert_not_null(mq_getattr);
/* zassert_not_null(mq_notify); */ /* not implemented */
zassert_not_null(mq_notify);
zassert_not_null(mq_open);
zassert_not_null(mq_receive);
zassert_not_null(mq_send);
Expand Down