Skip to content

Commit db0c51d

Browse files
feat: support env variable fo influxdb v3
1 parent 7d5952f commit db0c51d

File tree

6 files changed

+166
-59
lines changed

6 files changed

+166
-59
lines changed

influxdb_client_3/__init__.py

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import importlib.util
2+
import os
23
import urllib.parse
4+
from typing import Any
35

46
import pyarrow as pa
57

@@ -8,11 +10,18 @@
810
from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
911
from influxdb_client_3.write_client.client.exceptions import InfluxDBError
1012
from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \
11-
PointSettings
13+
PointSettings, DefaultWriteOptions, WriteType
1214
from influxdb_client_3.write_client.domain.write_precision import WritePrecision
1315

1416
polars = importlib.util.find_spec("polars") is not None
1517

18+
INFLUX_HOST = "INFLUX_HOST"
19+
INFLUX_TOKEN = "INFLUX_TOKEN"
20+
INFLUX_DATABASE = "INFLUX_DATABASE"
21+
INFLUX_ORG = "INFLUX_ORG"
22+
INFLUX_PRECISION = "INFLUX_PRECISION"
23+
INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME"
24+
1625

1726
def write_client_options(**kwargs):
1827
"""
@@ -84,6 +93,27 @@ def _merge_options(defaults, exclude_keys=None, custom=None):
8493
return _deep_merge(defaults, {key: value for key, value in custom.items() if key not in exclude_keys})
8594

8695

96+
def _parse_precision(precision):
97+
"""
98+
Parses the precision value and ensures it is valid.
99+
100+
This function checks that the given `precision` is one of the allowed
101+
values defined in `WritePrecision`. If the precision is invalid, it
102+
raises a `ValueError`. The function returns the valid precision value
103+
if it passes validation.
104+
105+
:param precision: The precision value to be validated.
106+
Must be one of WritePrecision.NS, WritePrecision.MS,
107+
WritePrecision.S, or WritePrecision.US.
108+
:return: The valid precision value.
109+
:rtype: WritePrecision
110+
:raises ValueError: If the provided precision is not valid.
111+
"""
112+
if precision not in [WritePrecision.NS, WritePrecision.MS, WritePrecision.S, WritePrecision.US]:
113+
raise ValueError(f"Invalid precision value: {precision}")
114+
return precision
115+
116+
87117
class InfluxDBClient3:
88118
def __init__(
89119
self,
@@ -137,8 +167,23 @@ def __init__(
137167
self._org = org if org is not None else "default"
138168
self._database = database
139169
self._token = token
140-
self._write_client_options = write_client_options if write_client_options is not None \
141-
else default_client_options(write_options=SYNCHRONOUS)
170+
171+
write_type = DefaultWriteOptions.write_type.value
172+
write_precision = DefaultWriteOptions.write_precision.value
173+
if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None:
174+
write_opts = write_client_options['write_options']
175+
write_type = getattr(write_opts, 'write_type', write_type)
176+
write_precision = getattr(write_opts, 'write_precision', write_precision)
177+
178+
write_options = WriteOptions(
179+
write_type=write_type,
180+
write_precision=write_precision,
181+
)
182+
183+
self._write_client_options = {
184+
"write_options": write_options,
185+
**(write_client_options or {})
186+
}
142187

143188
# Parse the host input
144189
parsed_url = urllib.parse.urlparse(host)
@@ -179,6 +224,39 @@ def __init__(
179224
flight_client_options=flight_client_options,
180225
proxy=kwargs.get("proxy", None), options=q_opts_builder.build())
181226

227+
@classmethod
228+
def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':
229+
230+
required_vars = {
231+
INFLUX_HOST: os.getenv(INFLUX_HOST),
232+
INFLUX_TOKEN: os.getenv(INFLUX_TOKEN),
233+
INFLUX_DATABASE: os.getenv(INFLUX_DATABASE)
234+
}
235+
missing_vars = [var for var, value in required_vars.items() if value is None or value == ""]
236+
if missing_vars:
237+
raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
238+
239+
write_options = WriteOptions(write_type=WriteType.synchronous)
240+
241+
precision = os.getenv(INFLUX_PRECISION)
242+
if precision is not None:
243+
write_options.write_precision = _parse_precision(precision)
244+
245+
write_client_option = {'write_options': write_options}
246+
247+
if os.getenv(INFLUX_AUTH_SCHEME) is not None:
248+
kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME)
249+
250+
org = os.getenv(INFLUX_ORG, "default")
251+
return InfluxDBClient3(
252+
host=required_vars[INFLUX_HOST],
253+
token=required_vars[INFLUX_TOKEN],
254+
database=required_vars[INFLUX_DATABASE],
255+
write_client_options=write_client_option,
256+
org=org,
257+
**kwargs
258+
)
259+
182260
def write(self, record=None, database=None, **kwargs):
183261
"""
184262
Write data to InfluxDB.

