4
4
# license information.
5
5
# --------------------------------------------------------------------------
6
6
7
- from typing import (
8
- Dict , List , Optional , Tuple , Union ,
9
- TYPE_CHECKING
10
- )
7
+ from typing import Any , Dict , List , Optional , Tuple , TYPE_CHECKING
11
8
from urllib .parse import unquote
12
9
from xml .etree .ElementTree import Element
13
10
25
22
ObjectReplicationPolicy ,
26
23
ObjectReplicationRule ,
27
24
RetentionPolicy ,
28
- StaticWebsite ,
25
+ StaticWebsite
29
26
)
30
27
from ._shared .models import get_enum_value
31
28
from ._shared .response_handlers import deserialize_metadata
32
29
33
30
if TYPE_CHECKING :
34
- from ._generated .models import BlobTag , PageList
35
-
31
+ from azure .core .pipeline import PipelineResponse
32
+ from ._generated .models import (
33
+ BlobItemInternal ,
34
+ BlobTags ,
35
+ PageList ,
36
+ StorageServiceProperties ,
37
+ StorageServiceStats ,
38
+ )
39
+ from ._shared .models import LocationMode
36
40
37
- def deserialize_pipeline_response_into_cls (cls_method , response , obj , headers ):
41
+ def deserialize_pipeline_response_into_cls (cls_method , response : "PipelineResponse" , obj : Any , headers : Dict [ str , Any ] ):
38
42
try :
39
43
deserialized_response = response .http_response
40
44
except AttributeError :
41
45
deserialized_response = response
42
46
return cls_method (deserialized_response , obj , headers )
43
47
44
48
45
- def deserialize_blob_properties (response , obj , headers ) :
49
+ def deserialize_blob_properties (response : "PipelineResponse" , obj : Any , headers : Dict [ str , Any ]) -> BlobProperties :
46
50
blob_properties = BlobProperties (
47
51
metadata = deserialize_metadata (response , obj , headers ),
48
52
object_replication_source_properties = deserialize_ors_policies (response .http_response .headers ),
@@ -56,7 +60,7 @@ def deserialize_blob_properties(response, obj, headers):
56
60
return blob_properties
57
61
58
62
59
- def deserialize_ors_policies (policy_dictionary ) :
63
+ def deserialize_ors_policies (policy_dictionary : Optional [ Dict [ str , str ]]) -> Optional [ List [ ObjectReplicationPolicy ]] :
60
64
61
65
if policy_dictionary is None :
62
66
return None
@@ -66,7 +70,7 @@ def deserialize_ors_policies(policy_dictionary):
66
70
or_policy_status_headers = {key : val for key , val in policy_dictionary .items ()
67
71
if 'or-' in key and key != 'x-ms-or-policy-id' }
68
72
69
- parsed_result = {}
73
+ parsed_result : Dict [ str , List [ ObjectReplicationRule ]] = {}
70
74
71
75
for key , val in or_policy_status_headers .items ():
72
76
# list blobs gives or-policy_rule and get blob properties gives x-ms-or-policy_rule
@@ -83,13 +87,21 @@ def deserialize_ors_policies(policy_dictionary):
83
87
return result_list
84
88
85
89
86
- def deserialize_blob_stream (response , obj , headers ):
90
+ def deserialize_blob_stream (
91
+ response : "PipelineResponse" ,
92
+ obj : Any ,
93
+ headers : Dict [str , Any ]
94
+ ) -> Tuple ["LocationMode" , Any ]:
87
95
blob_properties = deserialize_blob_properties (response , obj , headers )
88
96
obj .properties = blob_properties
89
97
return response .http_response .location_mode , obj
90
98
91
99
92
- def deserialize_container_properties (response , obj , headers ):
100
+ def deserialize_container_properties (
101
+ response : "PipelineResponse" ,
102
+ obj : Any ,
103
+ headers : Dict [str , Any ]
104
+ ) -> ContainerProperties :
93
105
metadata = deserialize_metadata (response , obj , headers )
94
106
container_properties = ContainerProperties (
95
107
metadata = metadata ,
@@ -98,45 +110,50 @@ def deserialize_container_properties(response, obj, headers):
98
110
return container_properties
99
111
100
112
101
- def get_page_ranges_result (ranges ):
102
- # type: (PageList) -> Tuple[List[Dict[str, int]], List[Dict[str, int]]]
103
- page_range = [] # type: ignore
104
- clear_range = [] # type: List
113
+ def get_page_ranges_result (ranges : "PageList" ) -> Tuple [List [Dict [str , int ]], List [Dict [str , int ]]]:
114
+ page_range = []
115
+ clear_range : List = []
105
116
if ranges .page_range :
106
- page_range = [{'start' : b .start , 'end' : b .end } for b in ranges .page_range ] # type: ignore
117
+ page_range = [{'start' : b .start , 'end' : b .end } for b in ranges .page_range ]
107
118
if ranges .clear_range :
108
119
clear_range = [{'start' : b .start , 'end' : b .end } for b in ranges .clear_range ]
109
- return page_range , clear_range # type: ignore
120
+ return page_range , clear_range
110
121
111
122
112
- # Deserialize a ServiceStats objects into a dict.
113
- def service_stats_deserialize (generated ):
123
+ def service_stats_deserialize (generated : "StorageServiceStats" ) -> Dict [str , Any ]:
124
+ status = None
125
+ last_sync_time = None
126
+ if generated .geo_replication is not None :
127
+ status = generated .geo_replication .status
128
+ last_sync_time = generated .geo_replication .last_sync_time
114
129
return {
115
130
'geo_replication' : {
116
- 'status' : generated . geo_replication . status ,
117
- 'last_sync_time' : generated . geo_replication . last_sync_time ,
131
+ 'status' : status ,
132
+ 'last_sync_time' : last_sync_time
118
133
}
119
134
}
120
135
121
- # Deserialize a ServiceProperties objects into a dict.
122
- def service_properties_deserialize (generated ):
136
+ def service_properties_deserialize (generated : "StorageServiceProperties" ) -> Dict [str , Any ]:
137
+ cors_list = None
138
+ if generated .cors is not None :
139
+ cors_list = [CorsRule ._from_generated (cors ) for cors in generated .cors ] # pylint: disable=protected-access
123
140
return {
124
141
'analytics_logging' : BlobAnalyticsLogging ._from_generated (generated .logging ), # pylint: disable=protected-access
125
142
'hour_metrics' : Metrics ._from_generated (generated .hour_metrics ), # pylint: disable=protected-access
126
143
'minute_metrics' : Metrics ._from_generated (generated .minute_metrics ), # pylint: disable=protected-access
127
- 'cors' : [ CorsRule . _from_generated ( cors ) for cors in generated . cors ], # pylint: disable=protected-access
144
+ 'cors' : cors_list ,
128
145
'target_version' : generated .default_service_version , # pylint: disable=protected-access
129
146
'delete_retention_policy' : RetentionPolicy ._from_generated (generated .delete_retention_policy ), # pylint: disable=protected-access
130
147
'static_website' : StaticWebsite ._from_generated (generated .static_website ), # pylint: disable=protected-access
131
148
}
132
149
133
150
134
- def get_blob_properties_from_generated_code (generated ) :
151
+ def get_blob_properties_from_generated_code (generated : "BlobItemInternal" ) -> BlobProperties :
135
152
blob = BlobProperties ()
136
- if generated .name .encoded :
153
+ if generated .name .encoded and generated . name . content is not None :
137
154
blob .name = unquote (generated .name .content )
138
155
else :
139
- blob .name = generated .name .content
156
+ blob .name = generated .name .content #type: ignore
140
157
blob_type = get_enum_value (generated .properties .blob_type )
141
158
blob .blob_type = BlobType (blob_type ) if blob_type else None
142
159
blob .etag = generated .properties .etag
@@ -172,38 +189,45 @@ def get_blob_properties_from_generated_code(generated):
172
189
blob .has_versions_only = generated .has_versions_only
173
190
return blob
174
191
175
- def parse_tags (generated_tags : Optional [List [ "BlobTag" ]] ) -> Union [Dict [str , str ], None ]:
192
+ def parse_tags (generated_tags : Optional ["BlobTags" ] ) -> Optional [Dict [str , str ]]:
176
193
"""Deserialize a list of BlobTag objects into a dict.
177
194
178
- :param Optional[List[BlobTag] ] generated_tags:
195
+ :param Optional[BlobTags ] generated_tags:
179
196
A list containing the BlobTag objects from generated code.
180
197
:returns: A dictionary of the BlobTag objects.
181
- :rtype: Dict[str, str] or None
198
+ :rtype: Optional[ Dict[str, str]]
182
199
"""
183
200
if generated_tags :
184
201
tag_dict = {t .key : t .value for t in generated_tags .blob_tag_set }
185
202
return tag_dict
186
203
return None
187
204
188
205
189
- def load_single_xml_node (element : Element , name : str ) -> Union [Element , None ]:
206
+ def load_single_xml_node (element : Element , name : str ) -> Optional [Element ]:
190
207
return element .find (name )
191
208
192
209
193
- def load_many_xml_nodes (element : Element , name : str , wrapper : Element = None ) -> List [Union [Element , None ]]:
210
+ def load_many_xml_nodes (
211
+ element : Element ,
212
+ name : str ,
213
+ wrapper : Optional [str ] = None
214
+ ) -> List [Optional [Element ]]:
215
+ found_element : Optional [Element ] = element
194
216
if wrapper :
195
- element = load_single_xml_node (element , wrapper )
196
- return list (element .findall (name ))
217
+ found_element = load_single_xml_node (element , wrapper )
218
+ if found_element is None :
219
+ return []
220
+ return list (found_element .findall (name ))
197
221
198
222
199
- def load_xml_string (element : Element , name : str ) -> str :
223
+ def load_xml_string (element : Element , name : str ) -> Optional [ str ] :
200
224
node = element .find (name )
201
225
if node is None or not node .text :
202
226
return None
203
227
return node .text
204
228
205
229
206
- def load_xml_int (element : Element , name : str ) -> int :
230
+ def load_xml_int (element : Element , name : str ) -> Optional [ int ] :
207
231
node = element .find (name )
208
232
if node is None or not node .text :
209
233
return None
0 commit comments