Skip to content

feat: support gzip threshold #139

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Features

1. [#130](https://github.com/InfluxCommunity/influxdb3-python/pull/130): Remove org parameters from the example code because It is not mandatory in Influxdb3
2. [#139](https://github.com/InfluxCommunity/influxdb3-python/pull/139): Supports environment variables with the same name like other clients

## 0.12.0 [2025-03-26]

Expand Down
48 changes: 47 additions & 1 deletion influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
INFLUX_ORG = "INFLUX_ORG"
INFLUX_PRECISION = "INFLUX_PRECISION"
INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME"
INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD"


def write_client_options(**kwargs):
Expand Down Expand Up @@ -114,6 +115,30 @@
return precision


def _parse_gzip_threshold(threshold):
"""
Parses and validates the provided threshold value.

This function ensures that the given threshold is a valid integer value,
and it raises an appropriate error if the threshold is not valid. It also
enforces that the threshold value is non-negative.

:param threshold: The input threshold value to be parsed and validated.
:type threshold: Any
:return: The validated threshold value as an integer.
:rtype: int
:raises ValueError: If the provided threshold is not an integer or if it is
negative.
"""
try:
threshold = int(threshold)
except (TypeError, ValueError):
raise ValueError(f"Invalid threshold value: {threshold}. Must be integer.")

Check warning on line 136 in influxdb_client_3/__init__.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/__init__.py#L135-L136

Added lines #L135 - L136 were not covered by tests
if threshold < 0:
raise ValueError(f"Invalid threshold value: {threshold}. Must be non-negative.")

Check warning on line 138 in influxdb_client_3/__init__.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/__init__.py#L138

Added line #L138 was not covered by tests
return threshold


class InfluxDBClient3:
def __init__(
self,
Expand Down Expand Up @@ -226,7 +251,23 @@

@classmethod
def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':

"""
Creates an instance of the ``InfluxDBClient3`` class using environment
variables for configuration. This method simplifies client creation by
automatically reading required information from the system environment.

It verifies the presence of required environment variables such as host,
token, and database. If any of these variables are missing or empty,
a ``ValueError`` will be raised. Optional parameters such as precision and
authentication scheme will also be extracted from the environment when
present, allowing further customization of the client.

:param kwargs: Additional parameters that are passed to the client constructor.
:type kwargs: Any
:raises ValueError: If any required environment variables are missing or empty.
:return: An initialized client object of type ``InfluxDBClient3``.
:rtype: InfluxDBClient3
"""
required_vars = {
INFLUX_HOST: os.getenv(INFLUX_HOST),
INFLUX_TOKEN: os.getenv(INFLUX_TOKEN),
Expand All @@ -238,6 +279,11 @@

write_options = WriteOptions(write_type=WriteType.synchronous)

gzip_threshold = os.getenv(INFLUX_GZIP_THRESHOLD)
if gzip_threshold is not None:
kwargs['gzip_threshold'] = _parse_gzip_threshold(gzip_threshold)
kwargs['enable_gzip'] = True

precision = os.getenv(INFLUX_PRECISION)
if precision is not None:
write_options.write_precision = _parse_precision(precision)
Expand Down
48 changes: 42 additions & 6 deletions influxdb_client_3/write_client/_sync/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,40 @@ def set_default_header(self, header_name, header_value):
"""Set HTTP header for this API client."""
self.default_headers[header_name] = header_value

@staticmethod
def should_gzip(payload: str, enable_gzip: bool = False, gzip_threshold: int = None) -> bool:
"""
Determines whether gzip compression should be applied to the given payload based
on the specified conditions. This method evaluates the `enable_gzip` flag and
considers the size of the payload in relation to the optional `gzip_threshold`.
If `enable_gzip` is set to True and no threshold is provided, gzip compression
is advised without any size condition. If a threshold is specified, compression
is applied only when the size of the payload meets or exceeds the threshold.
By default, no compression is performed if `enable_gzip` is False.

:param payload: The payload data as a string for which gzip determination is to
be made.
:type payload: str
:param enable_gzip: A flag indicating whether gzip compression is enabled. By
default, this flag is False.
:type enable_gzip: bool, optional
:param gzip_threshold: Optional threshold specifying the minimum size (in bytes)
of the payload to trigger gzip compression. Only considered if
`enable_gzip` is True.
:type gzip_threshold: int, optional
:return: A boolean value indicating True if gzip compression should be applied
based on the payload size, the enable_gzip flag, and the gzip_threshold.
:rtype: bool
"""
if enable_gzip is not False:
if gzip_threshold is not None:
payload_size = len(payload.encode('utf-8'))
return payload_size >= gzip_threshold
if enable_gzip is True:
return True

return False

def __call_api(
self, resource_path, method, path_params=None,
query_params=None, header_params=None, body=None, post_params=None,
Expand All @@ -102,9 +136,16 @@ def __call_api(
config = self.configuration
self._signin(resource_path=resource_path)

# body
should_gzip = False
if body:
should_gzip = self.should_gzip(body, config.enable_gzip, config.gzip_threshold)
body = self.sanitize_for_serialization(body)
body = config.update_request_body(resource_path, body, should_gzip)

# header parameters
header_params = header_params or {}
config.update_request_header_params(resource_path, header_params)
config.update_request_header_params(resource_path, header_params, should_gzip)
header_params.update(self.default_headers)
if self.cookie:
header_params['Cookie'] = self.cookie
Expand Down Expand Up @@ -141,11 +182,6 @@ def __call_api(
# auth setting
self.update_params_for_auth(header_params, query_params, auth_settings)

# body
if body:
body = self.sanitize_for_serialization(body)
body = config.update_request_body(resource_path, body)

# request url
url = self.configuration.host + resource_path

Expand Down
60 changes: 7 additions & 53 deletions influxdb_client_3/write_client/client/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, or
else:
self.conf.host = self.url
self.conf.enable_gzip = enable_gzip
self.conf.gzip_threshold = kwargs.get('gzip_threshold', None)
self.conf.verify_ssl = kwargs.get('verify_ssl', True)
self.conf.ssl_ca_cert = kwargs.get('ssl_ca_cert', None)
self.conf.cert_file = kwargs.get('cert_file', None)
Expand Down Expand Up @@ -206,53 +207,6 @@ def _from_env_properties(cls, debug=None, enable_gzip=False, **kwargs):
connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic),
profilers=profilers, **kwargs)

@classmethod
def _from_env(cls, debug=None, enable_gzip=False, **kwargs):
"""
Creates and configures an instance of the class using environment variable values. The method loads
configuration values for connecting to an InfluxDB server instance from preset environment variables.
Options include connection details such as host, token, organization, and optional parameters
like SSL settings, profiling, and default tags. Non-specified parameters fallback to defaults
or None, ensuring a straightforward integration with varied InfluxDB setups.

:param debug: Determines whether debugging mode is enabled.
:type debug: Optional[bool]
:param enable_gzip: Indicates whether gzip compression is enabled for requests.
:type enable_gzip: bool
:param kwargs: Additional keyword arguments to configure the instance.
:type kwargs: dict
:return: Instance of the class configured using the provided environmental settings.
:rtype: cls
"""
url = os.getenv('INFLUX_HOST', "http://localhost:8086")
token = os.getenv('INFLUX_TOKEN', "my-token")
org = os.getenv('INFLUX_ORG', "my-org")
timeout = os.getenv('INFLUX_TIMEOUT', "10000")
verify_ssl = os.getenv('INFLUX_VERIFY_SSL', "True")
ssl_ca_cert = os.getenv('INFLUX_SSL_CA_CERT', None)
cert_file = os.getenv('INFLUX_CERT_FILE', None)
cert_key_file = os.getenv('INFLUX_CERT_KEY_FILE', None)
cert_key_password = os.getenv('INFLUX_CERT_KEY_PASSWORD', None)
connection_pool_maxsize = os.getenv('INFLUX_CONNECTION_POOL_MAXSIZE', None)
auth_basic = os.getenv('INFLUX_AUTH_BASIC', "False")

prof = os.getenv("INFLUX_PROFILERS", None)
profilers = None
if prof is not None:
profilers = [x.strip() for x in prof.split(',')]

default_tags = dict()

for key, value in os.environ.items():
if key.startswith("INFLUX_TAG_"):
default_tags[key[11:].lower()] = value

return cls(url, token, debug=debug, timeout=_to_int(timeout), org=org, default_tags=default_tags,
enable_gzip=enable_gzip, verify_ssl=_to_bool(verify_ssl), ssl_ca_cert=ssl_ca_cert,
cert_file=cert_file, cert_key_file=cert_key_file, cert_key_password=cert_key_password,
connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic),
profilers=profilers, **kwargs)


class _BaseWriteApi(object):
def __init__(self, influxdb_client, point_settings=None):
Expand Down Expand Up @@ -324,9 +278,9 @@ def __init__(self):
self.username = None
self.password = None

def update_request_header_params(self, path: str, params: dict):
super().update_request_header_params(path, params)
if self.enable_gzip:
def update_request_header_params(self, path: str, params: dict, should_gzip: bool = False):
super().update_request_header_params(path, params, should_gzip)
if should_gzip:
# GZIP Request
if path == '/api/v2/write':
params["Content-Encoding"] = "gzip"
Expand All @@ -340,9 +294,9 @@ def update_request_header_params(self, path: str, params: dict):
pass
pass

def update_request_body(self, path: str, body):
_body = super().update_request_body(path, body)
if self.enable_gzip:
def update_request_body(self, path: str, body, should_gzip: bool = False):
_body = super().update_request_body(path, body, should_gzip)
if should_gzip:
# GZIP Request
if path == '/api/v2/write':
import gzip
Expand Down
5 changes: 3 additions & 2 deletions influxdb_client_3/write_client/client/influxdb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class InfluxDBClient(_BaseClient):
"""InfluxDBClient is client for InfluxDB v2."""

def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gzip=False, org: str = None,
default_tags: dict = None, **kwargs) -> None:
default_tags: dict = None, gzip_threshold=None, **kwargs) -> None:
"""
Initialize defaults.

Expand Down Expand Up @@ -52,7 +52,8 @@ def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gz
:key str password: ``password`` to authenticate via username and password credentials to the InfluxDB 2.x
:key list[str] profilers: list of enabled Flux profilers
"""
super().__init__(url=url, token=token, debug=debug, timeout=timeout, enable_gzip=enable_gzip, org=org,
super().__init__(url=url, token=token, debug=debug, timeout=timeout, enable_gzip=enable_gzip,
gzip_threshold=gzip_threshold, org=org,
default_tags=default_tags, http_client_logger="urllib3", **kwargs)

from influxdb_client_3.write_client._sync.api_client import ApiClient
Expand Down
14 changes: 10 additions & 4 deletions influxdb_client_3/write_client/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ def __init__(self):
# Safe chars for path_param
self.safe_chars_for_path_param = ''

# Compression settings
self.enable_gzip = False
self.gzip_threshold = None

@property
def logger_file(self):
"""Logger file.
Expand Down Expand Up @@ -245,19 +249,21 @@ def to_debug_report(self):
"SDK Package Version: {client_version}".\
format(env=sys.platform, pyversion=sys.version, client_version=VERSION)

def update_request_header_params(self, path: str, params: dict):
def update_request_header_params(self, path: str, params: dict, should_gzip: bool = False):
"""Update header params based on custom settings.

:param path: Resource path
:param path: Resource path.
:param params: Header parameters dict to be updated.
:param should_gzip: Describes if request body should be gzip compressed.
"""
pass

def update_request_body(self, path: str, body):
def update_request_body(self, path: str, body, should_gzip: bool = False):
"""Update http body based on custom settings.

:param path: Resource path
:param path: Resource path.
:param body: Request body to be updated.
:param should_gzip: Describes if request body should be gzip compressed.
:return: Updated body
"""
return body
22 changes: 22 additions & 0 deletions tests/test_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,25 @@ def test_api_error_headers(self):
self.assertEqual(headers['Trace-Sampled'], 'false')
self.assertEqual(headers['X-Influxdb-Request-Id'], requestid)
self.assertEqual(headers['X-Influxdb-Build'], 'Mock')

def test_should_gzip(self):
# Test when gzip is disabled
self.assertFalse(ApiClient.should_gzip("test", enable_gzip=False, gzip_threshold=1))
self.assertFalse(ApiClient.should_gzip("test", enable_gzip=False, gzip_threshold=10000))
self.assertFalse(ApiClient.should_gzip("test", enable_gzip=False, gzip_threshold=None))

# Test when enable_gzip is True
self.assertTrue(ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=None))
self.assertTrue(ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=1))
self.assertFalse(ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=100000))

# Test payload smaller than threshold
self.assertFalse(ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=10000))

# Test payload larger than threshold
large_payload = "x" * 10000
self.assertTrue(ApiClient.should_gzip(large_payload, enable_gzip=True, gzip_threshold=1000))

# Test exact threshold match and less than threshold
payload = "x" * 1000
self.assertTrue(ApiClient.should_gzip(payload, enable_gzip=True, gzip_threshold=1000))
4 changes: 3 additions & 1 deletion tests/test_influxdb_client_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ def verify_client_write_options(c):

@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org',
'INFLUX_PRECISION': WritePrecision.MS, 'INFLUX_AUTH_SCHEME': 'custom_scheme'})
'INFLUX_PRECISION': WritePrecision.MS, 'INFLUX_AUTH_SCHEME': 'custom_scheme',
'INFLUX_GZIP_THRESHOLD': '2000'})
def test_from_env_all_env_vars_set(self):
client = InfluxDBClient3.from_env()
self.assertIsInstance(client, InfluxDBClient3)
Expand All @@ -113,6 +114,7 @@ def test_from_env_all_env_vars_set(self):
self.assertEqual(client._client.auth_header_value, f"custom_scheme {client._token}")
self.assertEqual(client._database, "test_db")
self.assertEqual(client._org, "test_org")
self.assertEqual(client._client.api_client.rest_client.configuration.gzip_threshold, 2000)

write_options = client._write_client_options.get("write_options")
self.assertEqual(write_options.write_precision, WritePrecision.MS)
Expand Down
Loading