influxdb_client_3/write_client/client/_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ def _has_section(key: str):
175175
profilers=profilers, proxy=proxy, **kwargs)
176176

177177
@classmethod
178-
@deprecated('Use _from_env() instead.')
178+
@deprecated('Use InfluxDBClient3.from_env() instead.')
179179
def _from_env_properties(cls, debug=None, enable_gzip=False, **kwargs):
180180
url = os.getenv('INFLUXDB_V2_URL', "http://localhost:8086")
181181
token = os.getenv('INFLUXDB_V2_TOKEN', "my-token")

influxdb_client_3/write_client/client/influxdb_client.py

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gz
169169
return InfluxDBClient._from_config_file(config_file=config_file, debug=debug, enable_gzip=enable_gzip, **kwargs)
170170

171171
@classmethod
172-
@deprecated('Use InfluxDBClient.from_env() instead.')
172+
@deprecated('Use InfluxDBClient3.from_env() instead.')
173173
def from_env_properties(cls, debug=None, enable_gzip=False, **kwargs):
174174
"""
175175
Configure client via environment properties.
@@ -203,22 +203,6 @@ def from_env_properties(cls, debug=None, enable_gzip=False, **kwargs):
203203
"""
204204
return InfluxDBClient._from_env_properties(debug=debug, enable_gzip=enable_gzip, **kwargs)
205205

206-
@classmethod
207-
def from_env(cls, debug=None, enable_gzip=False, **kwargs):
208-
"""
209-
Creates an instance of the class using environment configuration variables.
210-
211-
This class method retrieves configuration variables from the system environment
212-
and uses them to configure and initialize an instance of the class. This allows
213-
for dynamic configuration of the client without the need for hardcoding values
214-
or explicitly passing them during instantiation.
215-
216-
:param debug: Enable verbose logging of http requests
217-
:param enable_gzip: Enable Gzip compression for http requests. Currently, only the "Write" and "Query" endpoints
218-
supports the Gzip compression.
219-
"""
220-
return InfluxDBClient._from_env(debug=debug, enable_gzip=enable_gzip, **kwargs)
221-
222206
def write_api(self, write_options=WriteOptions(), point_settings=PointSettings(), **kwargs) -> WriteApi:
223207
"""
224208
Create Write API instance.

influxdb_client_3/write_client/client/write_api.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616
from reactivex.scheduler import ThreadPoolScheduler
1717
from reactivex.subject import Subject
1818

19-
from influxdb_client_3.write_client.domain import WritePrecision
2019
from influxdb_client_3.write_client.client._base import _BaseWriteApi, _HAS_DATACLASS
2120
from influxdb_client_3.write_client.client.util.helpers import get_org_query_param
2221
from influxdb_client_3.write_client.client.write.dataframe_serializer import DataframeSerializer
2322
from influxdb_client_3.write_client.client.write.point import Point, DEFAULT_WRITE_PRECISION
2423
from influxdb_client_3.write_client.client.write.retry import WritesRetry
24+
from influxdb_client_3.write_client.domain import WritePrecision
2525
from influxdb_client_3.write_client.rest import _UTF_8_encoding
2626

2727
logger = logging.getLogger('influxdb_client_3.write_client.client.write_api')
@@ -39,6 +39,11 @@ class WriteType(Enum):
3939
synchronous = 3
4040

4141

42+
class DefaultWriteOptions(Enum):
43+
write_type = WriteType.synchronous
44+
write_precision = WritePrecision.NS
45+
46+
4247
class WriteOptions(object):
4348
"""Write configuration."""
4449

@@ -51,6 +56,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
5156
max_retry_time=180_000,
5257
exponential_base=2,
5358
max_close_wait=300_000,
59+
write_precision=DEFAULT_WRITE_PRECISION,
5460
write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None:
5561
"""
5662
Create write api configuration.
@@ -80,6 +86,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
8086
self.exponential_base = exponential_base
8187
self.write_scheduler = write_scheduler
8288
self.max_close_wait = max_close_wait
89+
self.write_precision = write_precision
8390

