Skip to content

Commit 7835118

Browse files
Asynchronous CLI methods in CliApp (#533)
Co-authored-by: Hasan Ramezani <[email protected]>
1 parent 537f751 commit 7835118

File tree

3 files changed

+135
-4
lines changed

3 files changed

+135
-4
lines changed

docs/index.md

+66
Original file line numberDiff line numberDiff line change
@@ -1113,6 +1113,72 @@ For `BaseModel` and `pydantic.dataclasses.dataclass` types, `CliApp.run` will in
11131113
* `cli_implicit_flags=True`
11141114
* `cli_kebab_case=True`
11151115

1116+
### Asynchronous CLI Commands
1117+
1118+
Pydantic settings supports running asynchronous CLI commands via `CliApp.run` and `CliApp.run_subcommand`. With this feature, you can define async def methods within your Pydantic models (including subcommands) and have them executed just like their synchronous counterparts. Specifically:
1119+
1120+
1. Asynchronous methods are supported: You can now mark your cli_cmd or similar CLI entrypoint methods as async def and have CliApp execute them.
1121+
2. Subcommands may also be asynchronous: If you have nested CLI subcommands, the final (lowest-level) subcommand methods can likewise be asynchronous.
1122+
3. Limit asynchronous methods to final subcommands: Defining parent commands as asynchronous is not recommended, because it can result in additional threads and event loops being created. For best performance and to avoid unnecessary resource usage, only implement your deepest (child) subcommands as async def.
1123+
1124+
Below is a simple example demonstrating an asynchronous top-level command:
1125+
1126+
```py
1127+
from pydantic_settings import BaseSettings, CliApp
1128+
1129+
1130+
class AsyncSettings(BaseSettings):
1131+
async def cli_cmd(self) -> None:
1132+
print('Hello from an async CLI method!')
1133+
#> Hello from an async CLI method!
1134+
1135+
1136+
# If an event loop is already running, a new thread will be used;
1137+
# otherwise, asyncio.run() is used to execute this async method.
1138+
assert CliApp.run(AsyncSettings, cli_args=[]).model_dump() == {}
1139+
```
1140+
1141+
#### Asynchronous Subcommands
1142+
1143+
As mentioned above, you can also define subcommands as async. However, only do so for the leaf (lowest-level) subcommand to avoid spawning new threads and event loops unnecessarily in parent commands:
1144+
1145+
```py
1146+
from pydantic import BaseModel
1147+
1148+
from pydantic_settings import (
1149+
BaseSettings,
1150+
CliApp,
1151+
CliPositionalArg,
1152+
CliSubCommand,
1153+
)
1154+
1155+
1156+
class Clone(BaseModel):
1157+
repository: CliPositionalArg[str]
1158+
directory: CliPositionalArg[str]
1159+
1160+
async def cli_cmd(self) -> None:
1161+
# Perform async tasks here, e.g. network or I/O operations
1162+
print(f'Cloning async from "{self.repository}" into "{self.directory}"')
1163+
#> Cloning async from "repo" into "dir"
1164+
1165+
1166+
class Git(BaseSettings):
1167+
clone: CliSubCommand[Clone]
1168+
1169+
def cli_cmd(self) -> None:
1170+
# Run the final subcommand (clone/init). It is recommended to define async methods only at the deepest level.
1171+
CliApp.run_subcommand(self)
1172+
1173+
1174+
CliApp.run(Git, cli_args=['clone', 'repo', 'dir']).model_dump() == {
1175+
'repository': 'repo',
1176+
'directory': 'dir',
1177+
}
1178+
```
1179+
1180+
When executing a subcommand with an asynchronous cli_cmd, Pydantic settings automatically detects whether the current thread already has an active event loop. If so, the async command is run in a fresh thread to avoid conflicts. Otherwise, it uses asyncio.run() in the current thread. This handling ensures your asynchronous subcommands “just work” without additional manual setup.
1181+
11161182
### Mutually Exclusive Groups
11171183

11181184
CLI mutually exclusive groups can be created by inheriting from the `CliMutuallyExclusiveGroup` class.

pydantic_settings/main.py

+45-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
from __future__ import annotations as _annotations
22

3+
import asyncio
4+
import inspect
5+
import threading
36
from argparse import Namespace
47
from types import SimpleNamespace
58
from typing import Any, ClassVar, TypeVar
@@ -459,10 +462,48 @@ class CliApp:
459462

460463
@staticmethod
461464
def _run_cli_cmd(model: Any, cli_cmd_method_name: str, is_required: bool) -> Any:
462-
if hasattr(type(model), cli_cmd_method_name):
463-
getattr(type(model), cli_cmd_method_name)(model)
464-
elif is_required:
465-
raise SettingsError(f'Error: {type(model).__name__} class is missing {cli_cmd_method_name} entrypoint')
465+
command = getattr(type(model), cli_cmd_method_name, None)
466+
if command is None:
467+
if is_required:
468+
raise SettingsError(f'Error: {type(model).__name__} class is missing {cli_cmd_method_name} entrypoint')
469+
return model
470+
471+
# If the method is asynchronous, we handle its execution based on the current event loop status.
472+
if inspect.iscoroutinefunction(command):
473+
# For asynchronous methods, we have two execution scenarios:
474+
# 1. If no event loop is running in the current thread, run the coroutine directly with asyncio.run().
475+
# 2. If an event loop is already running in the current thread, run the coroutine in a separate thread to avoid conflicts.
476+
try:
477+
# Check if an event loop is currently running in this thread.
478+
loop = asyncio.get_running_loop()
479+
except RuntimeError:
480+
loop = None
481+
482+
if loop and loop.is_running():
483+
# We're in a context with an active event loop (e.g., Jupyter Notebook).
484+
# Running asyncio.run() here would cause conflicts, so we use a separate thread.
485+
exception_container = []
486+
487+
def run_coro() -> None:
488+
try:
489+
# Execute the coroutine in a new event loop in this separate thread.
490+
asyncio.run(command(model))
491+
except Exception as e:
492+
exception_container.append(e)
493+
494+
thread = threading.Thread(target=run_coro)
495+
thread.start()
496+
thread.join()
497+
if exception_container:
498+
# Propagate exceptions from the separate thread.
499+
raise exception_container[0]
500+
else:
501+
# No event loop is running; safe to run the coroutine directly.
502+
asyncio.run(command(model))
503+
else:
504+
# For synchronous methods, call them directly.
505+
command(model)
506+
466507
return model
467508

468509
@staticmethod

tests/test_source_cli.py

+24
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import argparse
2+
import asyncio
23
import re
34
import sys
45
import time
@@ -2120,6 +2121,29 @@ def alt_cmd(self) -> None:
21202121
}
21212122

21222123

2124+
def test_cli_app_async_method_no_existing_loop():
2125+
class Command(BaseSettings):
2126+
called: bool = False
2127+
2128+
async def cli_cmd(self) -> None:
2129+
self.called = True
2130+
2131+
assert CliApp.run(Command, cli_args=[]).called
2132+
2133+
2134+
def test_cli_app_async_method_with_existing_loop():
2135+
class Command(BaseSettings):
2136+
called: bool = False
2137+
2138+
async def cli_cmd(self) -> None:
2139+
self.called = True
2140+
2141+
async def run_as_coro():
2142+
return CliApp.run(Command, cli_args=[])
2143+
2144+
assert asyncio.run(run_as_coro()).called
2145+
2146+
21232147
def test_cli_app_exceptions():
21242148
with pytest.raises(
21252149
SettingsError, match='Error: NotPydanticModel is not subclass of BaseModel or pydantic.dataclasses.dataclass'

0 commit comments

Comments
 (0)