Skip to content

posix: pipe: Implement POSIX-compliant pipes #79393

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/zephyr/posix/unistd.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ int rmdir(const char *path);

FUNC_NORETURN void _exit(int status);

#ifdef CONFIG_POSIX_PIPE
int pipe(int fildes[2]);
#endif

#ifdef CONFIG_NETWORKING
static inline int gethostname(char *buf, size_t len)
{
Expand Down
1 change: 1 addition & 0 deletions lib/posix/options/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ if (NOT CONFIG_TC_PROVIDES_POSIX_TIMERS)
endif()

zephyr_library_sources_ifdef(CONFIG_POSIX_PRIORITY_SCHEDULING sched.c)
zephyr_library_sources_ifdef(CONFIG_POSIX_PIPE pipe.c)

if (NOT CONFIG_TC_PROVIDES_POSIX_READER_WRITER_LOCKS)
# Note: the Option is _POSIX_READER_WRITER_LOCKS, while the Option Group is POSIX_RW_LOCKS.
Expand Down
1 change: 1 addition & 0 deletions lib/posix/options/Kconfig
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ rsource "Kconfig.fs"
rsource "Kconfig.mem"
rsource "Kconfig.mqueue"
rsource "Kconfig.net"
rsource "Kconfig.pipe"
rsource "Kconfig.proc1"
rsource "Kconfig.procN"
rsource "Kconfig.pthread"
Expand Down
32 changes: 32 additions & 0 deletions lib/posix/options/Kconfig.pipe
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright (c) 2024, Bruce Rosier
#
# SPDX-License-Identifier: Apache-2.0

menu "POSIX Pipe Options"

config POSIX_PIPE
bool"POSIX pipe support"
default y
select PIPES
help
For more information, please see
https://pubs.opengroup.org/onlinepubs/9699919799/xrat/V4_subprofiles.html

if POSIX_PIPE

config POSIX_PIPES_MAX
int "Number of available pipes"
default 2
help
Specify the number of POSIX pipes available in the system.
This value controls how many pipes can be created concurrently.

config POSIX_PIPE_BUF
int "Size of pipe buffer"
default 512
help
Specify the size of the buffe (in bytes) used by the pipe function.

endif

endmenu
2 changes: 1 addition & 1 deletion lib/posix/options/Kconfig.profile
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ config POSIX_AEP_REALTIME_DEDICATED
# Option Groups
select POSIX_MULTI_PROCESS
select POSIX_NETWORKING
# select POSIX_PIPE
select POSIX_PIPE
# select POSIX_SIGNAL_JUMP
# Options
select POSIX_CPUTIME
Expand Down
266 changes: 266 additions & 0 deletions lib/posix/options/pipe.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
/*
* Copyright (c) Bruce ROSIER
*
* SPDX-License-Identifier: Apache-2.0
*/

#include <zephyr/posix/fcntl.h>
#include <zephyr/sys/fdtable.h>
#include <zephyr/fs/fs.h>

static ssize_t pipe_read_vmeth(void *obj, void *buffer, size_t count);
static ssize_t pipe_write_vmeth(void *obj, const void *buffer, size_t count);
static int pipe_close_vmeth(void *obj);
static int pipe_ioctl_vmeth(void *obj, unsigned int request, va_list args);

struct pipe_desc {
unsigned char __aligned(4) ring_buffer[CONFIG_POSIX_PIPE_BUF];
struct k_pipe pipe;
int flags_read;
int flags_write;
atomic_t read_opened;
atomic_t write_opened;
atomic_t is_used;
struct k_sem sync;
};

static struct pipe_desc pipe_desc_array[CONFIG_POSIX_PIPES_MAX];

static struct fd_op_vtable fs_fd_op_vtable = {
.read = pipe_read_vmeth,
.write = pipe_write_vmeth,
.close = pipe_close_vmeth,
.ioctl = pipe_ioctl_vmeth,
};

static ssize_t pipe_read_vmeth(void *obj, void *buffer, size_t count)
{
ssize_t total_read = 0;
int *ptr = (int *)obj;
unsigned char *read_buf = (unsigned char *)buffer;
int rc;

struct pipe_desc *p_pipe_desc =
(struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_read));

if (((*ptr & FS_O_READ) == 0) || atomic_get(&p_pipe_desc->read_opened) == 0) {
errno = EACCES;
return -1;
}

if ((*ptr & O_NONBLOCK) == O_NONBLOCK) {
rc = k_pipe_get(&(p_pipe_desc->pipe), read_buf, count, &total_read, 1, K_NO_WAIT);
if (rc == -EIO) {
errno = EAGAIN;
return -1;
}

} else {
while (count > 0 && atomic_get(&p_pipe_desc->write_opened) == 1) {
ssize_t read = 0;

rc = k_pipe_get(&(p_pipe_desc->pipe), read_buf, count, &read, 1, K_NO_WAIT);

if (rc != -EIO) {
k_sem_give(&p_pipe_desc->sync);
}

if (read != count) {
k_sem_take(&p_pipe_desc->sync, K_FOREVER);
}

read_buf += read;
count -= read;
total_read += read;
}
}

return total_read;
}

static ssize_t pipe_write_vmeth(void *obj, const void *buffer, size_t count)
{
ssize_t total_written = 0;
int *ptr = (int *)obj;
unsigned char *write_buf = (unsigned char *)buffer;
int rc;

struct pipe_desc *p_pipe_desc =
(struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_write));

