@@ -147,6 +147,10 @@ cdef class ReadUnixTransport(UVStream):
147
147
@cython.no_gc_clear
148
148
cdef class WriteUnixTransport(UVStream):
149
149
150
+ def __cinit__ (self ):
151
+ self .disconnect_listener_inited = False
152
+ self .disconnect_listener.data = NULL
153
+
150
154
@staticmethod
151
155
cdef WriteUnixTransport new(Loop loop, object protocol, Server server,
152
156
object waiter):
@@ -163,6 +167,46 @@ cdef class WriteUnixTransport(UVStream):
163
167
__pipe_init_uv_handle(< UVStream> handle, loop)
164
168
return handle
165
169
170
+ cdef _start_reading(self ):
171
+ # A custom implementation for monitoring for EOF:
172
+ # libuv since v1.23.1 prohibits using uv_read_start on
173
+ # write-only FDs, so we use a throw-away uv_poll_t handle
174
+ # for that purpose, as suggested in
175
+ # https://github.com/libuv/libuv/issues/2058.
176
+
177
+ cdef int err
178
+
179
+ if not self .disconnect_listener_inited:
180
+ err = uv.uv_poll_init(self ._loop.uvloop,
181
+ & self .disconnect_listener,
182
+ self ._fileno())
183
+ if err < 0 :
184
+ raise convert_error(err)
185
+ self .disconnect_listener.data = < void * > self
186
+ self .disconnect_listener_inited = True
187
+
188
+ err = uv.uv_poll_start(& self .disconnect_listener,
189
+ uv.UV_READABLE | uv.UV_DISCONNECT,
190
+ __on_write_pipe_poll_event)
191
+ if err < 0 :
192
+ raise convert_error(err)
193
+
194
+ cdef _stop_reading(self ):
195
+ cdef int err
196
+ if not self .disconnect_listener_inited:
197
+ return
198
+ err = uv.uv_poll_stop(& self .disconnect_listener)
199
+ if err < 0 :
200
+ raise convert_error(err)
201
+
202
+ cdef _close(self ):
203
+ if self .disconnect_listener_inited:
204
+ self .disconnect_listener.data = NULL
205
+ uv.uv_close(< uv.uv_handle_t * > (& self .disconnect_listener), NULL )
206
+ self .disconnect_listener_inited = False
207
+
208
+ UVStream._close(self )
209
+
166
210
cdef _new_socket(self ):
167
211
return __pipe_get_socket(< UVSocketHandle> self )
168
212
@@ -176,6 +220,25 @@ cdef class WriteUnixTransport(UVStream):
176
220
raise NotImplementedError
177
221
178
222
223
+ cdef void __on_write_pipe_poll_event(uv.uv_poll_t* handle,
224
+ int status, int events) with gil:
225
+ cdef WriteUnixTransport tr
226
+
227
+ if handle.data is NULL :
228
+ return
229
+
230
+ tr = < WriteUnixTransport> handle.data
231
+ if tr._closed:
232
+ return
233
+
234
+ if events & uv.UV_DISCONNECT:
235
+ try :
236
+ tr._stop_reading()
237
+ tr._on_eof()
238
+ except BaseException as ex:
239
+ tr._fatal_error(ex, False )
240
+
241
+
179
242
cdef class _PipeConnectRequest(UVRequest):
180
243
cdef:
181
244
UnixTransport transport
0 commit comments