1
1
import sys
2
2
import warnings
3
+ from functools import partial
3
4
from typing import Any
5
+ from typing import Callable
4
6
from typing import Dict
5
7
from typing import Iterable
6
8
from typing import Optional
11
13
from functools import cached_property
12
14
else :
13
15
from backports .cached_property import cached_property
16
+ from jsonschema ._format import FormatChecker
14
17
from jsonschema .protocols import Validator
15
18
from openapi_schema_validator import OAS30Validator
16
19
17
20
from openapi_core .spec import Spec
18
21
from openapi_core .unmarshalling .schemas .datatypes import CustomFormattersDict
19
- from openapi_core .unmarshalling .schemas .datatypes import FormattersDict
22
+ from openapi_core .unmarshalling .schemas .datatypes import FormatUnmarshaller
23
+ from openapi_core .unmarshalling .schemas .datatypes import FormatValidator
24
+ from openapi_core .unmarshalling .schemas .datatypes import UnmarshallersDict
20
25
from openapi_core .unmarshalling .schemas .enums import ValidationContext
21
26
from openapi_core .unmarshalling .schemas .exceptions import (
22
27
FormatterNotFoundError ,
47
52
48
53
49
54
class SchemaValidatorsFactory :
50
- CONTEXTS = {
51
- ValidationContext .REQUEST : "write" ,
52
- ValidationContext .RESPONSE : "read" ,
53
- }
54
-
55
55
def __init__ (
56
56
self ,
57
57
schema_validator_class : Type [Validator ],
58
+ base_format_checker : Optional [FormatChecker ] = None ,
59
+ formatters : Optional [CustomFormattersDict ] = None ,
60
+ format_unmarshallers : Optional [UnmarshallersDict ] = None ,
58
61
custom_formatters : Optional [CustomFormattersDict ] = None ,
59
- context : Optional [ValidationContext ] = None ,
60
62
):
61
63
self .schema_validator_class = schema_validator_class
64
+ if base_format_checker is None :
65
+ base_format_checker = self .schema_validator_class .FORMAT_CHECKER
66
+ self .base_format_checker = base_format_checker
67
+ if formatters is None :
68
+ formatters = {}
69
+ self .formatters = formatters
70
+ if format_unmarshallers is None :
71
+ format_unmarshallers = {}
72
+ self .format_unmarshallers = format_unmarshallers
62
73
if custom_formatters is None :
63
74
custom_formatters = {}
64
75
self .custom_formatters = custom_formatters
65
- self .context = context
66
76
67
- def create ( self , schema : Spec ) -> Validator :
68
- resolver = schema . accessor . resolver # type: ignore
69
- custom_format_checks = {
77
+ @ cached_property
78
+ def format_checker ( self ) -> FormatChecker :
79
+ format_checks : Dict [ str , Callable [[ Any ], bool ]] = {
70
80
name : formatter .validate
71
- for name , formatter in self .custom_formatters .items ()
72
- }
73
- format_checker = build_format_checker (** custom_format_checks )
74
- kwargs = {
75
- "resolver" : resolver ,
76
- "format_checker" : format_checker ,
81
+ for formatters_list in [self .formatters , self .custom_formatters ]
82
+ for name , formatter in formatters_list .items ()
77
83
}
78
- if self .context is not None :
79
- kwargs [self .CONTEXTS [self .context ]] = True
84
+ format_checks .update (
85
+ {
86
+ name : self ._create_checker (name )
87
+ for name , _ in self .format_unmarshallers .items ()
88
+ }
89
+ )
90
+ return build_format_checker (self .base_format_checker , ** format_checks )
91
+
92
+ def _create_checker (self , name : str ) -> FormatValidator :
93
+ if name in self .base_format_checker .checkers :
94
+ return partial (self .base_format_checker .check , format = name )
95
+
96
+ return lambda x : True
97
+
98
+ def get_checker (self , name : str ) -> FormatValidator :
99
+ if name in self .format_checker .checkers :
100
+ return partial (self .format_checker .check , format = name )
101
+
102
+ return lambda x : True
103
+
104
+ def create (self , schema : Spec ) -> Validator :
105
+ resolver = schema .accessor .resolver # type: ignore
80
106
with schema .open () as schema_dict :
81
- return self .schema_validator_class (schema_dict , ** kwargs )
107
+ return self .schema_validator_class (
108
+ schema_dict ,
109
+ resolver = resolver ,
110
+ format_checker = self .format_checker ,
111
+ )
112
+
113
+
114
+ class SchemaFormatUnmarshallersFactory :
115
+ def __init__ (
116
+ self ,
117
+ validators_factory : SchemaValidatorsFactory ,
118
+ formatters : Optional [CustomFormattersDict ] = None ,
119
+ format_unmarshallers : Optional [UnmarshallersDict ] = None ,
120
+ custom_formatters : Optional [CustomFormattersDict ] = None ,
121
+ ):
122
+ self .validators_factory = validators_factory
123
+ if formatters is None :
124
+ formatters = {}
125
+ self .formatters = formatters
126
+ if format_unmarshallers is None :
127
+ format_unmarshallers = {}
128
+ self .format_unmarshallers = format_unmarshallers
129
+ if custom_formatters is None :
130
+ custom_formatters = {}
131
+ self .custom_formatters = custom_formatters
132
+
133
+ def create (self , schema_format : str ) -> Optional [FormatUnmarshaller ]:
134
+ if schema_format in self .custom_formatters :
135
+ formatter = self .custom_formatters [schema_format ]
136
+ return formatter .format
137
+ if schema_format in self .formatters :
138
+ formatter = self .formatters [schema_format ]
139
+ return formatter .format
140
+ if schema_format in self .format_unmarshallers :
141
+ return self .format_unmarshallers [schema_format ]
142
+
143
+ return None
82
144
83
145
84
146
class SchemaUnmarshallersFactory :
@@ -102,23 +164,59 @@ class SchemaUnmarshallersFactory:
102
164
def __init__ (
103
165
self ,
104
166
schema_validator_class : Type [Validator ],
167
+ base_format_checker : Optional [FormatChecker ] = None ,
168
+ formatters : Optional [CustomFormattersDict ] = None ,
169
+ format_unmarshallers : Optional [UnmarshallersDict ] = None ,
105
170
custom_formatters : Optional [CustomFormattersDict ] = None ,
106
171
context : Optional [ValidationContext ] = None ,
107
172
):
108
173
self .schema_validator_class = schema_validator_class
174
+ self .base_format_checker = base_format_checker
109
175
if custom_formatters is None :
110
176
custom_formatters = {}
177
+ else :
178
+ warnings .warn (
179
+ "custom_formatters is deprecated. "
180
+ "Register new checks to FormatChecker to validate custom formats "
181
+ "and add format_unmarshallers to unmarshal custom formats." ,
182
+ DeprecationWarning ,
183
+ )
184
+ self .formatters = formatters
185
+ if format_unmarshallers is None :
186
+ format_unmarshallers = {}
187
+ self .format_unmarshallers = format_unmarshallers
111
188
self .custom_formatters = custom_formatters
112
189
self .context = context
113
190
114
191
@cached_property
115
192
def validators_factory (self ) -> SchemaValidatorsFactory :
116
193
return SchemaValidatorsFactory (
117
194
self .schema_validator_class ,
195
+ self .base_format_checker ,
196
+ self .formatters ,
197
+ self .format_unmarshallers ,
198
+ self .custom_formatters ,
199
+ )
200
+
201
+ @cached_property
202
+ def format_unmarshallers_factory (self ) -> SchemaFormatUnmarshallersFactory :
203
+ return SchemaFormatUnmarshallersFactory (
204
+ self .validators_factory ,
205
+ self .formatters ,
206
+ self .format_unmarshallers ,
118
207
self .custom_formatters ,
119
- self .context ,
120
208
)
121
209
210
+ def get_format_validator (
211
+ self , validator : Validator , schema_format : str
212
+ ) -> Optional [FormatValidator ]:
213
+ if schema_format in validator .format_checker .checkers :
214
+ return partial (
215
+ validator .format_checker .check , format = schema_format
216
+ )
217
+
218
+ return None
219
+
122
220
def create (
123
221
self , schema : Spec , type_override : Optional [str ] = None
124
222
) -> BaseSchemaUnmarshaller :
@@ -132,7 +230,19 @@ def create(
132
230
validator = self .validators_factory .create (schema )
133
231
134
232
schema_format = schema .getkey ("format" )
135
- formatter = self .custom_formatters .get (schema_format )
233
+
234
+ # FIXME: don;t raise exception on unknown format
235
+ if (
236
+ schema_format
237
+ and schema_format
238
+ not in self .validators_factory .format_checker .checkers
239
+ ):
240
+ raise FormatterNotFoundError (schema_format )
241
+
242
+ format_unmarshaller = self .format_unmarshallers_factory .create (
243
+ schema_format
244
+ )
245
+ format_validator = self .get_format_validator (validator , schema_format )
136
246
137
247
schema_type = type_override or schema .getkey ("type" , "any" )
138
248
if isinstance (schema_type , Iterable ) and not isinstance (
@@ -141,7 +251,7 @@ def create(
141
251
return MultiTypeUnmarshaller (
142
252
schema ,
143
253
validator ,
144
- formatter ,
254
+ format_unmarshaller ,
145
255
self .validators_factory ,
146
256
self ,
147
257
context = self .context ,
@@ -151,7 +261,7 @@ def create(
151
261
return complex_klass (
152
262
schema ,
153
263
validator ,
154
- formatter ,
264
+ format_unmarshaller ,
155
265
self .validators_factory ,
156
266
self ,
157
267
context = self .context ,
@@ -161,7 +271,7 @@ def create(
161
271
return klass (
162
272
schema ,
163
273
validator ,
164
- formatter ,
274
+ format_unmarshaller ,
165
275
self .validators_factory ,
166
276
self ,
167
277
)
0 commit comments