forked from ets-labs/python-dependency-injector
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_cwiring.pyx
83 lines (64 loc) · 2.65 KB
/
_cwiring.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""Wiring optimizations module."""
import asyncio
import collections.abc
import inspect
import types
from .wiring import _Marker
from .providers cimport Provider, Resource
def _sync_inject(object fn, tuple args, dict kwargs, dict injections, dict closings, /):
cdef object result
cdef dict to_inject
cdef object arg_key
cdef Provider provider
to_inject = kwargs.copy()
for arg_key, provider in injections.items():
if arg_key not in kwargs or isinstance(kwargs[arg_key], _Marker):
to_inject[arg_key] = provider()
result = fn(*args, **to_inject)
if closings:
for arg_key, provider in closings.items():
if arg_key in kwargs and not isinstance(kwargs[arg_key], _Marker):
continue
if not isinstance(provider, Resource):
continue
provider.shutdown()
return result
async def _async_inject(object fn, tuple args, dict kwargs, dict injections, dict closings, /):
cdef object result
cdef dict to_inject
cdef list to_inject_await = []
cdef list to_close_await = []
cdef object arg_key
cdef Provider provider
to_inject = kwargs.copy()
for arg_key, provider in injections.items():
if arg_key not in kwargs or isinstance(kwargs[arg_key], _Marker):
provide = provider()
if provider.is_async_mode_enabled():
to_inject_await.append((arg_key, provide))
elif _isawaitable(provide):
to_inject_await.append((arg_key, provide))
else:
to_inject[arg_key] = provide
if to_inject_await:
async_to_inject = await asyncio.gather(*(provide for _, provide in to_inject_await))
for provide, (injection, _) in zip(async_to_inject, to_inject_await):
to_inject[injection] = provide
result = await fn(*args, **to_inject)
if closings:
for arg_key, provider in closings.items():
if arg_key in kwargs and isinstance(kwargs[arg_key], _Marker):
continue
if not isinstance(provider, Resource):
continue
shutdown = provider.shutdown()
if _isawaitable(shutdown):
to_close_await.append(shutdown)
await asyncio.gather(*to_close_await)
return result
cdef bint _isawaitable(object instance):
"""Return true if object can be passed to an ``await`` expression."""
return (isinstance(instance, types.CoroutineType) or
isinstance(instance, types.GeneratorType) and
bool(instance.gi_code.co_flags & inspect.CO_ITERABLE_COROUTINE) or
isinstance(instance, collections.abc.Awaitable))