diff --git a/CHANGELOG.md b/CHANGELOG.md index 4146ecf..1cf5011 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Features 1. [#130](https://github.com/InfluxCommunity/influxdb3-python/pull/130): Remove org parameters from the example code because It is not mandatory in Influxdb3 +2. [#139](https://github.com/InfluxCommunity/influxdb3-python/pull/139): Supports environment variables with the same name like other clients ## 0.12.0 [2025-03-26] diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index bea3640..dff80df 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -21,6 +21,7 @@ INFLUX_ORG = "INFLUX_ORG" INFLUX_PRECISION = "INFLUX_PRECISION" INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME" +INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD" def write_client_options(**kwargs): @@ -114,6 +115,30 @@ def _parse_precision(precision): return precision +def _parse_gzip_threshold(threshold): + """ + Parses and validates the provided threshold value. + + This function ensures that the given threshold is a valid integer value, + and it raises an appropriate error if the threshold is not valid. It also + enforces that the threshold value is non-negative. + + :param threshold: The input threshold value to be parsed and validated. + :type threshold: Any + :return: The validated threshold value as an integer. + :rtype: int + :raises ValueError: If the provided threshold is not an integer or if it is + negative. + """ + try: + threshold = int(threshold) + except (TypeError, ValueError): + raise ValueError(f"Invalid threshold value: {threshold}. Must be integer.") + if threshold < 0: + raise ValueError(f"Invalid threshold value: {threshold}. Must be non-negative.") + return threshold + + class InfluxDBClient3: def __init__( self, @@ -226,7 +251,23 @@ def __init__( @classmethod def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3': - + """ + Creates an instance of the ``InfluxDBClient3`` class using environment + variables for configuration. This method simplifies client creation by + automatically reading required information from the system environment. + + It verifies the presence of required environment variables such as host, + token, and database. If any of these variables are missing or empty, + a ``ValueError`` will be raised. Optional parameters such as precision and + authentication scheme will also be extracted from the environment when + present, allowing further customization of the client. + + :param kwargs: Additional parameters that are passed to the client constructor. + :type kwargs: Any + :raises ValueError: If any required environment variables are missing or empty. + :return: An initialized client object of type ``InfluxDBClient3``. + :rtype: InfluxDBClient3 + """ required_vars = { INFLUX_HOST: os.getenv(INFLUX_HOST), INFLUX_TOKEN: os.getenv(INFLUX_TOKEN), @@ -238,6 +279,11 @@ def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3': write_options = WriteOptions(write_type=WriteType.synchronous) + gzip_threshold = os.getenv(INFLUX_GZIP_THRESHOLD) + if gzip_threshold is not None: + kwargs['gzip_threshold'] = _parse_gzip_threshold(gzip_threshold) + kwargs['enable_gzip'] = True + precision = os.getenv(INFLUX_PRECISION) if precision is not None: write_options.write_precision = _parse_precision(precision) diff --git a/influxdb_client_3/write_client/_sync/api_client.py b/influxdb_client_3/write_client/_sync/api_client.py index 10ff1b7..d0bcd99 100644 --- a/influxdb_client_3/write_client/_sync/api_client.py +++ b/influxdb_client_3/write_client/_sync/api_client.py @@ -92,6 +92,40 @@ def set_default_header(self, header_name, header_value): """Set HTTP header for this API client.""" self.default_headers[header_name] = header_value + @staticmethod + def should_gzip(payload: str, enable_gzip: bool = False, gzip_threshold: int = None) -> bool: + """ + Determines whether gzip compression should be applied to the given payload based + on the specified conditions. This method evaluates the `enable_gzip` flag and + considers the size of the payload in relation to the optional `gzip_threshold`. + If `enable_gzip` is set to True and no threshold is provided, gzip compression + is advised without any size condition. If a threshold is specified, compression + is applied only when the size of the payload meets or exceeds the threshold. + By default, no compression is performed if `enable_gzip` is False. + + :param payload: The payload data as a string for which gzip determination is to + be made. + :type payload: str + :param enable_gzip: A flag indicating whether gzip compression is enabled. By + default, this flag is False. + :type enable_gzip: bool, optional + :param gzip_threshold: Optional threshold specifying the minimum size (in bytes) + of the payload to trigger gzip compression. Only considered if + `enable_gzip` is True. + :type gzip_threshold: int, optional + :return: A boolean value indicating True if gzip compression should be applied + based on the payload size, the enable_gzip flag, and the gzip_threshold. + :rtype: bool + """ + if enable_gzip is not False: + if gzip_threshold is not None: + payload_size = len(payload.encode('utf-8')) + return payload_size >= gzip_threshold + if enable_gzip is True: + return True + + return False + def __call_api( self, resource_path, method, path_params=None, query_params=None, header_params=None, body=None, post_params=None, @@ -102,9 +136,16 @@ def __call_api( config = self.configuration self._signin(resource_path=resource_path) + # body + should_gzip = False + if body: + should_gzip = self.should_gzip(body, config.enable_gzip, config.gzip_threshold) + body = self.sanitize_for_serialization(body) + body = config.update_request_body(resource_path, body, should_gzip) + # header parameters header_params = header_params or {} - config.update_request_header_params(resource_path, header_params) + config.update_request_header_params(resource_path, header_params, should_gzip) header_params.update(self.default_headers) if self.cookie: header_params['Cookie'] = self.cookie @@ -141,11 +182,6 @@ def __call_api( # auth setting self.update_params_for_auth(header_params, query_params, auth_settings) - # body - if body: - body = self.sanitize_for_serialization(body) - body = config.update_request_body(resource_path, body) - # request url url = self.configuration.host + resource_path diff --git a/influxdb_client_3/write_client/client/_base.py b/influxdb_client_3/write_client/client/_base.py index 22f0661..7fd0030 100644 --- a/influxdb_client_3/write_client/client/_base.py +++ b/influxdb_client_3/write_client/client/_base.py @@ -49,6 +49,7 @@ def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, or else: self.conf.host = self.url self.conf.enable_gzip = enable_gzip + self.conf.gzip_threshold = kwargs.get('gzip_threshold', None) self.conf.verify_ssl = kwargs.get('verify_ssl', True) self.conf.ssl_ca_cert = kwargs.get('ssl_ca_cert', None) self.conf.cert_file = kwargs.get('cert_file', None) @@ -206,53 +207,6 @@ def _from_env_properties(cls, debug=None, enable_gzip=False, **kwargs): connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic), profilers=profilers, **kwargs) - @classmethod - def _from_env(cls, debug=None, enable_gzip=False, **kwargs): - """ - Creates and configures an instance of the class using environment variable values. The method loads - configuration values for connecting to an InfluxDB server instance from preset environment variables. - Options include connection details such as host, token, organization, and optional parameters - like SSL settings, profiling, and default tags. Non-specified parameters fallback to defaults - or None, ensuring a straightforward integration with varied InfluxDB setups. - - :param debug: Determines whether debugging mode is enabled. - :type debug: Optional[bool] - :param enable_gzip: Indicates whether gzip compression is enabled for requests. - :type enable_gzip: bool - :param kwargs: Additional keyword arguments to configure the instance. - :type kwargs: dict - :return: Instance of the class configured using the provided environmental settings. - :rtype: cls - """ - url = os.getenv('INFLUX_HOST', "http://localhost:8086") - token = os.getenv('INFLUX_TOKEN', "my-token") - org = os.getenv('INFLUX_ORG', "my-org") - timeout = os.getenv('INFLUX_TIMEOUT', "10000") - verify_ssl = os.getenv('INFLUX_VERIFY_SSL', "True") - ssl_ca_cert = os.getenv('INFLUX_SSL_CA_CERT', None) - cert_file = os.getenv('INFLUX_CERT_FILE', None) - cert_key_file = os.getenv('INFLUX_CERT_KEY_FILE', None) - cert_key_password = os.getenv('INFLUX_CERT_KEY_PASSWORD', None) - connection_pool_maxsize = os.getenv('INFLUX_CONNECTION_POOL_MAXSIZE', None) - auth_basic = os.getenv('INFLUX_AUTH_BASIC', "False") - - prof = os.getenv("INFLUX_PROFILERS", None) - profilers = None - if prof is not None: - profilers = [x.strip() for x in prof.split(',')] - - default_tags = dict() - - for key, value in os.environ.items(): - if key.startswith("INFLUX_TAG_"): - default_tags[key[11:].lower()] = value - - return cls(url, token, debug=debug, timeout=_to_int(timeout), org=org, default_tags=default_tags, - enable_gzip=enable_gzip, verify_ssl=_to_bool(verify_ssl), ssl_ca_cert=ssl_ca_cert, - cert_file=cert_file, cert_key_file=cert_key_file, cert_key_password=cert_key_password, - connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic), - profilers=profilers, **kwargs) - class _BaseWriteApi(object): def __init__(self, influxdb_client, point_settings=None): @@ -324,9 +278,9 @@ def __init__(self): self.username = None self.password = None - def update_request_header_params(self, path: str, params: dict): - super().update_request_header_params(path, params) - if self.enable_gzip: + def update_request_header_params(self, path: str, params: dict, should_gzip: bool = False): + super().update_request_header_params(path, params, should_gzip) + if should_gzip: # GZIP Request if path == '/api/v2/write': params["Content-Encoding"] = "gzip" @@ -340,9 +294,9 @@ def update_request_header_params(self, path: str, params: dict): pass pass - def update_request_body(self, path: str, body): - _body = super().update_request_body(path, body) - if self.enable_gzip: + def update_request_body(self, path: str, body, should_gzip: bool = False): + _body = super().update_request_body(path, body, should_gzip) + if should_gzip: # GZIP Request if path == '/api/v2/write': import gzip diff --git a/influxdb_client_3/write_client/client/influxdb_client.py b/influxdb_client_3/write_client/client/influxdb_client.py index 97d98dc..182c0a0 100644 --- a/influxdb_client_3/write_client/client/influxdb_client.py +++ b/influxdb_client_3/write_client/client/influxdb_client.py @@ -16,7 +16,7 @@ class InfluxDBClient(_BaseClient): """InfluxDBClient is client for InfluxDB v2.""" def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gzip=False, org: str = None, - default_tags: dict = None, **kwargs) -> None: + default_tags: dict = None, gzip_threshold=None, **kwargs) -> None: """ Initialize defaults. @@ -52,7 +52,8 @@ def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gz :key str password: ``password`` to authenticate via username and password credentials to the InfluxDB 2.x :key list[str] profilers: list of enabled Flux profilers """ - super().__init__(url=url, token=token, debug=debug, timeout=timeout, enable_gzip=enable_gzip, org=org, + super().__init__(url=url, token=token, debug=debug, timeout=timeout, enable_gzip=enable_gzip, + gzip_threshold=gzip_threshold, org=org, default_tags=default_tags, http_client_logger="urllib3", **kwargs) from influxdb_client_3.write_client._sync.api_client import ApiClient diff --git a/influxdb_client_3/write_client/configuration.py b/influxdb_client_3/write_client/configuration.py index ec0b97b..2946156 100644 --- a/influxdb_client_3/write_client/configuration.py +++ b/influxdb_client_3/write_client/configuration.py @@ -98,6 +98,10 @@ def __init__(self): # Safe chars for path_param self.safe_chars_for_path_param = '' + # Compression settings + self.enable_gzip = False + self.gzip_threshold = None + @property def logger_file(self): """Logger file. @@ -245,19 +249,21 @@ def to_debug_report(self): "SDK Package Version: {client_version}".\ format(env=sys.platform, pyversion=sys.version, client_version=VERSION) - def update_request_header_params(self, path: str, params: dict): + def update_request_header_params(self, path: str, params: dict, should_gzip: bool = False): """Update header params based on custom settings. - :param path: Resource path + :param path: Resource path. :param params: Header parameters dict to be updated. + :param should_gzip: Describes if request body should be gzip compressed. """ pass - def update_request_body(self, path: str, body): + def update_request_body(self, path: str, body, should_gzip: bool = False): """Update http body based on custom settings. - :param path: Resource path + :param path: Resource path. :param body: Request body to be updated. + :param should_gzip: Describes if request body should be gzip compressed. :return: Updated body """ return body diff --git a/tests/test_api_client.py b/tests/test_api_client.py index 9976cfb..bcbe983 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -139,3 +139,25 @@ def test_api_error_headers(self): self.assertEqual(headers['Trace-Sampled'], 'false') self.assertEqual(headers['X-Influxdb-Request-Id'], requestid) self.assertEqual(headers['X-Influxdb-Build'], 'Mock') + + def test_should_gzip(self): + # Test when gzip is disabled + self.assertFalse(ApiClient.should_gzip("test", enable_gzip=False, gzip_threshold=1)) + self.assertFalse(ApiClient.should_gzip("test", enable_gzip=False, gzip_threshold=10000)) + self.assertFalse(ApiClient.should_gzip("test", enable_gzip=False, gzip_threshold=None)) + + # Test when enable_gzip is True + self.assertTrue(ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=None)) + self.assertTrue(ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=1)) + self.assertFalse(ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=100000)) + + # Test payload smaller than threshold + self.assertFalse(ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=10000)) + + # Test payload larger than threshold + large_payload = "x" * 10000 + self.assertTrue(ApiClient.should_gzip(large_payload, enable_gzip=True, gzip_threshold=1000)) + + # Test exact threshold match and less than threshold + payload = "x" * 1000 + self.assertTrue(ApiClient.should_gzip(payload, enable_gzip=True, gzip_threshold=1000)) diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 729d8d0..a2fd065 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -104,7 +104,8 @@ def verify_client_write_options(c): @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', 'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org', - 'INFLUX_PRECISION': WritePrecision.MS, 'INFLUX_AUTH_SCHEME': 'custom_scheme'}) + 'INFLUX_PRECISION': WritePrecision.MS, 'INFLUX_AUTH_SCHEME': 'custom_scheme', + 'INFLUX_GZIP_THRESHOLD': '2000'}) def test_from_env_all_env_vars_set(self): client = InfluxDBClient3.from_env() self.assertIsInstance(client, InfluxDBClient3) @@ -113,6 +114,7 @@ def test_from_env_all_env_vars_set(self): self.assertEqual(client._client.auth_header_value, f"custom_scheme {client._token}") self.assertEqual(client._database, "test_db") self.assertEqual(client._org, "test_org") + self.assertEqual(client._client.api_client.rest_client.configuration.gzip_threshold, 2000) write_options = client._write_client_options.get("write_options") self.assertEqual(write_options.write_precision, WritePrecision.MS)