if (((*ptr & FS_O_WRITE) == 0) || atomic_get(&p_pipe_desc->write_opened) == 0) {
errno = EACCES;
return -1;
}

if ((*ptr & O_NONBLOCK) == O_NONBLOCK) {
rc = k_pipe_put(&(p_pipe_desc->pipe), write_buf, count, &total_written, 1,
K_NO_WAIT);
if (rc == -EIO) {
errno = EAGAIN;
return -1;
}
} else {
while (count > 0 && atomic_get(&p_pipe_desc->read_opened) == 1) {
ssize_t written = 0;

rc = k_pipe_put(&(p_pipe_desc->pipe), write_buf, count, &written, 1,
K_NO_WAIT);

if (rc != -EIO) {
k_sem_give(&p_pipe_desc->sync);
}

if (written != count) {
k_sem_take(&p_pipe_desc->sync, K_FOREVER);
}

write_buf += written;
count -= written;
total_written += written;
}
}

return total_written;
}

static int pipe_close_vmeth(void *obj)
{
int *ptr = (int *)obj;
struct pipe_desc *p_pipe_desc;

if ((*ptr & FS_O_WRITE)) {
p_pipe_desc =
(struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_write));
atomic_clear(&p_pipe_desc->write_opened);
k_sem_give(&p_pipe_desc->sync);
} else if ((*ptr & FS_O_READ) == FS_O_READ) {
p_pipe_desc =
(struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_read));
atomic_clear(&p_pipe_desc->read_opened);
k_sem_give(&p_pipe_desc->sync);
} else {
errno = EINVAL;
return -1;
}

if (atomic_get(&p_pipe_desc->read_opened) == 0 &&
atomic_get(&p_pipe_desc->write_opened) == 0) {
k_pipe_flush(&p_pipe_desc->pipe);
atomic_clear(&p_pipe_desc->is_used);
}

return 0;
}

static int pipe_ioctl_vmeth(void *obj, unsigned int request, va_list args)
{
int *ptr = (int *)obj;
struct pipe_desc *p_pipe_desc;

if ((*ptr & FS_O_WRITE)) {
p_pipe_desc =
(struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_write));
} else if ((*ptr & FS_O_READ)) {
p_pipe_desc =
(struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_read));
} else {
errno = EINVAL;
return -1;
}

int fd;
int flags;
int ret = 0;

switch (request) {
case F_DUPFD:
fd = va_arg(args, int);
ret = zvfs_reserve_fd();
if (ret == -1) {
errno = ENFILE;
return -1;
}
zvfs_finalize_fd(ret, obj, &fs_fd_op_vtable);
break;

case F_GETFL:
if ((*ptr & (FS_O_READ | FS_O_WRITE)) == FS_O_READ) {
ret = O_RDONLY;
} else if ((*ptr & (FS_O_READ | FS_O_WRITE)) == FS_O_WRITE) {
ret = O_WRONLY;
} else {
errno = EINVAL;
ret = -1;
}
if (*ptr & O_NONBLOCK) {
ret |= O_NONBLOCK;
}
break;

case F_SETFL:
flags = va_arg(args, int);
if (flags & O_NONBLOCK) {
(*ptr) = (*ptr) | O_NONBLOCK;
} else {
*ptr &= ~O_NONBLOCK;
}
ret = 0;
break;

default:
errno = EINVAL;
ret = -1;
break;
}

return ret;
}

int pipe(int fildes[2])
{
int fd1, fd2, i;

fd1 = zvfs_reserve_fd();
if (fd1 == -1) {
errno = ENFILE;
return -1;
}

fd2 = zvfs_reserve_fd();
if (fd2 == -1) {
errno = ENFILE;
zvfs_free_fd(fd1);
return -1;
}

fildes[0] = fd1;
fildes[1] = fd2;

for (i = 0; i < CONFIG_POSIX_PIPES_MAX; i++) {
if (atomic_cas(&pipe_desc_array[i].is_used, 0, 1)) {
break;
}
}

if (i == CONFIG_POSIX_PIPES_MAX) {
errno = EMFILE;
return -1;
}

atomic_set(&pipe_desc_array[i].read_opened, 1);
atomic_set(&pipe_desc_array[i].write_opened, 1);

k_pipe_init(&(pipe_desc_array[i].pipe), pipe_desc_array[i].ring_buffer,
sizeof(pipe_desc_array[i].ring_buffer));

pipe_desc_array[i].flags_read = FS_O_READ;
pipe_desc_array[i].flags_write = FS_O_WRITE;

k_sem_init(&pipe_desc_array[i].sync, 0, 1);

zvfs_finalize_fd(fd1, &(pipe_desc_array[i].flags_read), &fs_fd_op_vtable);
zvfs_finalize_fd(fd2, &(pipe_desc_array[i].flags_write), &fs_fd_op_vtable);

return 0;
}
1 change: 1 addition & 0 deletions tests/posix/common/prj.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
CONFIG_POSIX_API=y
CONFIG_POSIX_PIPE=y
CONFIG_POSIX_THREAD_THREADS_MAX=6
CONFIG_ZTEST=y
CONFIG_POSIX_SEM_VALUE_MAX=32767
Expand Down
Loading
Loading