14
14
15
15
"""Internal HTTP client module.
16
16
17
- This module provides utilities for making HTTP calls using the requests library.
18
- """
19
-
20
- from google .auth import transport
21
- import requests
17
+ This module provides utilities for making HTTP calls using the requests library.
18
+ """
19
+
20
+ from __future__ import annotations
21
+ import logging
22
+ from typing import Any , Dict , Generator , Optional , Tuple , Union
23
+ import httpx
24
+ import requests .adapters
22
25
from requests .packages .urllib3 .util import retry # pylint: disable=import-error
26
+ from google .auth import credentials
27
+ from google .auth import transport
28
+ from google .auth .transport import requests as google_auth_requests
23
29
24
30
from firebase_admin import _utils
31
+ from firebase_admin ._retry import HttpxRetry , HttpxRetryTransport
32
+
33
+ logger = logging .getLogger (__name__ )
25
34
26
35
if hasattr (retry .Retry .DEFAULT , 'allowed_methods' ):
27
36
_ANY_METHOD = {'allowed_methods' : None }
34
43
connect = 1 , read = 1 , status = 4 , status_forcelist = [500 , 503 ],
35
44
raise_on_status = False , backoff_factor = 0.5 , ** _ANY_METHOD )
36
45
46
+ DEFAULT_HTTPX_RETRY_CONFIG = HttpxRetry (
47
+ max_retries = 4 , status_forcelist = [500 , 503 ], backoff_factor = 0.5 )
48
+
37
49
38
50
DEFAULT_TIMEOUT_SECONDS = 120
39
51
@@ -144,7 +156,6 @@ def close(self):
144
156
self ._session .close ()
145
157
self ._session = None
146
158
147
-
148
159
class JsonHttpClient (HttpClient ):
149
160
"""An HTTP client that parses response messages as JSON."""
150
161
@@ -153,3 +164,194 @@ def __init__(self, **kwargs):
153
164
154
165
def parse_body (self , resp ):
155
166
return resp .json ()
167
+
168
+ class GoogleAuthCredentialFlow (httpx .Auth ):
169
+ """Google Auth Credential Auth Flow"""
170
+ def __init__ (self , credential : credentials .Credentials ):
171
+ self ._credential = credential
172
+ self ._max_refresh_attempts = 2
173
+ self ._refresh_status_codes = (401 ,)
174
+
175
+ def apply_auth_headers (
176
+ self ,
177
+ request : httpx .Request ,
178
+ auth_request : google_auth_requests .Request
179
+ ) -> None :
180
+ """A helper function that refreshes credentials if needed and mutates the request headers
181
+ to contain access token and any other Google Auth headers."""
182
+
183
+ logger .debug (
184
+ 'Attempting to apply auth headers. Credential validity before: %s' ,
185
+ self ._credential .valid
186
+ )
187
+ self ._credential .before_request (
188
+ auth_request , request .method , str (request .url ), request .headers
189
+ )
190
+ logger .debug ('Auth headers applied. Credential validity after: %s' , self ._credential .valid )
191
+
192
+ def auth_flow (self , request : httpx .Request ) -> Generator [httpx .Request , httpx .Response , None ]:
193
+ _original_headers = request .headers .copy ()
194
+ _credential_refresh_attempt = 0
195
+
196
+ # Create a Google auth request object to be used for refreshing credentials
197
+ auth_request = google_auth_requests .Request ()
198
+
199
+ while True :
200
+ # Copy original headers for each attempt
201
+ request .headers = _original_headers .copy ()
202
+
203
+ # Apply auth headers (which might include an implicit refresh if token is expired)
204
+ self .apply_auth_headers (request , auth_request )
205
+
206
+ logger .debug (
207
+ 'Dispatching request, attempt %d of %d' ,
208
+ _credential_refresh_attempt , self ._max_refresh_attempts
209
+ )
210
+ response : httpx .Response = yield request
211
+
212
+ if response .status_code in self ._refresh_status_codes :
213
+ if _credential_refresh_attempt < self ._max_refresh_attempts :
214
+ logger .debug (
215
+ 'Received status %d. Attempting explicit credential refresh. \
216
+ Attempt %d of %d.' ,
217
+ response .status_code ,
218
+ _credential_refresh_attempt + 1 ,
219
+ self ._max_refresh_attempts
220
+ )
221
+ # Explicitly force a credentials refresh
222
+ self ._credential .refresh (auth_request )
223
+ _credential_refresh_attempt += 1
224
+ else :
225
+ logger .debug (
226
+ 'Received status %d, but max auth refresh attempts (%d) reached. \
227
+ Returning last response.' ,
228
+ response .status_code , self ._max_refresh_attempts
229
+ )
230
+ break
231
+ else :
232
+ # Status code is not one that requires a refresh, so break and return response
233
+ logger .debug (
234
+ 'Status code %d does not require refresh. Returning response.' ,
235
+ response .status_code
236
+ )
237
+ break
238
+ # The last yielded response is automatically returned by httpx's auth flow.
239
+
240
+ class HttpxAsyncClient ():
241
+ """Async HTTP client used to make HTTP/2 calls using HTTPX.
242
+
243
+ HttpxAsyncClient maintains an async HTTPX client, handles request authentication, and retries
244
+ if necessary.
245
+ """
246
+ def __init__ (
247
+ self ,
248
+ credential : Optional [credentials .Credentials ] = None ,
249
+ base_url : str = '' ,
250
+ headers : Optional [Union [httpx .Headers , Dict [str , str ]]] = None ,
251
+ retry_config : HttpxRetry = DEFAULT_HTTPX_RETRY_CONFIG ,
252
+ timeout : int = DEFAULT_TIMEOUT_SECONDS ,
253
+ http2 : bool = True
254
+ ) -> None :
255
+ """Creates a new HttpxAsyncClient instance from the provided arguments.
256
+
257
+ If a credential is provided, initializes a new async HTTPX client authorized with it.
258
+ Otherwise, initializes a new unauthorized async HTTPX client.
259
+
260
+ Args:
261
+ credential: A Google credential that can be used to authenticate requests (optional).
262
+ base_url: A URL prefix to be added to all outgoing requests (optional).
263
+ headers: A map of headers to be added to all outgoing requests (optional).
264
+ retry_config: A HttpxRetry configuration. Default settings would retry up to 4 times for
265
+ HTTP 500 and 503 errors (optional).
266
+ timeout: HTTP timeout in seconds. Defaults to 120 seconds when not specified (optional).
267
+ http2: A boolean indicating if HTTP/2 support should be enabled. Defaults to `True` when
268
+ not specified (optional).
269
+ """
270
+ self ._base_url = base_url
271
+ self ._timeout = timeout
272
+ self ._headers = {** headers , ** METRICS_HEADERS } if headers else {** METRICS_HEADERS }
273
+ self ._retry_config = retry_config
274
+
275
+ # Only set up retries on urls starting with 'http://' and 'https://'
276
+ self ._mounts = {
277
+ 'http://' : HttpxRetryTransport (retry = self ._retry_config , http2 = http2 ),
278
+ 'https://' : HttpxRetryTransport (retry = self ._retry_config , http2 = http2 )
279
+ }
280
+
281
+ if credential :
282
+ self ._async_client = httpx .AsyncClient (
283
+ http2 = http2 ,
284
+ timeout = self ._timeout ,
285
+ headers = self ._headers ,
286
+ auth = GoogleAuthCredentialFlow (credential ), # Add auth flow for credentials.
287
+ mounts = self ._mounts
288
+ )
289
+ else :
290
+ self ._async_client = httpx .AsyncClient (
291
+ http2 = http2 ,
292
+ timeout = self ._timeout ,
293
+ headers = self ._headers ,
294
+ mounts = self ._mounts
295
+ )
296
+
297
+ @property
298
+ def base_url (self ):
299
+ return self ._base_url
300
+
301
+ @property
302
+ def timeout (self ):
303
+ return self ._timeout
304
+
305
+ @property
306
+ def async_client (self ):
307
+ return self ._async_client
308
+
309
+ async def request (self , method : str , url : str , ** kwargs : Any ) -> httpx .Response :
310
+ """Makes an HTTP call using the HTTPX library.
311
+
312
+ This is the sole entry point to the HTTPX library. All other helper methods in this
313
+ class call this method to send HTTP requests out. Refer to
314
+ https://www.python-httpx.org/api/ for more information on supported options
315
+ and features.
316
+
317
+ Args:
318
+ method: HTTP method name as a string (e.g. get, post).
319
+ url: URL of the remote endpoint.
320
+ **kwargs: An additional set of keyword arguments to be passed into the HTTPX API
321
+ (e.g. json, params, timeout).
322
+
323
+ Returns:
324
+ Response: An HTTPX response object.
325
+
326
+ Raises:
327
+ HTTPError: Any HTTPX exceptions encountered while making the HTTP call.
328
+ RequestException: Any requests exceptions encountered while making the HTTP call.
329
+ """
330
+ if 'timeout' not in kwargs :
331
+ kwargs ['timeout' ] = self .timeout
332
+ resp = await self ._async_client .request (method , self .base_url + url , ** kwargs )
333
+ return resp .raise_for_status ()
334
+
335
+ async def headers (self , method : str , url : str , ** kwargs : Any ) -> httpx .Headers :
336
+ resp = await self .request (method , url , ** kwargs )
337
+ return resp .headers
338
+
339
+ async def body_and_response (
340
+ self , method : str , url : str , ** kwargs : Any ) -> Tuple [Any , httpx .Response ]:
341
+ resp = await self .request (method , url , ** kwargs )
342
+ return self .parse_body (resp ), resp
343
+
344
+ async def body (self , method : str , url : str , ** kwargs : Any ) -> Any :
345
+ resp = await self .request (method , url , ** kwargs )
346
+ return self .parse_body (resp )
347
+
348
+ async def headers_and_body (
349
+ self , method : str , url : str , ** kwargs : Any ) -> Tuple [httpx .Headers , Any ]:
350
+ resp = await self .request (method , url , ** kwargs )
351
+ return resp .headers , self .parse_body (resp )
352
+
353
+ def parse_body (self , resp : httpx .Response ) -> Any :
354
+ return resp .json ()
355
+
356
+ async def aclose (self ) -> None :
357
+ await self ._async_client .aclose ()
0 commit comments