Core decompress body #18581

Merged (23 commits) on May 13, 2021
Changes from 12 commits
65 changes: 59 additions & 6 deletions sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py
@@ -25,9 +25,14 @@
# --------------------------------------------------------------------------
from typing import Any, Optional, AsyncIterator as AsyncIteratorType
from collections.abc import AsyncIterator
try:
    import cchardet as chardet
except ImportError:  # pragma: no cover
    import chardet  # type: ignore

import logging
import asyncio
import codecs
import aiohttp
from multidict import CIMultiDict
from requests.exceptions import StreamConsumedError
@@ -51,7 +56,7 @@ class AioHttpTransport(AsyncHttpTransport):

Fully asynchronous implementation using the aiohttp library.

:param session: The client session.
:param aiohttp.ClientSession session: The client session.
:param loop: The event loop.
:param bool session_owner: Session owner. Defaults True.

@@ -69,7 +74,7 @@ class AioHttpTransport(AsyncHttpTransport):
def __init__(self, *, session=None, loop=None, session_owner=True, **kwargs):
self._loop = loop
self._session_owner = session_owner
self.session = session
self.session = session # type: aiohttp.ClientSession
self.connection_config = ConnectionConfiguration(**kwargs)
self._use_env_settings = kwargs.pop('use_env_settings', True)

@@ -145,6 +150,7 @@ async def send(self, request: HttpRequest, **config: Any) -> Optional[AsyncHttpR
:keyword str proxy: will define the proxy to use all the time
"""
await self.open()
auto_decompress = self.session.auto_decompress

proxies = config.pop('proxies', None)
if proxies and 'proxy' not in config:
@@ -180,7 +186,9 @@ async def send(self, request: HttpRequest, **config: Any) -> Optional[AsyncHttpR
allow_redirects=False,
**config
)
response = AioHttpTransportResponse(request, result, self.connection_config.data_block_size)
response = AioHttpTransportResponse(request, result,
self.connection_config.data_block_size,
decompress=not auto_decompress)
if not stream_response:
await response.load_body()
except aiohttp.client_exceptions.ClientResponseError as err:
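Note: the decompress flag handed to AioHttpTransportResponse is simply the inverse of the session's auto_decompress setting read at the top of send(). A minimal standalone sketch of that relationship (not from the PR; it only uses public aiohttp attributes):

    import asyncio
    import aiohttp

    async def main():
        # When the aiohttp session inflates gzip/deflate bodies itself (the default),
        # the SDK response is told not to repeat the work, and vice versa.
        session = aiohttp.ClientSession(auto_decompress=False)
        decompress = not session.auto_decompress
        print(decompress)  # True: AioHttpTransportResponse would do the inflating
        await session.close()

    asyncio.run(main())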
@@ -250,21 +258,40 @@ class AioHttpTransportResponse(AsyncHttpResponse):
:type aiohttp_response: aiohttp.ClientResponse object
:param block_size: block size of data sent over connection.
:type block_size: int
:keyword bool decompress: If True (the default), will attempt to decode the body based on the 'content-encoding' header.
"""
def __init__(self, request: HttpRequest, aiohttp_response: aiohttp.ClientResponse, block_size=None) -> None:
def __init__(self, request: HttpRequest,
             aiohttp_response: aiohttp.ClientResponse,
             block_size=None, **kwargs) -> None:
    super(AioHttpTransportResponse, self).__init__(request, aiohttp_response, block_size=block_size)
    # https://aiohttp.readthedocs.io/en/stable/client_reference.html#aiohttp.ClientResponse
    self.status_code = aiohttp_response.status
    self.headers = CIMultiDict(aiohttp_response.headers)
    self.reason = aiohttp_response.reason
    self.content_type = aiohttp_response.headers.get('content-type')
    self._body = None
    self._decompress = kwargs.pop("decompress", True)
    if len(kwargs) > 0:
        raise TypeError("Got an unexpected keyword argument: {}".format(list(kwargs.keys())[0]))

def body(self) -> bytes:
    """Return the whole body as bytes in memory.
    """
    if self._body is None:
        raise ValueError("Body is not available. Call async method load_body, or do your call with stream=False.")
    if not self._decompress:
        return self._body
    enc = self.headers.get('Content-Encoding')
    if not enc:
        return self._body
    enc = enc.lower()
    if enc in ("gzip", "deflate"):
        import zlib
        zlib_mode = 16 + zlib.MAX_WBITS if enc == "gzip" else zlib.MAX_WBITS
        decompressor = zlib.decompressobj(wbits=zlib_mode)
        body = decompressor.decompress(self._body)
        return body
Member:

I do have some concerns about us not caching the decompressed body. Because we only need it once, right? Do we have any other access to self._body that requires us to keep the compressed data?

Member Author:

I don't expect (at least, I did not see) that users need to get the body twice.

If you want, we can update the code like this:

    if enc in ("gzip", "deflate"):
        if self._decompressed_body:
            return self._decompressed_body
        import zlib
        zlib_mode = 16 + zlib.MAX_WBITS if enc == "gzip" else zlib.MAX_WBITS
        decompressor = zlib.decompressobj(wbits=zlib_mode)
        self._decompressed_body = decompressor.decompress(self._body)
        return self._decompressed_body
    return self._body

But to be honest, I don't see a lot of value in this.

Member:

They don't need to get the body more than once. But it would not be obvious to a user that getting the body and then the text decompresses the body twice.

I don't think we need to keep the compressed data around once it has been decompressed, right?

Member Author:

Sounds fair. Updated. :)
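A hedged sketch of the agreed direction (the follow-up commit is not part of this 12-commit view): decompress once and keep only the plain bytes, so the compressed payload is not retained. The _body_decompressed guard attribute below is hypothetical, not taken from the PR.

    if enc in ("gzip", "deflate"):
        if not getattr(self, "_body_decompressed", False):  # hypothetical guard, not from the PR
            import zlib
            zlib_mode = 16 + zlib.MAX_WBITS if enc == "gzip" else zlib.MAX_WBITS
            self._body = zlib.decompressobj(wbits=zlib_mode).decompress(self._body)
            self._body_decompressed = True  # compressed bytes are no longer kept around
    return self._body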

    return self._body

def text(self, encoding: Optional[str] = None) -> str:
@@ -274,10 +301,36 @@ def text(self, encoding: Optional[str] = None) -> str:

    :param str encoding: The encoding to apply.
    """
    # super().text detects charset based on self._body() which is compressed
    # implement the decoding explicitly here
    body = self.body()

    ctype = self.headers.get(aiohttp.hdrs.CONTENT_TYPE, "").lower()
    mimetype = aiohttp.helpers.parse_mimetype(ctype)

    encoding = mimetype.parameters.get("charset")
    if encoding:
        try:
            codecs.lookup(encoding)
        except LookupError:
            encoding = None
    if not encoding:
        if mimetype.type == "application" and (
            mimetype.subtype == "json" or mimetype.subtype == "rdap"
        ):
            # RFC 7159 states that the default encoding is UTF-8.
            # RFC 7483 defines application/rdap+json
            encoding = "utf-8"
        elif body is None:
            raise RuntimeError(
                "Cannot guess the encoding of a not yet read body"
            )
        else:
            encoding = chardet.detect(body)["encoding"]
    if not encoding:
        encoding = self.internal_response.get_encoding()
        encoding = "utf-8-sig"

    return super().text(encoding)
    return body.decode(encoding)
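The detection order above is: an explicit charset parameter, then the RFC 7159 / RFC 7483 UTF-8 default for JSON-like media types, then a chardet guess over the decompressed body, then a final 'utf-8-sig' fallback. A small standalone illustration of that cascade (the sample content type and payload are made up; parse_mimetype and chardet.detect are the same calls used above):

    import aiohttp
    import chardet

    ctype = "application/json".lower()                 # no charset parameter supplied
    mimetype = aiohttp.helpers.parse_mimetype(ctype)

    encoding = mimetype.parameters.get("charset")      # 1) explicit charset, if present
    if not encoding and mimetype.type == "application" and mimetype.subtype in ("json", "rdap"):
        encoding = "utf-8"                             # 2) RFC 7159 / RFC 7483 default
    if not encoding:
        encoding = chardet.detect(b'{"value": 1}')["encoding"]  # 3) statistical guess
    if not encoding:
        encoding = "utf-8-sig"                         # 4) last-resort fallback
    print(encoding)                                    # -> "utf-8" for this sample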

async def load_body(self) -> None:
"""Load in memory the body, so it could be accessible from sync methods."""
39 changes: 39 additions & 0 deletions sdk/core/azure-core/tests/async_tests/test_universal_http_async.py
@@ -123,6 +123,45 @@ async def test_aiohttp_response_text():
    )
    assert res.text(encoding) == '56', "Encoding {} didn't work".format(encoding)

@pytest.mark.asyncio
async def test_aiohttp_response_decompression():
    res = _create_aiohttp_response(
        b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x04\x00\x8d\x8d\xb1n\xc30\x0cD"
        b"\xff\x85s\x14HVlY\xda\x8av.\n4\x1d\x9a\x8d\xa1\xe5D\x80m\x01\x12="
        b"\x14A\xfe\xbd\x92\x81d\xceB\x1c\xef\xf8\x8e7\x08\x038\xf0\xa67Fj+"
        b"\x946\x9d8\x0c4\x08{\x96(\x94mzkh\x1cM/a\x07\x94<\xb2\x1f>\xca8\x86"
        b"\xd9\xff0\x15\xb6\x91\x8d\x12\xb2\x15\xd2\x1c\x95q\xbau\xba\xdbk"
        b"\xd5(\xd9\xb5\xa7\xc2L\x98\xf9\x8d8\xc4\xe5U\xccV,3\xf2\x9a\xcb\xddg"
        b"\xe4o\xc6T\xdeVw\x9dgL\x7f\xe0n\xc0\x91q\x02'w0b\x98JZe^\x89|\xce\x9b"
        b"\x0e\xcbW\x8a\x97\xf4X\x97\xc8\xbf\xfeYU\x1d\xc2\x85\xfc\xf4@\xb7\xbe"
        b"\xf7+&$\xf6\xa9\x8a\xcb\x96\xdc\xef\xff\xaa\xa1\x1c\xf9$\x01\x00\x00",
        {'Content-Type': 'text/plain', 'Content-Encoding': "gzip"}
    )
    body = res.body()
    expect = b'{"id":"e7877039-1376-4dcd-9b0a-192897cff780","createdDateTimeUtc":' \
        b'"2021-05-07T17:35:36.3121065Z","lastActionDateTimeUtc":' \
        b'"2021-05-07T17:35:36.3121069Z","status":"NotStarted",' \
        b'"summary":{"total":0,"failed":0,"success":0,"inProgress":0,' \
        b'"notYetStarted":0,"cancelled":0,"totalCharacterCharged":0}}'
    assert res.body() == expect, "Decompression didn't work"

@pytest.mark.asyncio
async def test_aiohttp_response_decompression_negtive():
    import zlib
    res = _create_aiohttp_response(
        b"\xff\x85s\x14HVlY\xda\x8av.\n4\x1d\x9a\x8d\xa1\xe5D\x80m\x01\x12="
        b"\x14A\xfe\xbd\x92\x81d\xceB\x1c\xef\xf8\x8e7\x08\x038\xf0\xa67Fj+"
        b"\x946\x9d8\x0c4\x08{\x96(\x94mzkh\x1cM/a\x07\x94<\xb2\x1f>\xca8\x86"
        b"\xd9\xff0\x15\xb6\x91\x8d\x12\xb2\x15\xd2\x1c\x95q\xbau\xba\xdbk"
        b"\xd5(\xd9\xb5\xa7\xc2L\x98\xf9\x8d8\xc4\xe5U\xccV,3\xf2\x9a\xcb\xddg"
        b"\xe4o\xc6T\xdeVw\x9dgL\x7f\xe0n\xc0\x91q\x02'w0b\x98JZe^\x89|\xce\x9b"
        b"\x0e\xcbW\x8a\x97\xf4X\x97\xc8\xbf\xfeYU\x1d\xc2\x85\xfc\xf4@\xb7\xbe"
        b"\xf7+&$\xf6\xa9\x8a\xcb\x96\xdc\xef\xff\xaa\xa1\x1c\xf9$\x01\x00\x00",
        {'Content-Type': 'text/plain', 'Content-Encoding': "gzip"}
    )
    with pytest.raises(zlib.error):
        body = res.body()
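Both tests exercise the same wbits convention as the transport code: 16 + zlib.MAX_WBITS tells zlib to expect a gzip container, and a payload whose gzip header is missing (as in the negative test above) makes it raise zlib.error. A quick stdlib-only check of that behavior (the sample payload is made up):

    import zlib

    payload = b'{"status": "NotStarted"}'
    compressor = zlib.compressobj(wbits=16 + zlib.MAX_WBITS)   # produce a gzip stream
    compressed = compressor.compress(payload) + compressor.flush()

    # Same mode the SDK uses for Content-Encoding: gzip
    assert zlib.decompressobj(wbits=16 + zlib.MAX_WBITS).decompress(compressed) == payload

    try:
        # Dropping the leading gzip magic bytes corrupts the stream, like the negative test.
        zlib.decompressobj(wbits=16 + zlib.MAX_WBITS).decompress(compressed[2:])
    except zlib.error:
        pass  # expected: incorrect header check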

def test_repr():
    res = _create_aiohttp_response(
        b'\xef\xbb\xbf56',
(A third changed file follows: a recorded test HTTP interaction; the file path is not shown in this view.)
@@ -5,24 +5,27 @@ interactions:
Accept:
- application/json;odata.metadata=none
User-Agent:
- azsdk-python-search-documents/11.1.1 Python/3.8.6 (Windows-10-10.0.19041-SP0)
- azsdk-python-search-documents/11.2.0b3 Python/3.8.6 (Windows-10-10.0.19041-SP0)
method: GET
uri: https://search6dc91ab1.search.windows.net/indexes('drgqefsg')/docs/$count?api-version=2020-06-30-Preview
response:
body:
string: "\uFEFF10"
string: !!binary |
H4sIAAAAAAAEAO29B2AcSZYlJi9tynt/SvVK1+B0oQiAYBMk2JBAEOzBiM3mkuwdaUcjKasqgcpl
VmVdZhZAzO2dvPfee++999577733ujudTif33/8/XGZkAWz2zkrayZ4hgKrIHz9+fB8/Iv7Hv/cf
3N35fwC74FOIBQAAAA==
headers:
cache-control: no-cache
content-encoding: gzip
content-length: '127'
content-type: text/plain
date: Thu, 25 Mar 2021 20:21:11 GMT
elapsed-time: '67'
date: Sat, 08 May 2021 06:20:16 GMT
elapsed-time: '152'
expires: '-1'
odata-version: '4.0'
pragma: no-cache
preference-applied: odata.include-annotations="*"
request-id: a406bb84-8da7-11eb-ac20-a0481ca055a9
request-id: 746a9a78-afc5-11eb-ad79-a0481ca055a9
strict-transport-security: max-age=15724800; includeSubDomains
vary: Accept-Encoding
status: