Skip to content

Commit cd55bcd

Browse files
feat: support gzip threshold
1 parent db0c51d commit cd55bcd

File tree

8 files changed

+135
-67
lines changed

8 files changed

+135
-67
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
### Features
66

77
1. [#130](https://github.com/InfluxCommunity/influxdb3-python/pull/130): Remove org parameters from the example code because It is not mandatory in Influxdb3
8+
2. [#139](https://github.com/InfluxCommunity/influxdb3-python/pull/139): Supports environment variables with the same name like other clients
89

910
## 0.12.0 [2025-03-26]
1011

influxdb_client_3/__init__.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
INFLUX_ORG = "INFLUX_ORG"
2222
INFLUX_PRECISION = "INFLUX_PRECISION"
2323
INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME"
24+
INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD"
2425

2526

2627
def write_client_options(**kwargs):
@@ -114,6 +115,30 @@ def _parse_precision(precision):
114115
return precision
115116

116117

118+
def _parse_gzip_threshold(threshold):
119+
"""
120+
Parses and validates the provided threshold value.
121+
122+
This function ensures that the given threshold is a valid integer value,
123+
and it raises an appropriate error if the threshold is not valid. It also
124+
enforces that the threshold value is non-negative.
125+
126+
:param threshold: The input threshold value to be parsed and validated.
127+
:type threshold: Any
128+
:return: The validated threshold value as an integer.
129+
:rtype: int
130+
:raises ValueError: If the provided threshold is not an integer or if it is
131+
negative.
132+
"""
133+
try:
134+
threshold = int(threshold)
135+
except (TypeError, ValueError):
136+
raise ValueError(f"Invalid threshold value: {threshold}. Must be integer.")
137+
if threshold < 0:
138+
raise ValueError(f"Invalid threshold value: {threshold}. Must be non-negative.")
139+
return threshold
140+
141+
117142
class InfluxDBClient3:
118143
def __init__(
119144
self,
@@ -226,7 +251,23 @@ def __init__(
226251

227252
@classmethod
228253
def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':
229-
254+
"""
255+
Creates an instance of the ``InfluxDBClient3`` class using environment
256+
variables for configuration. This method simplifies client creation by
257+
automatically reading required information from the system environment.
258+
259+
It verifies the presence of required environment variables such as host,
260+
token, and database. If any of these variables are missing or empty,
261+
a ``ValueError`` will be raised. Optional parameters such as precision and
262+
authentication scheme will also be extracted from the environment when
263+
present, allowing further customization of the client.
264+
265+
:param kwargs: Additional parameters that are passed to the client constructor.
266+
:type kwargs: Any
267+
:raises ValueError: If any required environment variables are missing or empty.
268+
:return: An initialized client object of type ``InfluxDBClient3``.
269+
:rtype: InfluxDBClient3
270+
"""
230271
required_vars = {
231272
INFLUX_HOST: os.getenv(INFLUX_HOST),
232273
INFLUX_TOKEN: os.getenv(INFLUX_TOKEN),
@@ -238,6 +279,11 @@ def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':
238279

239280
write_options = WriteOptions(write_type=WriteType.synchronous)
240281

282+
gzip_threshold = os.getenv(INFLUX_GZIP_THRESHOLD)
283+
if gzip_threshold is not None:
284+
kwargs['gzip_threshold'] = _parse_gzip_threshold(gzip_threshold)
285+
kwargs['enable_gzip'] = True
286+
241287
precision = os.getenv(INFLUX_PRECISION)
242288
if precision is not None:
243289
write_options.write_precision = _parse_precision(precision)

influxdb_client_3/write_client/_sync/api_client.py

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,40 @@ def set_default_header(self, header_name, header_value):
9292
"""Set HTTP header for this API client."""
9393
self.default_headers[header_name] = header_value
9494

95+
@staticmethod
96+
def should_gzip(payload: str, enable_gzip: bool = False, gzip_threshold: int = None) -> bool:
97+
"""
98+
Determines whether gzip compression should be applied to the given payload based
99+
on the specified conditions. This method evaluates the `enable_gzip` flag and
100+
considers the size of the payload in relation to the optional `gzip_threshold`.
101+
If `enable_gzip` is set to True and no threshold is provided, gzip compression
102+
is advised without any size condition. If a threshold is specified, compression
103+
is applied only when the size of the payload meets or exceeds the threshold.
104+
By default, no compression is performed if `enable_gzip` is False.
105+
106+
:param payload: The payload data as a string for which gzip determination is to
107+
be made.
108+
:type payload: str
109+
:param enable_gzip: A flag indicating whether gzip compression is enabled. By
110+
default, this flag is False.
111+
:type enable_gzip: bool, optional
112+
:param gzip_threshold: Optional threshold specifying the minimum size (in bytes)
113+
of the payload to trigger gzip compression. Only considered if
114+
`enable_gzip` is True.
115+
:type gzip_threshold: int, optional
116+
:return: A boolean value indicating True if gzip compression should be applied
117+
based on the payload size, the enable_gzip flag, and the gzip_threshold.
118+
:rtype: bool
119+
"""
120+
if enable_gzip is not False:
121+
if gzip_threshold is not None:
122+
payload_size = len(payload.encode('utf-8'))
123+
return payload_size >= gzip_threshold
124+
if enable_gzip is True:
125+
return True
126+
127+
return False
128+
95129
def __call_api(
96130
self, resource_path, method, path_params=None,
97131
query_params=None, header_params=None, body=None, post_params=None,
@@ -102,9 +136,16 @@ def __call_api(
102136
config = self.configuration
103137
self._signin(resource_path=resource_path)
104138

139+
# body
140+
should_gzip = False
141+
if body:
142+
should_gzip = self.should_gzip(body, config.enable_gzip, config.gzip_threshold)
143+
body = self.sanitize_for_serialization(body)
144+
body = config.update_request_body(resource_path, body, should_gzip)
145+
105146
# header parameters
106147
header_params = header_params or {}
107-
config.update_request_header_params(resource_path, header_params)
148+
config.update_request_header_params(resource_path, header_params, should_gzip)
108149
header_params.update(self.default_headers)
109150
if self.cookie:
110151
header_params['Cookie'] = self.cookie
@@ -141,11 +182,6 @@ def __call_api(
141182
# auth setting
142183
self.update_params_for_auth(header_params, query_params, auth_settings)
143184

144-
# body
145-
if body:
146-
body = self.sanitize_for_serialization(body)
147-
body = config.update_request_body(resource_path, body)
148-
149185
# request url
150186
url = self.configuration.host + resource_path
151187

influxdb_client_3/write_client/client/_base.py

Lines changed: 7 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, or
4949
else:
5050
self.conf.host = self.url
5151
self.conf.enable_gzip = enable_gzip
52+
self.conf.gzip_threshold = kwargs.get('gzip_threshold', None)
5253
self.conf.verify_ssl = kwargs.get('verify_ssl', True)
5354
self.conf.ssl_ca_cert = kwargs.get('ssl_ca_cert', None)
5455
self.conf.cert_file = kwargs.get('cert_file', None)
@@ -206,53 +207,6 @@ def _from_env_properties(cls, debug=None, enable_gzip=False, **kwargs):
206207
connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic),
207208
profilers=profilers, **kwargs)
208209

209-
@classmethod
210-
def _from_env(cls, debug=None, enable_gzip=False, **kwargs):
211-
"""
212-
Creates and configures an instance of the class using environment variable values. The method loads
213-
configuration values for connecting to an InfluxDB server instance from preset environment variables.
214-
Options include connection details such as host, token, organization, and optional parameters
215-
like SSL settings, profiling, and default tags. Non-specified parameters fallback to defaults
216-
or None, ensuring a straightforward integration with varied InfluxDB setups.
217-
218-
:param debug: Determines whether debugging mode is enabled.
219-
:type debug: Optional[bool]
220-
:param enable_gzip: Indicates whether gzip compression is enabled for requests.
221-
:type enable_gzip: bool
222-
:param kwargs: Additional keyword arguments to configure the instance.
223-
:type kwargs: dict
224-
:return: Instance of the class configured using the provided environmental settings.
225-
:rtype: cls
226-
"""
227-
url = os.getenv('INFLUX_HOST', "http://localhost:8086")
228-
token = os.getenv('INFLUX_TOKEN', "my-token")
229-
org = os.getenv('INFLUX_ORG', "my-org")
230-
timeout = os.getenv('INFLUX_TIMEOUT', "10000")
231-
verify_ssl = os.getenv('INFLUX_VERIFY_SSL', "True")
232-
ssl_ca_cert = os.getenv('INFLUX_SSL_CA_CERT', None)
233-
cert_file = os.getenv('INFLUX_CERT_FILE', None)
234-
cert_key_file = os.getenv('INFLUX_CERT_KEY_FILE', None)
235-
cert_key_password = os.getenv('INFLUX_CERT_KEY_PASSWORD', None)
236-
connection_pool_maxsize = os.getenv('INFLUX_CONNECTION_POOL_MAXSIZE', None)
237-
auth_basic = os.getenv('INFLUX_AUTH_BASIC', "False")
238-
239-
prof = os.getenv("INFLUX_PROFILERS", None)
240-
profilers = None
241-
if prof is not None:
242-
profilers = [x.strip() for x in prof.split(',')]
243-
244-
default_tags = dict()
245-
246-
for key, value in os.environ.items():
247-
if key.startswith("INFLUX_TAG_"):
248-
default_tags[key[11:].lower()] = value
249-
250-
return cls(url, token, debug=debug, timeout=_to_int(timeout), org=org, default_tags=default_tags,
251-
enable_gzip=enable_gzip, verify_ssl=_to_bool(verify_ssl), ssl_ca_cert=ssl_ca_cert,
252-
cert_file=cert_file, cert_key_file=cert_key_file, cert_key_password=cert_key_password,
253-
connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic),
254-
profilers=profilers, **kwargs)
255-
256210

257211
class _BaseWriteApi(object):
258212
def __init__(self, influxdb_client, point_settings=None):
@@ -324,9 +278,9 @@ def __init__(self):
324278
self.username = None
325279
self.password = None
326280

327-
def update_request_header_params(self, path: str, params: dict):
328-
super().update_request_header_params(path, params)
329-
if self.enable_gzip:
281+
def update_request_header_params(self, path: str, params: dict, should_gzip: bool = False):
282+
super().update_request_header_params(path, params, should_gzip)
283+
if should_gzip:
330284
# GZIP Request
331285
if path == '/api/v2/write':
332286
params["Content-Encoding"] = "gzip"
@@ -340,9 +294,9 @@ def update_request_header_params(self, path: str, params: dict):
340294
pass
341295
pass
342296

343-
def update_request_body(self, path: str, body):
344-
_body = super().update_request_body(path, body)
345-
if self.enable_gzip:
297+
def update_request_body(self, path: str, body, should_gzip: bool = False):
298+
_body = super().update_request_body(path, body, should_gzip)
299+
if should_gzip:
346300
# GZIP Request
347301
if path == '/api/v2/write':
348302
import gzip

influxdb_client_3/write_client/client/influxdb_client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class InfluxDBClient(_BaseClient):
1616
"""InfluxDBClient is client for InfluxDB v2."""
1717

1818
def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gzip=False, org: str = None,
19-
default_tags: dict = None, **kwargs) -> None:
19+
default_tags: dict = None, gzip_threshold=None, **kwargs) -> None:
2020
"""
2121
Initialize defaults.
2222
@@ -52,7 +52,8 @@ def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gz
5252
:key str password: ``password`` to authenticate via username and password credentials to the InfluxDB 2.x
5353
:key list[str] profilers: list of enabled Flux profilers
5454
"""
55-
super().__init__(url=url, token=token, debug=debug, timeout=timeout, enable_gzip=enable_gzip, org=org,
55+
super().__init__(url=url, token=token, debug=debug, timeout=timeout, enable_gzip=enable_gzip,
56+
gzip_threshold=gzip_threshold, org=org,
5657
default_tags=default_tags, http_client_logger="urllib3", **kwargs)
5758

5859
from influxdb_client_3.write_client._sync.api_client import ApiClient

influxdb_client_3/write_client/configuration.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ def __init__(self):
9898
# Safe chars for path_param
9999
self.safe_chars_for_path_param = ''
100100

101+
# Compression settings
102+
self.enable_gzip = False
103+
self.gzip_threshold = None
104+
101105
@property
102106
def logger_file(self):
103107
"""Logger file.
@@ -245,19 +249,21 @@ def to_debug_report(self):
245249
"SDK Package Version: {client_version}".\
246250
format(env=sys.platform, pyversion=sys.version, client_version=VERSION)
247251

248-
def update_request_header_params(self, path: str, params: dict):
252+
def update_request_header_params(self, path: str, params: dict, should_gzip: bool = False):
249253
"""Update header params based on custom settings.
250254
251-
:param path: Resource path
255+
:param path: Resource path.
252256
:param params: Header parameters dict to be updated.
257+
:param should_gzip: Describes if request body should be gzip compressed.
253258
"""
254259
pass
255260

256-
def update_request_body(self, path: str, body):
261+
def update_request_body(self, path: str, body, should_gzip: bool = False):
257262
"""Update http body based on custom settings.
258263
259-
:param path: Resource path
264+
:param path: Resource path.
260265
:param body: Request body to be updated.
266+
:param should_gzip: Describes if request body should be gzip compressed.
261267
:return: Updated body
262268
"""
263269
return body

tests/test_api_client.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,3 +139,25 @@ def test_api_error_headers(self):
139139
self.assertEqual(headers['Trace-Sampled'], 'false')
140140
self.assertEqual(headers['X-Influxdb-Request-Id'], requestid)
141141
self.assertEqual(headers['X-Influxdb-Build'], 'Mock')
142+
143+
def test_should_gzip(self):
144+
# Test when gzip is disabled
145+
self.assertFalse(ApiClient.should_gzip("test", enable_gzip=False, gzip_threshold=1))
146+
self.assertFalse(ApiClient.should_gzip("test", enable_gzip=False, gzip_threshold=10000))
147+
self.assertFalse(ApiClient.should_gzip("test", enable_gzip=False, gzip_threshold=None))
148+
149+
# Test when enable_gzip is True
150+
self.assertTrue(ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=None))
151+
self.assertTrue(ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=1))
152+
self.assertFalse(ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=100000))
153+
154+
# Test payload smaller than threshold
155+
self.assertFalse(ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=10000))
156+
157+
# Test payload larger than threshold
158+
large_payload = "x" * 10000
159+
self.assertTrue(ApiClient.should_gzip(large_payload, enable_gzip=True, gzip_threshold=1000))
160+
161+
# Test exact threshold match and less than threshold
162+
payload = "x" * 1000
163+
self.assertTrue(ApiClient.should_gzip(payload, enable_gzip=True, gzip_threshold=1000))

tests/test_influxdb_client_3.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ def verify_client_write_options(c):
104104

105105
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
106106
'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org',
107-
'INFLUX_PRECISION': WritePrecision.MS, 'INFLUX_AUTH_SCHEME': 'custom_scheme'})
107+
'INFLUX_PRECISION': WritePrecision.MS, 'INFLUX_AUTH_SCHEME': 'custom_scheme',
108+
'INFLUX_GZIP_THRESHOLD': '2000'})
108109
def test_from_env_all_env_vars_set(self):
109110
client = InfluxDBClient3.from_env()
110111
self.assertIsInstance(client, InfluxDBClient3)
@@ -113,6 +114,7 @@ def test_from_env_all_env_vars_set(self):
113114
self.assertEqual(client._client.auth_header_value, f"custom_scheme {client._token}")
114115
self.assertEqual(client._database, "test_db")
115116
self.assertEqual(client._org, "test_org")
117+
self.assertEqual(client._client.api_client.rest_client.configuration.gzip_threshold, 2000)
116118

117119
write_options = client._write_client_options.get("write_options")
118120
self.assertEqual(write_options.write_precision, WritePrecision.MS)

0 commit comments

Comments
 (0)