-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathpy3support.py
57 lines (51 loc) · 2.12 KB
/
py3support.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from __future__ import print_function
import sys
from inspect import isgenerator
from inspect import isgeneratorfunction
from logging import getLogger
from aspectlib import ExpectedGenerator
from aspectlib import mimic
from aspectlib import Proceed
from aspectlib import Return
from aspectlib import UnacceptableAdvice
from aspectlib.utils import logf
# Module-level logger, named after this module.
logger = getLogger(__name__)
# Debug-logging callable obtained by passing ``logger.debug`` through ``logf``.
# NOTE(review): ``logf`` lives in aspectlib.utils and is not visible here;
# presumably it wraps the log call (e.g. safer formatting) -- confirm there.
logdebug = logf(logger.debug)
def decorate_advising_generator_py3(advising_function, cutpoint_function, bind):
    """Build a generator wrapper that lets an advisor steer *cutpoint_function*.

    The returned wrapper is itself a generator function.  When iterated it
    creates an *advisor* generator from ``advising_function`` and repeatedly
    acts on the advice the advisor yields:

    * ``Proceed`` (the class), ``None`` or a ``Proceed`` instance -- delegate
      to ``cutpoint_function`` with ``yield from``.  A ``Proceed`` instance
      replaces the call arguments with its ``args`` / ``kwargs``.  The
      delegate's return value is sent back into the advisor; an exception
      raised while delegating is thrown into the advisor instead.
    * ``Return`` (the class) -- finish the wrapper with no return value.
    * a ``Return`` instance -- finish the wrapper returning its ``value``.
    * anything else -- raise ``UnacceptableAdvice``.

    Args:
        advising_function: Callable that must return a generator (the advisor).
        cutpoint_function: Generator function being wrapped.
        bind: When truthy, ``cutpoint_function`` is passed to
            ``advising_function`` as the first positional argument.

    Returns:
        The result of ``mimic(wrapper, cutpoint_function)``.
        NOTE(review): ``mimic`` is defined elsewhere; presumably it copies
        the cutpoint's metadata onto the wrapper -- confirm in aspectlib.

    Raises:
        AssertionError: If ``cutpoint_function`` is not a generator function
            (only when assertions are enabled).
        ExpectedGenerator: On first iteration of the wrapper, if
            ``advising_function`` did not return a generator.
        UnacceptableAdvice: While iterating, if the advisor yields a value
            that is not one of the recognised advices.
    """
    assert isgeneratorfunction(cutpoint_function)

    def advising_generator_wrapper_py3(*args, **kwargs):
        if bind:
            advisor = advising_function(cutpoint_function, *args, **kwargs)
        else:
            advisor = advising_function(*args, **kwargs)
        if not isgenerator(advisor):
            raise ExpectedGenerator("advising_function %s did not return a generator." % advising_function)
        try:
            # NOTE: a StopIteration raised by ``next(advisor)`` or by
            # ``advisor.throw`` below is not handled in this function.
            advice = next(advisor)
            while True:
                logdebug('Got advice %r from %s', advice, advising_function)
                if advice is Proceed or advice is None or isinstance(advice, Proceed):
                    if isinstance(advice, Proceed):
                        # The advisor asked to proceed with different arguments.
                        args = advice.args
                        kwargs = advice.kwargs
                    gen = cutpoint_function(*args, **kwargs)
                    try:
                        result = yield from gen
                    except BaseException as exc:
                        # Let the advisor observe (and possibly handle) the
                        # failure.  The single-argument form of ``throw`` is
                        # used because the (type, value, traceback) form is
                        # deprecated since Python 3.12; the exception instance
                        # already carries the same traceback.
                        advice = advisor.throw(exc)
                    else:
                        try:
                            advice = advisor.send(result)
                        except StopIteration:
                            # Advisor finished without overriding the outcome:
                            # hand the cutpoint's own return value to the caller
                            # instead of silently dropping it.
                            return result
                    finally:
                        gen.close()
                elif advice is Return:
                    return
                elif isinstance(advice, Return):
                    # Must be ``return``: raising StopIteration inside a
                    # generator is turned into RuntimeError by PEP 479.
                    return advice.value
                else:
                    # Wrap in a 1-tuple so tuple-valued advice is formatted
                    # rather than being unpacked as %-format arguments.
                    raise UnacceptableAdvice("Unknown advice %s" % (advice,))
        finally:
            advisor.close()

    return mimic(advising_generator_wrapper_py3, cutpoint_function)