4
4
import json
5
5
import logging
6
6
from ssl import SSLContext
7
- from typing import Any , AsyncGenerator , Dict , Optional , Tuple , Type , Union
7
+ from typing import Any , AsyncGenerator , Callable , Dict , Optional , Tuple , Type , Union
8
8
9
9
import aiohttp
10
10
from aiohttp .client_exceptions import ClientResponseError
@@ -49,6 +49,7 @@ def __init__(
49
49
ssl : Union [SSLContext , bool , Fingerprint ] = False ,
50
50
timeout : Optional [int ] = None ,
51
51
ssl_close_timeout : Optional [Union [int , float ]] = 10 ,
52
+ json_serialize : Callable = json .dumps ,
52
53
client_session_args : Optional [Dict [str , Any ]] = None ,
53
54
) -> None :
54
55
"""Initialize the transport with the given aiohttp parameters.
@@ -61,6 +62,8 @@ def __init__(
61
62
:param ssl: ssl_context of the connection. Use ssl=False to disable encryption
62
63
:param ssl_close_timeout: Timeout in seconds to wait for the ssl connection
63
64
to close properly
65
+ :param json_serialize: Json serializer callable.
66
+ By default json.dumps() function
64
67
:param client_session_args: Dict of extra args passed to
65
68
`aiohttp.ClientSession`_
66
69
@@ -77,6 +80,7 @@ def __init__(
77
80
self .client_session_args = client_session_args
78
81
self .session : Optional [aiohttp .ClientSession ] = None
79
82
self .response_headers : Optional [CIMultiDictProxy [str ]]
83
+ self .json_serialize : Callable = json_serialize
80
84
81
85
async def connect (self ) -> None :
82
86
"""Coroutine which will create an aiohttp ClientSession() as self.session.
@@ -96,6 +100,7 @@ async def connect(self) -> None:
96
100
"auth" : None
97
101
if isinstance (self .auth , AppSyncAuthentication )
98
102
else self .auth ,
103
+ "json_serialize" : self .json_serialize ,
99
104
}
100
105
101
106
if self .timeout is not None :
@@ -248,14 +253,14 @@ async def execute(
248
253
file_streams = {str (i ): files [path ] for i , path in enumerate (files )}
249
254
250
255
# Add the payload to the operations field
251
- operations_str = json . dumps (payload )
256
+ operations_str = self . json_serialize (payload )
252
257
log .debug ("operations %s" , operations_str )
253
258
data .add_field (
254
259
"operations" , operations_str , content_type = "application/json"
255
260
)
256
261
257
262
# Add the file map field
258
- file_map_str = json . dumps (file_map )
263
+ file_map_str = self . json_serialize (file_map )
259
264
log .debug ("file_map %s" , file_map_str )
260
265
data .add_field ("map" , file_map_str , content_type = "application/json" )
261
266
@@ -270,7 +275,7 @@ async def execute(
270
275
payload ["variables" ] = variable_values
271
276
272
277
if log .isEnabledFor (logging .INFO ):
273
- log .info (">>> %s" , json . dumps (payload ))
278
+ log .info (">>> %s" , self . json_serialize (payload ))
274
279
275
280
post_args = {"json" : payload }
276
281
@@ -281,7 +286,7 @@ async def execute(
281
286
# Add headers for AppSync if requested
282
287
if isinstance (self .auth , AppSyncAuthentication ):
283
288
post_args ["headers" ] = self .auth .get_headers (
284
- json . dumps (payload ),
289
+ self . json_serialize (payload ),
285
290
{"content-type" : "application/json" },
286
291
)
287
292
0 commit comments