8491
def to_retry_strategy(self, **kwargs):
8592
"""
@@ -290,7 +297,7 @@ def write(self, bucket: str, org: str = None,
290297
str, Iterable['str'], Point, Iterable['Point'], dict, Iterable['dict'], bytes, Iterable['bytes'],
291298
Observable, NamedTuple, Iterable['NamedTuple'], 'dataclass', Iterable['dataclass']
292299
] = None,
293-
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> Any:
300+
write_precision: WritePrecision = None, **kwargs) -> Any:
294301
"""
295302
Write time-series data into InfluxDB.
296303
@@ -361,6 +368,9 @@ def write(self, bucket: str, org: str = None,
361368

362369
self._append_default_tags(record)
363370

371+
if write_precision is None:
372+
write_precision = self._write_options.write_precision
373+
364374
if self._write_options.write_type is WriteType.batching:
365375
return self._write_batching(bucket, org, record,
366376
write_precision, **kwargs)
@@ -443,8 +453,11 @@ def __del__(self):
443453
pass
444454

445455
def _write_batching(self, bucket, org, data,
446-
precision=DEFAULT_WRITE_PRECISION,
456+
precision=None,
447457
**kwargs):
458+
if precision is None:
459+
precision = self._write_options.write_precision
460+
448461
if isinstance(data, bytes):
449462
_key = _BatchItemKey(bucket, org, precision)
450463
self._subject.on_next(_BatchItem(key=_key, data=data))

tests/test_influxdb_client.py

Lines changed: 0 additions & 34 deletions
This file was deleted.

tests/test_influxdb_client_3.py

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import unittest
22
from unittest.mock import patch
33

4-
from influxdb_client_3 import InfluxDBClient3
4+
from influxdb_client_3 import InfluxDBClient3, WritePrecision, DefaultWriteOptions, Point, WriteOptions, WriteType
55
from tests.util import asyncio_run
66
from tests.util.mocks import ConstantFlightServer, ConstantData
77

@@ -74,6 +74,72 @@ async def test_query_async(self):
7474
assert {'data': 'sql_query', 'reference': query, 'value': -1.0} in result_list
7575
assert {'data': 'query_type', 'reference': 'sql', 'value': -1.0} in result_list
7676

77+
def test_write_api_custom_options_no_error(self):
78+
write_options = WriteOptions(write_type=WriteType.batching)
79+
write_client_option = {'write_options': write_options}
80+
client = InfluxDBClient3(write_client_options=write_client_option)
81+
try:
82+
client._write_api._write_batching("bucket", "org", Point.measurement("test"), None)
83+
self.assertTrue(True)
84+
except Exception as e:
85+
self.fail(f"Write API with default options raised an exception: {str(e)}")
86+
87+
def test_default_client(self):
88+
expected_precision = DefaultWriteOptions.write_precision.value
89+
expected_write_type = DefaultWriteOptions.write_type.value
90+
91+
def verify_client_write_options(c):
92+
write_options = c._write_client_options.get('write_options')
93+
self.assertEqual(write_options.write_precision, expected_precision)
94+
self.assertEqual(write_options.write_type, expected_write_type)
95+
96+
self.assertEqual(c._write_api._write_options.write_precision, expected_precision)
97+
self.assertEqual(c._write_api._write_options.write_type, expected_write_type)
98+
99+
env_client = InfluxDBClient3.from_env()
100+
verify_client_write_options(env_client)
101+
102+
default_client = InfluxDBClient3()
103+
verify_client_write_options(default_client)
104+
105+
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
106+
'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org',
107+
'INFLUX_PRECISION': WritePrecision.MS, 'INFLUX_AUTH_SCHEME': 'custom_scheme'})
108+
def test_from_env_all_env_vars_set(self):
109+
client = InfluxDBClient3.from_env()
110+
self.assertIsInstance(client, InfluxDBClient3)
111+
self.assertEqual(client._token, "test_token")
112+
self.assertEqual(client._client.url, "https://localhost:443")
113+
self.assertEqual(client._client.auth_header_value, f"custom_scheme {client._token}")
114+
self.assertEqual(client._database, "test_db")
115+
self.assertEqual(client._org, "test_org")
116+
117+
write_options = client._write_client_options.get("write_options")
118+
self.assertEqual(write_options.write_precision, WritePrecision.MS)
119+
120+
client._write_api._point_settings = {}
121+
122+
@patch.dict('os.environ', {'INFLUX_HOST': "", 'INFLUX_TOKEN': "",
123+
'INFLUX_DATABASE': "", 'INFLUX_ORG': ""})
124+
def test_from_env_missing_variables(self):
125+
with self.assertRaises(ValueError) as context:
126+
InfluxDBClient3.from_env()
127+
self.assertIn("Missing required environment variables", str(context.exception))
128+
129+
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
130+
'INFLUX_DATABASE': 'test_db', 'INFLUX_PRECISION': WritePrecision.MS})
131+
def test_parse_valid_write_precision(self):
132+
client = InfluxDBClient3.from_env()
133+
self.assertIsInstance(client, InfluxDBClient3)
134+
self.assertEqual(client._write_client_options.get('write_options').write_precision, WritePrecision.MS)
135+
136+
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
137+
'INFLUX_DATABASE': 'test_db', 'INFLUX_PRECISION': 'invalid_value'})
138+
def test_parse_invalid_write_precision(self):
139+
with self.assertRaises(ValueError) as context:
140+
InfluxDBClient3.from_env()
141+
self.assertIn("Invalid precision value: invalid_value", str(context.exception))
142+
77143

78144
if __name__ == '__main__':
79145
unittest.main()

0 commit comments

Comments
 (0)