1
1
/*
2
2
* Copyright (c) 2018 Intel Corporation
3
+ * Copyright (c) 2024 BayLibre, SAS
3
4
*
4
5
* SPDX-License-Identifier: Apache-2.0
5
6
*/
6
7
#include <zephyr/kernel.h>
7
8
#include <errno.h>
8
9
#include <string.h>
9
10
#include <zephyr/sys/atomic.h>
10
- #include <zephyr/posix/time.h>
11
11
#include <zephyr/posix/mqueue.h>
12
+ #include <zephyr/posix/pthread.h>
13
+
14
+ #define SIGEV_MASK (SIGEV_NONE | SIGEV_SIGNAL | SIGEV_THREAD)
12
15
13
16
typedef struct mqueue_object {
14
17
sys_snode_t snode ;
@@ -17,6 +20,7 @@ typedef struct mqueue_object {
17
20
struct k_msgq queue ;
18
21
atomic_t ref_count ;
19
22
char * name ;
23
+ struct sigevent not ;
20
24
} mqueue_object ;
21
25
22
26
typedef struct mqueue_desc {
@@ -34,9 +38,11 @@ int64_t timespec_to_timeoutms(const struct timespec *abstime);
34
38
static mqueue_object * find_in_list (const char * name );
35
39
static int32_t send_message (mqueue_desc * mqd , const char * msg_ptr , size_t msg_len ,
36
40
k_timeout_t timeout );
37
- static int receive_message (mqueue_desc * mqd , char * msg_ptr , size_t msg_len ,
41
+ static int32_t receive_message (mqueue_desc * mqd , char * msg_ptr , size_t msg_len ,
38
42
k_timeout_t timeout );
43
+ static void remove_notification (mqueue_object * msg_queue );
39
44
static void remove_mq (mqueue_object * msg_queue );
45
+ static void * mq_notify_thread (void * arg );
40
46
41
47
/**
42
48
* @brief Open a message queue.
@@ -341,6 +347,74 @@ int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
341
347
return 0 ;
342
348
}
343
349
350
+ /**
351
+ * @brief Notify process that a message is available.
352
+ *
353
+ * See IEEE 1003.1
354
+ */
355
+ int mq_notify (mqd_t mqdes , const struct sigevent * notification )
356
+ {
357
+ mqueue_desc * mqd = (mqueue_desc * )mqdes ;
358
+
359
+ if (mqd == NULL ) {
360
+ errno = EBADF ;
361
+ return -1 ;
362
+ }
363
+
364
+ mqueue_object * msg_queue = mqd -> mqueue ;
365
+
366
+ if (notification == NULL ) {
367
+ if ((msg_queue -> not .sigev_notify & SIGEV_MASK ) == 0 ) {
368
+ errno = EINVAL ;
369
+ return -1 ;
370
+ }
371
+ remove_notification (msg_queue );
372
+ return 0 ;
373
+ }
374
+
375
+ if ((msg_queue -> not .sigev_notify & SIGEV_MASK ) != 0 ) {
376
+ errno = EBUSY ;
377
+ return -1 ;
378
+ }
379
+ if (notification -> sigev_notify == SIGEV_SIGNAL ) {
380
+ errno = ENOSYS ;
381
+ return -1 ;
382
+ }
383
+ if (notification -> sigev_notify_attributes != NULL ) {
384
+ int ret = pthread_attr_setdetachstate (notification -> sigev_notify_attributes ,
385
+ PTHREAD_CREATE_DETACHED );
386
+ if (ret != 0 ) {
387
+ errno = ret ;
388
+ return -1 ;
389
+ }
390
+ }
391
+
392
+ k_sem_take (& mq_sem , K_FOREVER );
393
+ memcpy (& msg_queue -> not , notification , sizeof (struct sigevent ));
394
+ k_sem_give (& mq_sem );
395
+
396
+ return 0 ;
397
+ }
398
+
399
+ static void * mq_notify_thread (void * arg )
400
+ {
401
+ mqueue_object * mqueue = (mqueue_object * )arg ;
402
+ struct sigevent * sevp = & mqueue -> not ;
403
+
404
+ pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS , NULL );
405
+
406
+ if (sevp -> sigev_notify_attributes == NULL ) {
407
+ pthread_detach (pthread_self ());
408
+ }
409
+
410
+ sevp -> sigev_notify_function (sevp -> sigev_value );
411
+
412
+ remove_notification (mqueue );
413
+
414
+ pthread_exit (NULL );
415
+ return NULL ;
416
+ }
417
+
344
418
/* Internal functions */
345
419
static mqueue_object * find_in_list (const char * name )
346
420
{
@@ -380,11 +454,28 @@ static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_le
380
454
return ret ;
381
455
}
382
456
457
+ uint32_t msgq_num = k_msgq_num_used_get (& mqd -> mqueue -> queue );
458
+
383
459
if (k_msgq_put (& mqd -> mqueue -> queue , (void * )msg_ptr , timeout ) != 0 ) {
384
460
errno = K_TIMEOUT_EQ (timeout , K_NO_WAIT ) ? EAGAIN : ETIMEDOUT ;
385
461
return ret ;
386
462
}
387
463
464
+ if (k_msgq_num_used_get (& mqd -> mqueue -> queue ) - msgq_num > 0 ) {
465
+ struct sigevent * sevp = & mqd -> mqueue -> not ;
466
+
467
+ if (sevp -> sigev_notify == SIGEV_NONE ) {
468
+ sevp -> sigev_notify_function (sevp -> sigev_value );
469
+ } else if (sevp -> sigev_notify == SIGEV_THREAD ) {
470
+ pthread_t th ;
471
+
472
+ ret = pthread_create (& th ,
473
+ sevp -> sigev_notify_attributes ,
474
+ mq_notify_thread ,
475
+ mqd -> mqueue );
476
+ }
477
+ }
478
+
388
479
return 0 ;
389
480
}
390
481
@@ -428,3 +519,10 @@ static void remove_mq(mqueue_object *msg_queue)
428
519
k_free (msg_queue -> mem_obj );
429
520
}
430
521
}
522
+
523
+ static void remove_notification (mqueue_object * msg_queue )
524
+ {
525
+ k_sem_take (& mq_sem , K_FOREVER );
526
+ memset (& msg_queue -> not , 0 , sizeof (struct sigevent ));
527
+ k_sem_give (& mq_sem );
528
+ }
0 commit comments