Skip to content

Commit 574df27

Browse files
committed
posix: pipe: Implement POSIX-compliant pipes
This change introduces a POSIX-compliant pipe feature into Zephyr RTOS. The POSIX_PIPE is required for PSE53 conformance. The pipe implementation includes non-blocking and blocking behavior for pipe read/write operations, adhering to the POSIX standard. The pipe implementation includes atomic synchronization mechanisms to handle multiple readers and writers, ensuring thread safety. Tested using ZTEST framework, validating both blocking and non-blocking behaviors with various pipe buffer sizes. Test cases include handling full and empty pipes, ensuring correct behavior when multiple threads attempt simultaneous reads and writes. Signed-off-by: Bruce Rosier <[email protected]>
1 parent 091c666 commit 574df27

File tree

8 files changed

+436
-1
lines changed

8 files changed

+436
-1
lines changed

include/zephyr/posix/unistd.h

+4
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ int rmdir(const char *path);
4848

4949
FUNC_NORETURN void _exit(int status);
5050

51+
#ifdef CONFIG_POSIX_PIPE
52+
int pipe(int fildes[2]);
53+
#endif
54+
5155
#ifdef CONFIG_NETWORKING
5256
static inline int gethostname(char *buf, size_t len)
5357
{

lib/posix/options/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ if (NOT CONFIG_TC_PROVIDES_POSIX_TIMERS)
126126
endif()
127127

128128
zephyr_library_sources_ifdef(CONFIG_POSIX_PRIORITY_SCHEDULING sched.c)
129+
zephyr_library_sources_ifdef(CONFIG_POSIX_PIPE pipe.c)
129130

130131
if (NOT CONFIG_TC_PROVIDES_POSIX_READER_WRITER_LOCKS)
131132
# Note: the Option is _POSIX_READER_WRITER_LOCKS, while the Option Group is POSIX_RW_LOCKS.

lib/posix/options/Kconfig

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ rsource "Kconfig.fs"
1818
rsource "Kconfig.mem"
1919
rsource "Kconfig.mqueue"
2020
rsource "Kconfig.net"
21+
rsource "Kconfig.pipe"
2122
rsource "Kconfig.proc1"
2223
rsource "Kconfig.procN"
2324
rsource "Kconfig.pthread"

lib/posix/options/Kconfig.pipe

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Copyright (c) 2024, Bruce Rosier
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
menu "POSIX Pipe Options"
6+
7+
config POSIX_PIPE
8+
bool"POSIX pipe support"
9+
default y
10+
select PIPES
11+
help
12+
For more information, please see
13+
https://pubs.opengroup.org/onlinepubs/9699919799/xrat/V4_subprofiles.html
14+
15+
if POSIX_PIPE
16+
17+
config POSIX_PIPES_MAX
18+
int "Number of available pipes"
19+
default 2
20+
help
21+
Specify the number of POSIX pipes available in the system.
22+
This value controls how many pipes can be created concurrently.
23+
24+
config POSIX_PIPE_BUF
25+
int "Size of pipe buffer"
26+
default 512
27+
help
28+
Specify the size of the buffe (in bytes) used by the pipe function.
29+
30+
endif
31+
32+
endmenu

lib/posix/options/Kconfig.profile

+1-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ config POSIX_AEP_REALTIME_DEDICATED
169169
# Option Groups
170170
select POSIX_MULTI_PROCESS
171171
select POSIX_NETWORKING
172-
# select POSIX_PIPE
172+
select POSIX_PIPE
173173
# select POSIX_SIGNAL_JUMP
174174
# Options
175175
select POSIX_CPUTIME

lib/posix/options/pipe.c

+266
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
/*
2+
* Copyright (c) Bruce ROSIER
3+
*
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
#include <zephyr/posix/fcntl.h>
8+
#include <zephyr/sys/fdtable.h>
9+
#include <zephyr/fs/fs.h>
10+
11+
static ssize_t pipe_read_vmeth(void *obj, void *buffer, size_t count);
12+
static ssize_t pipe_write_vmeth(void *obj, const void *buffer, size_t count);
13+
static int pipe_close_vmeth(void *obj);
14+
static int pipe_ioctl_vmeth(void *obj, unsigned int request, va_list args);
15+
16+
struct pipe_desc {
17+
unsigned char __aligned(4) ring_buffer[CONFIG_POSIX_PIPE_BUF];
18+
struct k_pipe pipe;
19+
int flags_read;
20+
int flags_write;
21+
atomic_t read_opened;
22+
atomic_t write_opened;
23+
atomic_t is_used;
24+
struct k_sem sync;
25+
};
26+
27+
static struct pipe_desc pipe_desc_array[CONFIG_POSIX_PIPES_MAX];
28+
29+
static struct fd_op_vtable fs_fd_op_vtable = {
30+
.read = pipe_read_vmeth,
31+
.write = pipe_write_vmeth,
32+
.close = pipe_close_vmeth,
33+
.ioctl = pipe_ioctl_vmeth,
34+
};
35+
36+
static ssize_t pipe_read_vmeth(void *obj, void *buffer, size_t count)
37+
{
38+
ssize_t total_read = 0;
39+
int *ptr = (int *)obj;
40+
unsigned char *read_buf = (unsigned char *)buffer;
41+
int rc;
42+
43+
struct pipe_desc *p_pipe_desc =
44+
(struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_read));
45+
46+
if (((*ptr & FS_O_READ) == 0) || atomic_get(&p_pipe_desc->read_opened) == 0) {
47+
errno = EACCES;
48+
return -1;
49+
}
50+
51+
if ((*ptr & O_NONBLOCK) == O_NONBLOCK) {
52+
rc = k_pipe_get(&(p_pipe_desc->pipe), read_buf, count, &total_read, 1, K_NO_WAIT);
53+
if (rc == -EIO) {
54+
errno = EAGAIN;
55+
return -1;
56+
}
57+
58+
} else {
59+
while (count > 0 && atomic_get(&p_pipe_desc->write_opened) == 1) {
60+
ssize_t read = 0;
61+
62+
rc = k_pipe_get(&(p_pipe_desc->pipe), read_buf, count, &read, 1, K_NO_WAIT);
63+
64+
if (rc != -EIO) {
65+
k_sem_give(&p_pipe_desc->sync);
66+
}
67+
68+
if (read != count) {
69+
k_sem_take(&p_pipe_desc->sync, K_FOREVER);
70+
}
71+
72+
read_buf += read;
73+
count -= read;
74+
total_read += read;
75+
}
76+
}
77+
78+
return total_read;
79+
}
80+
81+
static ssize_t pipe_write_vmeth(void *obj, const void *buffer, size_t count)
82+
{
83+
ssize_t total_written = 0;
84+
int *ptr = (int *)obj;
85+
unsigned char *write_buf = (unsigned char *)buffer;
86+
int rc;
87+
88+
struct pipe_desc *p_pipe_desc =
89+
(struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_write));
90+
91+
if (((*ptr & FS_O_WRITE) == 0) || atomic_get(&p_pipe_desc->write_opened) == 0) {
92+
errno = EACCES;
93+
return -1;
94+
}
95+
96+
if ((*ptr & O_NONBLOCK) == O_NONBLOCK) {
97+
rc = k_pipe_put(&(p_pipe_desc->pipe), write_buf, count, &total_written, 1,
98+
K_NO_WAIT);
99+
if (rc == -EIO) {
100+
errno = EAGAIN;
101+
return -1;
102+
}
103+
} else {
104+
while (count > 0 && atomic_get(&p_pipe_desc->read_opened) == 1) {
105+
ssize_t written = 0;
106+
107+
rc = k_pipe_put(&(p_pipe_desc->pipe), write_buf, count, &written, 1,
108+
K_NO_WAIT);
109+
110+
if (rc != -EIO) {
111+
k_sem_give(&p_pipe_desc->sync);
112+
}
113+
114+
if (written != count) {
115+
k_sem_take(&p_pipe_desc->sync, K_FOREVER);
116+
}
117+
118+
write_buf += written;
119+
count -= written;
120+
total_written += written;
121+
}
122+
}
123+
124+
return total_written;
125+
}
126+
127+
static int pipe_close_vmeth(void *obj)
128+
{
129+
int *ptr = (int *)obj;
130+
struct pipe_desc *p_pipe_desc;
131+
132+
if ((*ptr & FS_O_WRITE)) {
133+
p_pipe_desc =
134+
(struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_write));
135+
atomic_clear(&p_pipe_desc->write_opened);
136+
k_sem_give(&p_pipe_desc->sync);
137+
} else if ((*ptr & FS_O_READ) == FS_O_READ) {
138+
p_pipe_desc =
139+
(struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_read));
140+
atomic_clear(&p_pipe_desc->read_opened);
141+
k_sem_give(&p_pipe_desc->sync);
142+
} else {
143+
errno = EINVAL;
144+
return -1;
145+
}
146+
147+
if (atomic_get(&p_pipe_desc->read_opened) == 0 &&
148+
atomic_get(&p_pipe_desc->write_opened) == 0) {
149+
k_pipe_flush(&p_pipe_desc->pipe);
150+
atomic_clear(&p_pipe_desc->is_used);
151+
}
152+
153+
return 0;
154+
}
155+
156+
static int pipe_ioctl_vmeth(void *obj, unsigned int request, va_list args)
157+
{
158+
int *ptr = (int *)obj;
159+
struct pipe_desc *p_pipe_desc;
160+
161+
if ((*ptr & FS_O_WRITE)) {
162+
p_pipe_desc =
163+
(struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_write));
164+
} else if ((*ptr & FS_O_READ)) {
165+
p_pipe_desc =
166+
(struct pipe_desc *)((char *)ptr - offsetof(struct pipe_desc, flags_read));
167+
} else {
168+
errno = EINVAL;
169+
return -1;
170+
}
171+
172+
int fd;
173+
int flags;
174+
int ret = 0;
175+
176+
switch (request) {
177+
case F_DUPFD:
178+
fd = va_arg(args, int);
179+
ret = zvfs_reserve_fd();
180+
if (ret == -1) {
181+
errno = ENFILE;
182+
return -1;
183+
}
184+
zvfs_finalize_fd(ret, obj, &fs_fd_op_vtable);
185+
break;
186+
187+
case F_GETFL:
188+
if ((*ptr & (FS_O_READ | FS_O_WRITE)) == FS_O_READ) {
189+
ret = O_RDONLY;
190+
} else if ((*ptr & (FS_O_READ | FS_O_WRITE)) == FS_O_WRITE) {
191+
ret = O_WRONLY;
192+
} else {
193+
errno = EINVAL;
194+
ret = -1;
195+
}
196+
if (*ptr & O_NONBLOCK) {
197+
ret |= O_NONBLOCK;
198+
}
199+
break;
200+
201+
case F_SETFL:
202+
flags = va_arg(args, int);
203+
if (flags & O_NONBLOCK) {
204+
(*ptr) = (*ptr) | O_NONBLOCK;
205+
} else {
206+
*ptr &= ~O_NONBLOCK;
207+
}
208+
ret = 0;
209+
break;
210+
211+
default:
212+
errno = EINVAL;
213+
ret = -1;
214+
break;
215+
}
216+
217+
return ret;
218+
}
219+
220+
int pipe(int fildes[2])
221+
{
222+
int fd1, fd2, i;
223+
224+
fd1 = zvfs_reserve_fd();
225+
if (fd1 == -1) {
226+
errno = ENFILE;
227+
return -1;
228+
}
229+
230+
fd2 = zvfs_reserve_fd();
231+
if (fd2 == -1) {
232+
errno = ENFILE;
233+
zvfs_free_fd(fd1);
234+
return -1;
235+
}
236+
237+
fildes[0] = fd1;
238+
fildes[1] = fd2;
239+
240+
for (i = 0; i < CONFIG_POSIX_PIPES_MAX; i++) {
241+
if (atomic_cas(&pipe_desc_array[i].is_used, 0, 1)) {
242+
break;
243+
}
244+
}
245+
246+
if (i == CONFIG_POSIX_PIPES_MAX) {
247+
errno = EMFILE;
248+
return -1;
249+
}
250+
251+
atomic_set(&pipe_desc_array[i].read_opened, 1);
252+
atomic_set(&pipe_desc_array[i].write_opened, 1);
253+
254+
k_pipe_init(&(pipe_desc_array[i].pipe), pipe_desc_array[i].ring_buffer,
255+
sizeof(pipe_desc_array[i].ring_buffer));
256+
257+
pipe_desc_array[i].flags_read = FS_O_READ;
258+
pipe_desc_array[i].flags_write = FS_O_WRITE;
259+
260+
k_sem_init(&pipe_desc_array[i].sync, 0, 1);
261+
262+
zvfs_finalize_fd(fd1, &(pipe_desc_array[i].flags_read), &fs_fd_op_vtable);
263+
zvfs_finalize_fd(fd2, &(pipe_desc_array[i].flags_write), &fs_fd_op_vtable);
264+
265+
return 0;
266+
}

tests/posix/common/prj.conf

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
CONFIG_POSIX_API=y
2+
CONFIG_POSIX_PIPE=y
23
CONFIG_POSIX_THREAD_THREADS_MAX=6
34
CONFIG_ZTEST=y
45
CONFIG_POSIX_SEM_VALUE_MAX=32767

0 commit comments

Comments
 (0)