diff --git a/include/zephyr/posix/unistd.h b/include/zephyr/posix/unistd.h index 96ea52477c91..dcecd7366c32 100644 --- a/include/zephyr/posix/unistd.h +++ b/include/zephyr/posix/unistd.h @@ -48,6 +48,10 @@ int rmdir(const char *path); FUNC_NORETURN void _exit(int status); +#ifdef CONFIG_POSIX_PIPE +int pipe(int fildes[2]); +#endif + #ifdef CONFIG_NETWORKING static inline int gethostname(char *buf, size_t len) { diff --git a/lib/posix/options/CMakeLists.txt b/lib/posix/options/CMakeLists.txt index 4be0e8b058c9..8702dd9fdb4c 100644 --- a/lib/posix/options/CMakeLists.txt +++ b/lib/posix/options/CMakeLists.txt @@ -126,6 +126,7 @@ if (NOT CONFIG_TC_PROVIDES_POSIX_TIMERS) endif() zephyr_library_sources_ifdef(CONFIG_POSIX_PRIORITY_SCHEDULING sched.c) +zephyr_library_sources_ifdef(CONFIG_POSIX_PIPE pipe.c) if (NOT CONFIG_TC_PROVIDES_POSIX_READER_WRITER_LOCKS) # Note: the Option is _POSIX_READER_WRITER_LOCKS, while the Option Group is POSIX_RW_LOCKS. diff --git a/lib/posix/options/Kconfig b/lib/posix/options/Kconfig index f453f5295d02..7105d2223920 100644 --- a/lib/posix/options/Kconfig +++ b/lib/posix/options/Kconfig @@ -18,6 +18,7 @@ rsource "Kconfig.fs" rsource "Kconfig.mem" rsource "Kconfig.mqueue" rsource "Kconfig.net" +rsource "Kconfig.pipe" rsource "Kconfig.proc1" rsource "Kconfig.procN" rsource "Kconfig.pthread" diff --git a/lib/posix/options/Kconfig.pipe b/lib/posix/options/Kconfig.pipe new file mode 100644 index 000000000000..d08424743ccd --- /dev/null +++ b/lib/posix/options/Kconfig.pipe @@ -0,0 +1,32 @@ +# Copyright (c) 2024, Bruce Rosier +# +# SPDX-License-Identifier: Apache-2.0 + +menu "POSIX Pipe Options" + +config POSIX_PIPE + bool"POSIX pipe support" + default y + select PIPES + help + For more information, please see + https://pubs.opengroup.org/onlinepubs/9699919799/xrat/V4_subprofiles.html + +if POSIX_PIPE + +config POSIX_PIPES_MAX + int "Number of available pipes" + default 2 + help + Specify the number of POSIX pipes available in the system. + This value controls how many pipes can be created concurrently. + +config POSIX_PIPE_BUF + int "Size of pipe buffer" + default 512 + help + Specify the size of the buffe (in bytes) used by the pipe function. + +endif + +endmenu diff --git a/lib/posix/options/Kconfig.profile b/lib/posix/options/Kconfig.profile index b57e07fbc282..01d5cc19e7a4 100644 --- a/lib/posix/options/Kconfig.profile +++ b/lib/posix/options/Kconfig.profile @@ -169,7 +169,7 @@ config POSIX_AEP_REALTIME_DEDICATED # Option Groups select POSIX_MULTI_PROCESS select POSIX_NETWORKING - # select POSIX_PIPE + select POSIX_PIPE # select POSIX_SIGNAL_JUMP # Options select POSIX_CPUTIME diff --git a/lib/posix/options/pipe.c b/lib/posix/options/pipe.c new file mode 100644 index 000000000000..b4375a50e528 --- /dev/null +++ b/lib/posix/options/pipe.c @@ -0,0 +1,266 @@ +/* + * Copyright (c) Bruce ROSIER + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include + +static ssize_t pipe_read_vmeth(void *obj, void *buffer, size_t count); +static ssize_t pipe_write_vmeth(void *obj, const void *buffer, size_t count); +static int pipe_close_vmeth(void *obj); +static int pipe_ioctl_vmeth(void *obj, unsigned int request, va_list args); + +struct pipe_desc { + unsigned char __aligned(4) ring_buffer[CONFIG_POSIX_PIPE_BUF]; + struct k_pipe pipe; + int flags_read; + int flags_write; + atomic_t read_opened; + atomic_t write_opened; + atomic_t is_used; + struct k_sem sync; +}; + +static struct pipe_desc pipe_desc_array[CONFIG_POSIX_PIPES_MAX]; + +static struct fd_op_vtable fs_fd_op_vtable = { + .read = pipe_read_vmeth, + .write = pipe_write_vmeth, + .close = pipe_close_vmeth, + .ioctl = pipe_ioctl_vmeth, +}; + +static ssize_t pipe_read_vmeth(void *obj, void *buffer, size_t count) +{ + ssize_t total_read = 0; + int *ptr = (int *)obj; + unsigned char *read_buf = (unsigned char *)buffer; + int rc; + + struct pipe_desc *p_pipe_desc = + (struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_read)); + + if (((*ptr & FS_O_READ) == 0) || atomic_get(&p_pipe_desc->read_opened) == 0) { + errno = EACCES; + return -1; + } + + if ((*ptr & O_NONBLOCK) == O_NONBLOCK) { + rc = k_pipe_get(&(p_pipe_desc->pipe), read_buf, count, &total_read, 1, K_NO_WAIT); + if (rc == -EIO) { + errno = EAGAIN; + return -1; + } + + } else { + while (count > 0 && atomic_get(&p_pipe_desc->write_opened) == 1) { + ssize_t read = 0; + + rc = k_pipe_get(&(p_pipe_desc->pipe), read_buf, count, &read, 1, K_NO_WAIT); + + if (rc != -EIO) { + k_sem_give(&p_pipe_desc->sync); + } + + if (read != count) { + k_sem_take(&p_pipe_desc->sync, K_FOREVER); + } + + read_buf += read; + count -= read; + total_read += read; + } + } + + return total_read; +} + +static ssize_t pipe_write_vmeth(void *obj, const void *buffer, size_t count) +{ + ssize_t total_written = 0; + int *ptr = (int *)obj; + unsigned char *write_buf = (unsigned char *)buffer; + int rc; + + struct pipe_desc *p_pipe_desc = + (struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_write)); + + if (((*ptr & FS_O_WRITE) == 0) || atomic_get(&p_pipe_desc->write_opened) == 0) { + errno = EACCES; + return -1; + } + + if ((*ptr & O_NONBLOCK) == O_NONBLOCK) { + rc = k_pipe_put(&(p_pipe_desc->pipe), write_buf, count, &total_written, 1, + K_NO_WAIT); + if (rc == -EIO) { + errno = EAGAIN; + return -1; + } + } else { + while (count > 0 && atomic_get(&p_pipe_desc->read_opened) == 1) { + ssize_t written = 0; + + rc = k_pipe_put(&(p_pipe_desc->pipe), write_buf, count, &written, 1, + K_NO_WAIT); + + if (rc != -EIO) { + k_sem_give(&p_pipe_desc->sync); + } + + if (written != count) { + k_sem_take(&p_pipe_desc->sync, K_FOREVER); + } + + write_buf += written; + count -= written; + total_written += written; + } + } + + return total_written; +} + +static int pipe_close_vmeth(void *obj) +{ + int *ptr = (int *)obj; + struct pipe_desc *p_pipe_desc; + + if ((*ptr & FS_O_WRITE)) { + p_pipe_desc = + (struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_write)); + atomic_clear(&p_pipe_desc->write_opened); + k_sem_give(&p_pipe_desc->sync); + } else if ((*ptr & FS_O_READ) == FS_O_READ) { + p_pipe_desc = + (struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_read)); + atomic_clear(&p_pipe_desc->read_opened); + k_sem_give(&p_pipe_desc->sync); + } else { + errno = EINVAL; + return -1; + } + + if (atomic_get(&p_pipe_desc->read_opened) == 0 && + atomic_get(&p_pipe_desc->write_opened) == 0) { + k_pipe_flush(&p_pipe_desc->pipe); + atomic_clear(&p_pipe_desc->is_used); + } + + return 0; +} + +static int pipe_ioctl_vmeth(void *obj, unsigned int request, va_list args) +{ + int *ptr = (int *)obj; + struct pipe_desc *p_pipe_desc; + + if ((*ptr & FS_O_WRITE)) { + p_pipe_desc = + (struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_write)); + } else if ((*ptr & FS_O_READ)) { + p_pipe_desc = + (struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_read)); + } else { + errno = EINVAL; + return -1; + } + + int fd; + int flags; + int ret = 0; + + switch (request) { + case F_DUPFD: + fd = va_arg(args, int); + ret = zvfs_reserve_fd(); + if (ret == -1) { + errno = ENFILE; + return -1; + } + zvfs_finalize_fd(ret, obj, &fs_fd_op_vtable); + break; + + case F_GETFL: + if ((*ptr & (FS_O_READ | FS_O_WRITE)) == FS_O_READ) { + ret = O_RDONLY; + } else if ((*ptr & (FS_O_READ | FS_O_WRITE)) == FS_O_WRITE) { + ret = O_WRONLY; + } else { + errno = EINVAL; + ret = -1; + } + if (*ptr & O_NONBLOCK) { + ret |= O_NONBLOCK; + } + break; + + case F_SETFL: + flags = va_arg(args, int); + if (flags & O_NONBLOCK) { + (*ptr) = (*ptr) | O_NONBLOCK; + } else { + *ptr &= ~O_NONBLOCK; + } + ret = 0; + break; + + default: + errno = EINVAL; + ret = -1; + break; + } + + return ret; +} + +int pipe(int fildes[2]) +{ + int fd1, fd2, i; + + fd1 = zvfs_reserve_fd(); + if (fd1 == -1) { + errno = ENFILE; + return -1; + } + + fd2 = zvfs_reserve_fd(); + if (fd2 == -1) { + errno = ENFILE; + zvfs_free_fd(fd1); + return -1; + } + + fildes[0] = fd1; + fildes[1] = fd2; + + for (i = 0; i < CONFIG_POSIX_PIPES_MAX; i++) { + if (atomic_cas(&pipe_desc_array[i].is_used, 0, 1)) { + break; + } + } + + if (i == CONFIG_POSIX_PIPES_MAX) { + errno = EMFILE; + return -1; + } + + atomic_set(&pipe_desc_array[i].read_opened, 1); + atomic_set(&pipe_desc_array[i].write_opened, 1); + + k_pipe_init(&(pipe_desc_array[i].pipe), pipe_desc_array[i].ring_buffer, + sizeof(pipe_desc_array[i].ring_buffer)); + + pipe_desc_array[i].flags_read = FS_O_READ; + pipe_desc_array[i].flags_write = FS_O_WRITE; + + k_sem_init(&pipe_desc_array[i].sync, 0, 1); + + zvfs_finalize_fd(fd1, &(pipe_desc_array[i].flags_read), &fs_fd_op_vtable); + zvfs_finalize_fd(fd2, &(pipe_desc_array[i].flags_write), &fs_fd_op_vtable); + + return 0; +} diff --git a/tests/posix/common/prj.conf b/tests/posix/common/prj.conf index 94f671badcfb..3c4aa9d2a442 100644 --- a/tests/posix/common/prj.conf +++ b/tests/posix/common/prj.conf @@ -1,4 +1,5 @@ CONFIG_POSIX_API=y +CONFIG_POSIX_PIPE=y CONFIG_POSIX_THREAD_THREADS_MAX=6 CONFIG_ZTEST=y CONFIG_POSIX_SEM_VALUE_MAX=32767 diff --git a/tests/posix/common/src/pipe.c b/tests/posix/common/src/pipe.c new file mode 100644 index 000000000000..2e6a170f5718 --- /dev/null +++ b/tests/posix/common/src/pipe.c @@ -0,0 +1,130 @@ +/* + * Copyright (c) Bruce ROSIER + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include + +/* Test case: Create and close pipes */ +ZTEST(posix_pipe, test_create_close_pipe) +{ + int fds[2]; + int ret; + + ret = pipe(fds); + zassert_true(ret == 0, "pipe creation failed"); + + zassert_true(fds[0] >= 0, "read descriptor is invalid"); + zassert_true(fds[1] >= 0, "write descriptor is invalid"); + + close(fds[0]); + close(fds[1]); + + ret = close(fds[0]); + zassert_true(ret == -1 && errno == EBADF, "double close should fail"); +} + +/* Test case: Blocking read and write */ +ZTEST(posix_pipe, test_blocking_read_write) +{ + int fds[2]; + char write_data[] = "Hello, Zephyr!"; + char read_buffer[20]; + ssize_t n; + + zassert_true(pipe(fds) == 0, "pipe creation failed"); + + n = write(fds[1], write_data, sizeof(write_data)); + zassert_true(n == sizeof(write_data), "write failed"); + + n = read(fds[0], read_buffer, sizeof(write_data)); + zassert_true(n == sizeof(write_data), "read failed"); + + zassert_mem_equal(write_data, read_buffer, sizeof(write_data), "data mismatch"); + + close(fds[0]); + close(fds[1]); +} + +/* Test case: Non-blocking read/write with O_NONBLOCK */ +ZTEST(posix_pipe, test_nonblocking_read_write) +{ + int fds[2]; + char write_data[] = "Test non-blocking"; + char read_buffer[20]; + ssize_t n; + int flags; + + zassert_true(pipe(fds) == 0, "pipe creation failed"); + + flags = fcntl(fds[0], F_GETFL, 0); + fcntl(fds[0], F_SETFL, flags | O_NONBLOCK); + + n = read(fds[0], read_buffer, sizeof(read_buffer)); + zassert_true(n == -1 && errno == EAGAIN, "read from empty pipe should return EAGAIN"); + + n = write(fds[1], write_data, sizeof(write_data)); + zassert_true(n == sizeof(write_data), "non-blocking write failed"); + + n = read(fds[0], read_buffer, sizeof(write_data)); + zassert_true(n == sizeof(write_data), "non-blocking read failed"); + + close(fds[0]); + close(fds[1]); +} + +/* Test case: Pipe full scenario */ +ZTEST(posix_pipe, test_pipe_full) +{ + int fds[2]; + char write_data[_POSIX_PIPE_BUF + 1]; + ssize_t n; + + zassert_true(pipe(fds) == 0, "pipe creation failed"); + + memset(write_data, 'A', _POSIX_PIPE_BUF); + n = write(fds[1], write_data, _POSIX_PIPE_BUF); + zassert_true(n == _POSIX_PIPE_BUF, "write to pipe failed"); + + fcntl(fds[1], F_SETFL, O_NONBLOCK); + n = write(fds[1], write_data, 1); + zassert_true(n == -1 && errno == EAGAIN, "pipe should be full"); + + close(fds[0]); + close(fds[1]); +} + +/* Test case: Pipe multiple readers and writers */ +ZTEST(posix_pipe, test_multiple_readers_writers) +{ + int fds[2]; + char write_data1[] = "Writer 1"; + char write_data2[] = "Writer 2"; + char read_buffer[20]; + ssize_t n; + + zassert_true(pipe(fds) == 0, "pipe creation failed"); + + n = write(fds[1], write_data1, sizeof(write_data1)); + zassert_true(n == sizeof(write_data1), "write 1 failed"); + + n = write(fds[1], write_data2, sizeof(write_data2)); + zassert_true(n == sizeof(write_data2), "write 2 failed"); + + n = read(fds[0], read_buffer, sizeof(write_data1)); + zassert_true(n == sizeof(write_data1), "read 1 failed"); + zassert_mem_equal(write_data1, read_buffer, sizeof(write_data1), "data 1 mismatch"); + + n = read(fds[0], read_buffer, sizeof(write_data2)); + zassert_true(n == sizeof(write_data2), "read 2 failed"); + zassert_mem_equal(write_data2, read_buffer, sizeof(write_data2), "data 2 mismatch"); + + close(fds[0]); + close(fds[1]); +} + +ZTEST_SUITE(posix_pipe, NULL, NULL, NULL, NULL, NULL);