Skip to content

Commit d8901d2

Browse files
spokeydokeysstainless-app[bot]
authored andcommitted
fix(asyncify): avoid hanging process under certain conditions (#1853)
1 parent 0d6185e commit d8901d2

File tree

5 files changed

+88
-52
lines changed

5 files changed

+88
-52
lines changed

Diff for: pyproject.toml

+3-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ dev-dependencies = [
6565
"azure-identity >=1.14.1",
6666
"types-tqdm > 4",
6767
"types-pyaudio > 0",
68-
"trio >=0.22.2"
68+
"trio >=0.22.2",
69+
"nest_asyncio==1.6.0"
70+
6971
]
7072

7173
[tool.rye.scripts]

Diff for: requirements-dev.lock

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
# all-features: true
88
# with-sources: false
99
# generate-hashes: false
10+
# universal: false
1011

1112
-e file:.
1213
annotated-types==0.6.0
@@ -87,6 +88,7 @@ mypy==1.13.0
8788
mypy-extensions==1.0.0
8889
# via black
8990
# via mypy
91+
nest-asyncio==1.6.0
9092
nodeenv==1.8.0
9193
# via pyright
9294
nox==2023.4.22

Diff for: requirements.lock

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
# all-features: true
88
# with-sources: false
99
# generate-hashes: false
10+
# universal: false
1011

1112
-e file:.
1213
annotated-types==0.6.0

Diff for: src/openai/_utils/_sync.py

+39-51
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,60 @@
11
from __future__ import annotations
22

3+
import sys
4+
import asyncio
35
import functools
4-
from typing import TypeVar, Callable, Awaitable
6+
import contextvars
7+
from typing import Any, TypeVar, Callable, Awaitable
58
from typing_extensions import ParamSpec
69

7-
import anyio
8-
import anyio.to_thread
9-
10-
from ._reflection import function_has_argument
11-
1210
T_Retval = TypeVar("T_Retval")
1311
T_ParamSpec = ParamSpec("T_ParamSpec")
1412

1513

16-
# copied from `asyncer`, https://github.com/tiangolo/asyncer
17-
def asyncify(
18-
function: Callable[T_ParamSpec, T_Retval],
19-
*,
20-
cancellable: bool = False,
21-
limiter: anyio.CapacityLimiter | None = None,
22-
) -> Callable[T_ParamSpec, Awaitable[T_Retval]]:
23-
"""
24-
Take a blocking function and create an async one that receives the same
25-
positional and keyword arguments, and that when called, calls the original function
26-
in a worker thread using `anyio.to_thread.run_sync()`. Internally,
27-
`asyncer.asyncify()` uses the same `anyio.to_thread.run_sync()`, but it supports
28-
keyword arguments additional to positional arguments and it adds better support for
29-
autocompletion and inline errors for the arguments of the function called and the
30-
return value.
14+
if sys.version_info >= (3, 9):
15+
to_thread = asyncio.to_thread
16+
else:
17+
async def _to_thread(
18+
func: Callable[T_ParamSpec, T_Retval], /, *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs
19+
) -> Any:
20+
"""Asynchronously run function *func* in a separate thread.
3121
32-
If the `cancellable` option is enabled and the task waiting for its completion is
33-
cancelled, the thread will still run its course but its return value (or any raised
34-
exception) will be ignored.
22+
Any *args and **kwargs supplied for this function are directly passed
23+
to *func*. Also, the current :class:`contextvars.Context` is propagated,
24+
allowing context variables from the main thread to be accessed in the
25+
separate thread.
3526
36-
Use it like this:
27+
Returns a coroutine that can be awaited to get the eventual result of *func*.
28+
"""
29+
loop = asyncio.events.get_running_loop()
30+
ctx = contextvars.copy_context()
31+
func_call = functools.partial(ctx.run, func, *args, **kwargs)
32+
return await loop.run_in_executor(None, func_call)
33+
34+
to_thread = _to_thread
35+
36+
# inspired by `asyncer`, https://github.com/tiangolo/asyncer
37+
def asyncify(function: Callable[T_ParamSpec, T_Retval]) -> Callable[T_ParamSpec, Awaitable[T_Retval]]:
38+
"""
39+
Take a blocking function and create an async one that receives the same
40+
positional and keyword arguments. For python version 3.9 and above, it uses
41+
asyncio.to_thread to run the function in a separate thread. For python version
42+
3.8, it uses locally defined copy of the asyncio.to_thread function which was
43+
introduced in python 3.9.
3744
38-
```Python
39-
def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str:
40-
# Do work
41-
return "Some result"
45+
Usage:
4246
47+
```python
48+
def blocking_func(arg1, arg2, kwarg1=None):
49+
# blocking code
50+
return result
4351
44-
result = await to_thread.asyncify(do_work)("spam", "ham", kwarg1="a", kwarg2="b")
45-
print(result)
52+
result = asyncify(blocking_function)(arg1, arg2, kwarg1=value1)
4653
```
4754
4855
## Arguments
4956
5057
`function`: a blocking regular callable (e.g. a function)
51-
`cancellable`: `True` to allow cancellation of the operation
52-
`limiter`: capacity limiter to use to limit the total amount of threads running
53-
(if omitted, the default limiter is used)
5458
5559
## Return
5660
@@ -60,22 +64,6 @@ def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str:
6064
"""
6165

6266
async def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval:
63-
partial_f = functools.partial(function, *args, **kwargs)
64-
65-
# In `v4.1.0` anyio added the `abandon_on_cancel` argument and deprecated the old
66-
# `cancellable` argument, so we need to use the new `abandon_on_cancel` to avoid
67-
# surfacing deprecation warnings.
68-
if function_has_argument(anyio.to_thread.run_sync, "abandon_on_cancel"):
69-
return await anyio.to_thread.run_sync(
70-
partial_f,
71-
abandon_on_cancel=cancellable,
72-
limiter=limiter,
73-
)
74-
75-
return await anyio.to_thread.run_sync(
76-
partial_f,
77-
cancellable=cancellable,
78-
limiter=limiter,
79-
)
67+
return await to_thread(function, *args, **kwargs)
8068

8169
return wrapper

Diff for: tests/test_client.py

+43
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@
44

55
import gc
66
import os
7+
import sys
78
import json
89
import asyncio
910
import inspect
11+
import subprocess
1012
import tracemalloc
1113
from typing import Any, Union, cast
14+
from textwrap import dedent
1215
from unittest import mock
1316
from typing_extensions import Literal
1417

@@ -1766,3 +1769,43 @@ def retry_handler(_request: httpx.Request) -> httpx.Response:
17661769
) as response:
17671770
assert response.retries_taken == failures_before_success
17681771
assert int(response.http_request.headers.get("x-stainless-retry-count")) == failures_before_success
1772+
1773+
def test_get_platform(self) -> None:
1774+
# Issue https://github.com/openai/openai-python/issues/1827 was caused
1775+
# asyncify leaving threads unterminated when used with nest_asyncio.
1776+
# Since nest_asyncio.apply() is global and cannot be un-applied, this
1777+
# test is run in a separate process to avoid affecting other tests.
1778+
test_code = dedent("""\
1779+
import asyncio
1780+
import nest_asyncio
1781+
1782+
import threading
1783+
1784+
from openai._base_client import get_platform
1785+
from openai._utils import asyncify
1786+
1787+
async def test_main() -> None:
1788+
result = await asyncify(get_platform)()
1789+
print(result)
1790+
for thread in threading.enumerate():
1791+
print(thread.name)
1792+
1793+
nest_asyncio.apply()
1794+
asyncio.run(test_main())
1795+
""")
1796+
with subprocess.Popen(
1797+
[sys.executable, "-c", test_code],
1798+
stdout=subprocess.PIPE,
1799+
stderr=subprocess.PIPE,
1800+
text=True,
1801+
) as process:
1802+
try:
1803+
process.wait(2)
1804+
if process.returncode:
1805+
print(process.stdout)
1806+
print(process.stderr)
1807+
raise AssertionError("calling get_platform using asyncify resulted in a non-zero exit code")
1808+
except subprocess.TimeoutExpired as e:
1809+
process.kill()
1810+
raise AssertionError("calling get_platform using asyncify resulted in a hung process") from e
1811+

0 commit comments

Comments
 (0)