From f9ba7230b7c41f085d6ac4d600e62695a3a73163 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Sep 2021 10:57:24 +0200 Subject: [PATCH 1/8] feat: add supports for write `NamedTuple` --- .circleci/config.yml | 1 + README.rst | 14 ++--- docs/api.rst | 6 +++ examples/README.md | 1 + examples/example.py | 2 +- examples/write_structured_data.py | 47 ++++++++++++++++ influxdb_client/client/write/point.py | 78 ++++++++++++++++++++++++--- influxdb_client/client/write_api.py | 59 +++++++++++++++++--- tests/test_WriteApi.py | 18 +++++++ tests/test_WriteApiBatching.py | 25 ++++++++- tests/test_point.py | 64 ++++++++++++++++++++++ 11 files changed, 291 insertions(+), 24 deletions(-) create mode 100644 examples/write_structured_data.py diff --git a/.circleci/config.yml b/.circleci/config.yml index 89c685e4..56ddf7b5 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -123,6 +123,7 @@ jobs: export PYTHONPATH="$PWD" python examples/monitoring_and_alerting.py python examples/buckets_management.py + python examples/write_structured_data.py check-sphinx: docker: - image: *default-python diff --git a/README.rst b/README.rst index 9ae80e36..8ad96eae 100644 --- a/README.rst +++ b/README.rst @@ -358,11 +358,13 @@ The data could be written as 1. ``string`` or ``bytes`` that is formatted as a InfluxDB's line protocol 2. `Data Point `__ structure -3. Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time`` -4. List of above items -5. A ``batching`` type of write also supports an ``Observable`` that produce one of an above item -6. `Pandas DataFrame `_ +3. Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time`` or custom structure +4. `NamedTuple `_ +5. List of above items +6. A ``batching`` type of write also supports an ``Observable`` that produce one of an above item +7. `Pandas DataFrame `_ +You can find write examples at GitHub: `influxdb-client-python/examples `_. Batching """""""" @@ -528,7 +530,7 @@ In a `init `_ configuration customer = California Miner data_center = ${env.data_center} -You could also use a `TOML `_ format for the configuration file. +You can also use a `TOML `_ format for the configuration file. Via Environment Properties __________________________ @@ -1044,7 +1046,7 @@ The second example shows how to use client capabilities to realtime visualizatio Other examples """""""""""""" -You could find all examples at GitHub: `influxdb-client-python/examples `_. +You can find all examples at GitHub: `influxdb-client-python/examples `_. .. marker-examples-end diff --git a/docs/api.rst b/docs/api.rst index bbf9d9a7..92a161a5 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -19,6 +19,12 @@ WriteApi .. autoclass:: influxdb_client.WriteApi :members: +.. autoclass:: influxdb_client.client.write.point.Point + :members: + +.. autoclass:: influxdb_client.domain.write_precision.WritePrecision + :members: + BucketsApi """""""""" .. autoclass:: influxdb_client.BucketsApi diff --git a/examples/README.md b/examples/README.md index 29efa6dd..354e6ae6 100644 --- a/examples/README.md +++ b/examples/README.md @@ -7,6 +7,7 @@ - [ingest_large_dataframe.py](ingest_large_dataframe.py) - How to ingest large DataFrame - [iot_sensor.py](iot_sensor.py) - How to write sensor data every minute by [RxPY](https://rxpy.readthedocs.io/en/latest/) - [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB +- [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple) ## Queries - [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV` diff --git a/examples/example.py b/examples/example.py index 73ed813a..0082ade1 100644 --- a/examples/example.py +++ b/examples/example.py @@ -37,4 +37,4 @@ print("val count: ", val_count) response = query_api.query_raw('from(bucket:"my-bucket") |> range(start: -10m)') - print (codecs.decode(response.data)) + print(codecs.decode(response.data)) diff --git a/examples/write_structured_data.py b/examples/write_structured_data.py new file mode 100644 index 00000000..c5ca4050 --- /dev/null +++ b/examples/write_structured_data.py @@ -0,0 +1,47 @@ +from collections import namedtuple +from datetime import datetime + +from influxdb_client import InfluxDBClient +from influxdb_client.client.write_api import SYNCHRONOUS + + +class Sensor(namedtuple('Sensor', ['name', 'location', 'version', 'pressure', 'temperature', 'timestamp'])): + """ + Sensor named structure + """ + pass + + +""" +Initialize client +""" +with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + write_api = client.write_api(write_options=SYNCHRONOUS) + + """ + Sensor "current" value + """ + sensor = Sensor(name="sensor_pt859", + location="warehouse_125", + version="2021.06.05.5874", + pressure=125, + temperature=10, + timestamp=datetime.utcnow()) + print(sensor) + + """ + Synchronous write + """ + write_api.write(bucket="my-bucket", + record=sensor, + dictionary_measurement_key="name", + dictionary_time_key="timestamp", + dictionary_tag_keys=["location", "version"], + dictionary_field_keys=["pressure", "temperature"]) + + from influxdb_client import Point + point = Point("h2o_feet")\ + .tag("location", "coyote_creek")\ + .field("water_level", 4.0)\ + .time(4) + write_api.write("my-bucket", "my-org", point) diff --git a/influxdb_client/client/write/point.py b/influxdb_client/client/write/point.py index f20ce953..29523115 100644 --- a/influxdb_client/client/write/point.py +++ b/influxdb_client/client/write/point.py @@ -61,16 +61,78 @@ def measurement(measurement): return p @staticmethod - def from_dict(dictionary: dict, write_precision: WritePrecision = DEFAULT_WRITE_PRECISION): - """Initialize point from 'dict' structure.""" - point = Point(dictionary['measurement']) - if 'tags' in dictionary: + def from_dict(dictionary: dict, write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs): + """ + Initialize point from 'dict' structure. + + The expected dict structure is: + - measurement + - tags + - fields + - time + + Example: + .. code-block:: python + + # Use default dictionary structure + dict_structure = { + "measurement": "h2o_feet", + "tags": {"location": "coyote_creek"}, + "fields": {"water_level": 1.0}, + "time": 1 + } + point = Point.from_dict(dict_structure, WritePrecision.NS) + + Example: + .. code-block:: python + + # Use custom dictionary structure + dictionary = { + "name": "sensor_pt859", + "location": "warehouse_125", + "version": "2021.06.05.5874", + "pressure": 125, + "temperature": 10, + "created": 1632208639, + } + point = Point.from_dict(dictionary, + write_precision=WritePrecision.S, + dictionary_measurement_key="name", + dictionary_time_key="created", + dictionary_tag_keys=["location", "version"], + dictionary_field_keys=["pressure", "temperature"]) + + :param dictionary: dictionary for serialize into Point structure + :param write_precision: sets the precision for the supplied time values + :key dictionary_measurement_key: key of dictionary with specified measurement + :key dictionary_time_key: key of dictionary with specified timestamp + :key dictionary_tag_keys: list of dictionary keys to use as a tag + :key dictionary_field_keys: list of dictionary keys to use as a field + :return: new data point + """ + point = Point(dictionary[kwargs.get('dictionary_measurement_key', 'measurement')]) + + dictionary_tag_keys = kwargs.get('dictionary_tag_keys', None) + if dictionary_tag_keys is not None: + for tag_key in dictionary_tag_keys: + if tag_key in dictionary: + point.tag(tag_key, dictionary[tag_key]) + elif 'tags' in dictionary: for tag_key, tag_value in dictionary['tags'].items(): point.tag(tag_key, tag_value) - for field_key, field_value in dictionary['fields'].items(): - point.field(field_key, field_value) - if 'time' in dictionary: - point.time(dictionary['time'], write_precision=write_precision) + + dictionary_field_keys = kwargs.get('dictionary_field_keys', None) + if dictionary_field_keys is not None: + for field_key in dictionary_field_keys: + if field_key in dictionary: + point.field(field_key, dictionary[field_key]) + else: + for field_key, field_value in dictionary['fields'].items(): + point.field(field_key, field_value) + + dictionary_time_key = kwargs.get('dictionary_time_key', 'time') + if dictionary_time_key in dictionary: + point.time(dictionary[dictionary_time_key], write_precision=write_precision) return point def __init__(self, measurement_name): diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 3380224c..813bc40e 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -173,7 +173,20 @@ def _body_reduce(batch_items): class WriteApi: - """Implementation for '/api/v2/write' endpoint.""" + """ + Implementation for '/api/v2/write' endpoint. + + Example: + .. code-block:: python + + from influxdb_client import InfluxDBClient + from influxdb_client.client.write_api import SYNCHRONOUS + + + # Initialize SYNCHRONOUS instance of WriteApi + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + write_api = client.write_api(write_options=SYNCHRONOUS) + """ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions(), point_settings: PointSettings = PointSettings()) -> None: @@ -222,16 +235,41 @@ def write(self, bucket: str, org: str = None, """ Write time-series data into InfluxDB. + :param str bucket: specifies the destination bucket for writes (required) :param str, Organization org: specifies the destination organization for writes; take the ID, Name or Organization; if it's not specified then is used default from client.org. - :param str bucket: specifies the destination bucket for writes (required) :param WritePrecision write_precision: specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point has precedes and is use for write. - :param record: Points, line protocol, Pandas DataFrame, RxPY Observable to write - :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - :key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields + :param record: Point, Line Protocol, Dictionary, Pandas DataFrame or RxPY Observable to write + :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - ``DataFrame`` + :key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields - ``DataFrame`` + :key dictionary_measurement_key: key of dictionary with specified measurement - ``dictionary`` + :key dictionary_time_key: key of dictionary with specified timestamp - ``dictionary`` + :key dictionary_tag_keys: list of dictionary keys to use as a tag - ``dictionary`` + :key dictionary_field_keys: list of dictionary keys to use as a field - ``dictionary`` + + Example: + .. code-block:: python + + # Record as Line Protocol + write_api.write("my-bucket", "my-org", "h2o_feet,location=us-west level=125i 1") + + # Record as Dictionary + dictionary = { + "measurement": "h2o_feet", + "tags": {"location": "us-west"}, + "fields": {"level": 125}, + "time": 1 + } + write_api.write("my-bucket", "my-org", dictionary) + + # Record as Point + from influxdb_client import Point + point = Point("h2o_feet").tag("location", "us-west").field("level", 125).time(1) + write_api.write("my-bucket", "my-org", point) + """ org = get_org_query_param(org=org, client=self._influxdb_client) @@ -309,12 +347,14 @@ def _serialize(self, record, write_precision, payload, **kwargs): self._serialize(record.to_line_protocol(), record.write_precision, payload, **kwargs) elif isinstance(record, dict): - self._serialize(Point.from_dict(record, write_precision=write_precision), + self._serialize(Point.from_dict(record, write_precision=write_precision, **kwargs), write_precision, payload, **kwargs) elif 'DataFrame' in type(record).__name__: serializer = DataframeSerializer(record, self._point_settings, write_precision, **kwargs) self._serialize(serializer.serialize(), write_precision, payload, **kwargs) - + elif hasattr(record, "_asdict"): + # noinspection PyProtectedMember + self._serialize(record._asdict(), write_precision, payload, **kwargs) elif isinstance(record, Iterable): for item in record: self._serialize(item, write_precision, payload, **kwargs) @@ -334,7 +374,7 @@ def _write_batching(self, bucket, org, data, self._write_batching(bucket, org, data.to_line_protocol(), data.write_precision, **kwargs) elif isinstance(data, dict): - self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision), + self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision, **kwargs), precision, **kwargs) elif 'DataFrame' in type(data).__name__: @@ -344,6 +384,9 @@ def _write_batching(self, bucket, org, data, self._write_batching(bucket, org, serializer.serialize(chunk_idx), precision, **kwargs) + elif hasattr(data, "_asdict"): + # noinspection PyProtectedMember + self._write_batching(bucket, org, data._asdict(), precision, **kwargs) elif isinstance(data, Iterable): for item in data: diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index 301d6430..767ebc2d 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -5,6 +5,7 @@ import datetime import os import unittest +from collections import namedtuple from datetime import timedelta from multiprocessing.pool import ApplyResult @@ -539,6 +540,23 @@ def test_redirect(self): from urllib3 import Retry Retry.DEFAULT.remove_headers_on_redirect = Retry.DEFAULT_REMOVE_HEADERS_ON_REDIRECT + def test_named_tuple(self): + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) + + self.write_client = self.influxdb_client.write_api(write_options=SYNCHRONOUS) + + Factory = namedtuple('Factory', ['measurement', 'position', 'customers']) + factory = Factory(measurement='factory', position="central europe", customers=123456) + + self.write_client.write("my-bucket", "my-org", factory, + dictionary_measurement_key="measurement", + dictionary_tag_keys=["position"], + dictionary_field_keys=["customers"]) + + requests = httpretty.httpretty.latest_requests + self.assertEqual(1, len(requests)) + self.assertEqual("factory,position=central\\ europe customers=123456i", requests[0].parsed_body) + class AsynchronousWriteTest(BaseTest): diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py index e7175543..38e70e7a 100644 --- a/tests/test_WriteApiBatching.py +++ b/tests/test_WriteApiBatching.py @@ -4,6 +4,7 @@ import time import unittest +from collections import namedtuple import httpretty import rx @@ -244,7 +245,7 @@ def test_retry_disabled_max_retries(self): self._write_client.close() self._write_client = WriteApi(influxdb_client=self.influxdb_client, - write_options=WriteOptions(max_retries=0,batch_size=2, flush_interval=1_000)) + write_options=WriteOptions(max_retries=0, batch_size=2, flush_interval=1_000)) self._write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek level\\ water_level=1 1", @@ -518,6 +519,28 @@ def test_batching_data_frame(self): self.assertEqual(_request1, _requests[0].parsed_body) self.assertEqual(_request2, _requests[1].parsed_body) + def test_named_tuple(self): + self._write_client.close() + self._write_client = WriteApi(influxdb_client=self.influxdb_client, + write_options=WriteOptions(batch_size=1)) + + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) + + Factory = namedtuple('Factory', ['measurement', 'position', 'customers']) + factory = Factory(measurement='factory', position="central europe", customers=123456) + + self._write_client.write("my-bucket", "my-org", factory, + dictionary_measurement_key="measurement", + dictionary_tag_keys=["position"], + dictionary_field_keys=["customers"]) + + time.sleep(1) + + _requests = httpretty.httpretty.latest_requests + + self.assertEqual(1, len(_requests)) + self.assertEqual("factory,position=central\\ europe customers=123456i", _requests[0].parsed_body) + if __name__ == '__main__': unittest.main() diff --git a/tests/test_point.py b/tests/test_point.py index 8d2de231..e952408d 100644 --- a/tests/test_point.py +++ b/tests/test_point.py @@ -408,6 +408,70 @@ def test_numpy_types(self): self.assertEqual("h2o,location=europe np.float1=1.123,np.float2=2.123,np.float3=3.123,np.float4=4.123,np.int1=1i,np.int2=2i,np.int3=3i,np.int4=4i,np.uint1=5i,np.uint2=6i,np.uint3=7i,np.uint4=8i", point.to_line_protocol()) + def test_from_dictionary_custom_measurement(self): + dictionary = { + "name": "test", + "tags": {"tag": "a"}, + "fields": {"value": 1}, + "time": 1, + } + point = Point.from_dict(dictionary, dictionary_measurement_key="name") + self.assertEqual("test,tag=a value=1i 1", point.to_line_protocol()) + + def test_from_dictionary_custom_time(self): + dictionary = { + "name": "test", + "tags": {"tag": "a"}, + "fields": {"value": 1}, + "created": 100250, + } + point = Point.from_dict(dictionary, + dictionary_measurement_key="name", + dictionary_time_key="created") + self.assertEqual("test,tag=a value=1i 100250", point.to_line_protocol()) + + def test_from_dictionary_custom_tags(self): + dictionary = { + "name": "test", + "tag_a": "a", + "tag_b": "b", + "fields": {"value": 1}, + "time": 1, + } + point = Point.from_dict(dictionary, + dictionary_measurement_key="name", + dictionary_tag_keys=["tag_a", "tag_b"]) + self.assertEqual("test,tag_a=a,tag_b=b value=1i 1", point.to_line_protocol()) + + def test_from_dictionary_custom_fields(self): + dictionary = { + "name": "sensor_pt859", + "location": "warehouse_125", + "version": "2021.06.05.5874", + "pressure": 125, + "temperature": 10, + "time": 1632208639, + } + point = Point.from_dict(dictionary, + write_precision=WritePrecision.S, + dictionary_measurement_key="name", + dictionary_tag_keys=["location", "version"], + dictionary_field_keys=["pressure", "temperature"]) + self.assertEqual("sensor_pt859,location=warehouse_125,version=2021.06.05.5874 pressure=125i,temperature=10i 1632208639", point.to_line_protocol()) + + def test_from_dictionary_tolerant_to_missing_tags_and_fields(self): + dictionary = { + "name": "sensor_pt859", + "location": "warehouse_125", + "pressure": 125 + } + point = Point.from_dict(dictionary, + write_precision=WritePrecision.S, + dictionary_measurement_key="name", + dictionary_tag_keys=["location", "version"], + dictionary_field_keys=["pressure", "temperature"]) + self.assertEqual("sensor_pt859,location=warehouse_125 pressure=125i", point.to_line_protocol()) + if __name__ == '__main__': unittest.main() From 9d732c68b20ed556a90fdc1b04691e2230d4ea63 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Sep 2021 11:06:23 +0200 Subject: [PATCH 2/8] fix: doc style --- README.rst | 4 ++-- influxdb_client/client/write_api.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/README.rst b/README.rst index 8ad96eae..10a2207d 100644 --- a/README.rst +++ b/README.rst @@ -364,7 +364,7 @@ The data could be written as 6. A ``batching`` type of write also supports an ``Observable`` that produce one of an above item 7. `Pandas DataFrame `_ -You can find write examples at GitHub: `influxdb-client-python/examples `_. +You can find write examples at GitHub: `influxdb-client-python/examples `__. Batching """""""" @@ -1046,7 +1046,7 @@ The second example shows how to use client capabilities to realtime visualizatio Other examples """""""""""""" -You can find all examples at GitHub: `influxdb-client-python/examples `_. +You can find all examples at GitHub: `influxdb-client-python/examples `__. .. marker-examples-end diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 813bc40e..c0218536 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -244,7 +244,8 @@ def write(self, bucket: str, org: str = None, and is use for write. :param record: Point, Line Protocol, Dictionary, Pandas DataFrame or RxPY Observable to write :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - ``DataFrame`` - :key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields - ``DataFrame`` + :key data_frame_tag_columns: list of DataFrame columns which are tags, + rest columns will be fields - ``DataFrame`` :key dictionary_measurement_key: key of dictionary with specified measurement - ``dictionary`` :key dictionary_time_key: key of dictionary with specified timestamp - ``dictionary`` :key dictionary_tag_keys: list of dictionary keys to use as a tag - ``dictionary`` From 5ec047776c5e9a126d80903edee1cc0e0db10d12 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Sep 2021 13:58:26 +0200 Subject: [PATCH 3/8] feat: add supports for write `dataclasses` --- .circleci/config.yml | 2 +- README.rst | 7 +++--- examples/README.md | 2 +- examples/write_structured_data.py | 37 ++++++++++++++++++++++------- influxdb_client/client/write_api.py | 33 +++++++++++++++++++------ tests/test_WriteApi.py | 27 +++++++++++++++++++++ tests/test_WriteApiBatching.py | 32 +++++++++++++++++++++++++ 7 files changed, 119 insertions(+), 21 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 56ddf7b5..561d3bb3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -108,7 +108,7 @@ jobs: pydocstyle --count influxdb_client check-examples: docker: - - image: *default-python + - image: "cimg/python:3.8" environment: PIPENV_VENV_IN_PROJECT: true - image: *default-influxdb diff --git a/README.rst b/README.rst index 10a2207d..05ca5436 100644 --- a/README.rst +++ b/README.rst @@ -360,9 +360,10 @@ The data could be written as 2. `Data Point `__ structure 3. Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time`` or custom structure 4. `NamedTuple `_ -5. List of above items -6. A ``batching`` type of write also supports an ``Observable`` that produce one of an above item -7. `Pandas DataFrame `_ +5. `Data Classes `_ +6. `Pandas DataFrame `_ +7. List of above items +8. A ``batching`` type of write also supports an ``Observable`` that produce one of an above item You can find write examples at GitHub: `influxdb-client-python/examples `__. diff --git a/examples/README.md b/examples/README.md index 354e6ae6..a4ed5210 100644 --- a/examples/README.md +++ b/examples/README.md @@ -7,7 +7,7 @@ - [ingest_large_dataframe.py](ingest_large_dataframe.py) - How to ingest large DataFrame - [iot_sensor.py](iot_sensor.py) - How to write sensor data every minute by [RxPY](https://rxpy.readthedocs.io/en/latest/) - [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB -- [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple) +- [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple), [Data Classes](https://docs.python.org/3/library/dataclasses.html) - (_requires Python v3.8+_) ## Queries - [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV` diff --git a/examples/write_structured_data.py b/examples/write_structured_data.py index c5ca4050..85c44088 100644 --- a/examples/write_structured_data.py +++ b/examples/write_structured_data.py @@ -1,4 +1,5 @@ from collections import namedtuple +from dataclasses import dataclass from datetime import datetime from influxdb_client import InfluxDBClient @@ -7,19 +8,29 @@ class Sensor(namedtuple('Sensor', ['name', 'location', 'version', 'pressure', 'temperature', 'timestamp'])): """ - Sensor named structure + Named structure - Sensor """ pass +@dataclass +class Car: + """ + DataClass structure - Car + """ + engine: str + type: str + speed: float + + """ Initialize client """ -with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: +with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True) as client: write_api = client.write_api(write_options=SYNCHRONOUS) """ - Sensor "current" value + Sensor "current" state """ sensor = Sensor(name="sensor_pt859", location="warehouse_125", @@ -39,9 +50,17 @@ class Sensor(namedtuple('Sensor', ['name', 'location', 'version', 'pressure', 't dictionary_tag_keys=["location", "version"], dictionary_field_keys=["pressure", "temperature"]) - from influxdb_client import Point - point = Point("h2o_feet")\ - .tag("location", "coyote_creek")\ - .field("water_level", 4.0)\ - .time(4) - write_api.write("my-bucket", "my-org", point) + """ + Car "current" speed + """ + car = Car('12V-BT', 'sport-cars', 125.25) + print(car) + + """ + Synchronous write + """ + write_api.write(bucket="my-bucket", + record=car, + dictionary_measurement_key="engine", + dictionary_tag_keys=["engine", "type"], + dictionary_field_keys=["speed"]) diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index c0218536..ee717146 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -1,6 +1,7 @@ """Collect and write time series data to InfluxDB Cloud and InfluxDB OSS.""" # coding: utf-8 +import dataclasses import logging import os from collections import defaultdict @@ -8,7 +9,7 @@ from enum import Enum from random import random from time import sleep -from typing import Union, Any, Iterable +from typing import Union, Any, Iterable, NamedTuple import rx from rx import operators as ops, Observable @@ -24,6 +25,14 @@ logger = logging.getLogger(__name__) +try: + from dataclasses import dataclass + + _HAS_DATACLASS = True +except ModuleNotFoundError: + _HAS_DATACLASS = False + + class WriteType(Enum): """Configuration which type of writes will client use.""" @@ -230,7 +239,8 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions() def write(self, bucket: str, org: str = None, record: Union[ str, Iterable['str'], Point, Iterable['Point'], dict, Iterable['dict'], bytes, Iterable['bytes'], - Observable] = None, + Observable, NamedTuple, Iterable['NamedTuple'], 'dataclass', Iterable['dataclass'] + ] = None, write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> Any: """ Write time-series data into InfluxDB. @@ -242,14 +252,18 @@ def write(self, bucket: str, org: str = None, :param WritePrecision write_precision: specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point has precedes and is use for write. - :param record: Point, Line Protocol, Dictionary, Pandas DataFrame or RxPY Observable to write + :param record: Point, Line Protocol, Dictionary, NamedTuple, dataclass, Pandas DataFrame or RxPY Observable to write :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - ``DataFrame`` :key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields - ``DataFrame`` - :key dictionary_measurement_key: key of dictionary with specified measurement - ``dictionary`` - :key dictionary_time_key: key of dictionary with specified timestamp - ``dictionary`` - :key dictionary_tag_keys: list of dictionary keys to use as a tag - ``dictionary`` - :key dictionary_field_keys: list of dictionary keys to use as a field - ``dictionary`` + :key dictionary_measurement_key: key of record with specified measurement - + ``dictionary``, ``NamedTuple``, ``dataclass`` + :key dictionary_time_key: key of record with specified timestamp - + ``dictionary``, ``NamedTuple``, ``dataclass`` + :key dictionary_tag_keys: list of record keys to use as a tag - + ``dictionary``, ``NamedTuple``, ``dataclass`` + :key dictionary_field_keys: list of record keys to use as a field - + ``dictionary``, ``NamedTuple``, ``dataclass`` Example: .. code-block:: python @@ -356,6 +370,8 @@ def _serialize(self, record, write_precision, payload, **kwargs): elif hasattr(record, "_asdict"): # noinspection PyProtectedMember self._serialize(record._asdict(), write_precision, payload, **kwargs) + elif _HAS_DATACLASS and dataclasses.is_dataclass(record): + self._serialize(dataclasses.asdict(record), write_precision, payload, **kwargs) elif isinstance(record, Iterable): for item in record: self._serialize(item, write_precision, payload, **kwargs) @@ -389,6 +405,9 @@ def _write_batching(self, bucket, org, data, # noinspection PyProtectedMember self._write_batching(bucket, org, data._asdict(), precision, **kwargs) + elif _HAS_DATACLASS and dataclasses.is_dataclass(data): + self._write_batching(bucket, org, dataclasses.asdict(data), precision, **kwargs) + elif isinstance(data, Iterable): for item in data: self._write_batching(bucket, org, item, precision, **kwargs) diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index 767ebc2d..2e608dd7 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -4,12 +4,14 @@ import datetime import os +import sys import unittest from collections import namedtuple from datetime import timedelta from multiprocessing.pool import ApplyResult import httpretty +import pytest import influxdb_client from influxdb_client import Point, WritePrecision, InfluxDBClient @@ -557,6 +559,31 @@ def test_named_tuple(self): self.assertEqual(1, len(requests)) self.assertEqual("factory,position=central\\ europe customers=123456i", requests[0].parsed_body) + @pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8 or higher") + def test_data_class(self): + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) + + self.write_client = self.influxdb_client.write_api(write_options=SYNCHRONOUS) + + from dataclasses import dataclass + + @dataclass + class Car: + engine: str + type: str + speed: float + + car = Car('12V-BT', 'sport-cars', 125.25) + self.write_client.write("my-bucket", "my-org", + record=car, + dictionary_measurement_key="engine", + dictionary_tag_keys=["engine", "type"], + dictionary_field_keys=["speed"]) + + requests = httpretty.httpretty.latest_requests + self.assertEqual(1, len(requests)) + self.assertEqual("12V-BT,engine=12V-BT,type=sport-cars speed=125.25", requests[0].parsed_body) + class AsynchronousWriteTest(BaseTest): diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py index 38e70e7a..9320ca9f 100644 --- a/tests/test_WriteApiBatching.py +++ b/tests/test_WriteApiBatching.py @@ -2,11 +2,13 @@ from __future__ import absolute_import +import sys import time import unittest from collections import namedtuple import httpretty +import pytest import rx from rx import operators as ops @@ -541,6 +543,36 @@ def test_named_tuple(self): self.assertEqual(1, len(_requests)) self.assertEqual("factory,position=central\\ europe customers=123456i", _requests[0].parsed_body) + @pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8 or higher") + def test_data_class(self): + self._write_client.close() + self._write_client = WriteApi(influxdb_client=self.influxdb_client, + write_options=WriteOptions(batch_size=1)) + + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) + + from dataclasses import dataclass + + @dataclass + class Car: + engine: str + type: str + speed: float + + car = Car('12V-BT', 'sport-cars', 125.25) + self._write_client.write("my-bucket", "my-org", + record=car, + dictionary_measurement_key="engine", + dictionary_tag_keys=["engine", "type"], + dictionary_field_keys=["speed"]) + + time.sleep(1) + + _requests = httpretty.httpretty.latest_requests + + self.assertEqual(1, len(_requests)) + self.assertEqual("12V-BT,engine=12V-BT,type=sport-cars speed=125.25", _requests[0].parsed_body) + if __name__ == '__main__': unittest.main() From 079465b06e6f9a6646d94ae4b3443c77c8a53d42 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Sep 2021 14:05:52 +0200 Subject: [PATCH 4/8] feat: add supports for write `dataclasses` --- examples/write_structured_data.py | 2 +- influxdb_client/client/write_api.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/write_structured_data.py b/examples/write_structured_data.py index 85c44088..1299f550 100644 --- a/examples/write_structured_data.py +++ b/examples/write_structured_data.py @@ -26,7 +26,7 @@ class Car: """ Initialize client """ -with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True) as client: +with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: write_api = client.write_api(write_options=SYNCHRONOUS) """ diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index ee717146..18f834f8 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -1,7 +1,6 @@ """Collect and write time series data to InfluxDB Cloud and InfluxDB OSS.""" # coding: utf-8 -import dataclasses import logging import os from collections import defaultdict @@ -26,7 +25,7 @@ try: - from dataclasses import dataclass + import dataclasses _HAS_DATACLASS = True except ModuleNotFoundError: @@ -252,7 +251,8 @@ def write(self, bucket: str, org: str = None, :param WritePrecision write_precision: specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point has precedes and is use for write. - :param record: Point, Line Protocol, Dictionary, NamedTuple, dataclass, Pandas DataFrame or RxPY Observable to write + :param record: Point, Line Protocol, Dictionary, NamedTuple, Data Classes, Pandas DataFrame or + RxPY Observable to write :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - ``DataFrame`` :key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields - ``DataFrame`` From 5f3de028bea7ecdc4237e2f59b2fdf0c1ab75926 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Sep 2021 14:09:20 +0200 Subject: [PATCH 5/8] feat: add supports for write `dataclasses` --- influxdb_client/client/write_api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 18f834f8..7504b1f4 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -26,6 +26,7 @@ try: import dataclasses + from dataclasses import dataclass _HAS_DATACLASS = True except ModuleNotFoundError: From ac6953e23287015caa2968f6c359b8645ad4b323 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Sep 2021 14:17:32 +0200 Subject: [PATCH 6/8] chore: rename kwargs to generic name --- examples/write_structured_data.py | 14 +++++------ influxdb_client/client/write/point.py | 36 +++++++++++++-------------- influxdb_client/client/write_api.py | 13 ++++------ tests/test_WriteApi.py | 12 ++++----- tests/test_WriteApiBatching.py | 12 ++++----- tests/test_point.py | 22 ++++++++-------- 6 files changed, 53 insertions(+), 56 deletions(-) diff --git a/examples/write_structured_data.py b/examples/write_structured_data.py index 1299f550..6829d1ae 100644 --- a/examples/write_structured_data.py +++ b/examples/write_structured_data.py @@ -45,10 +45,10 @@ class Car: """ write_api.write(bucket="my-bucket", record=sensor, - dictionary_measurement_key="name", - dictionary_time_key="timestamp", - dictionary_tag_keys=["location", "version"], - dictionary_field_keys=["pressure", "temperature"]) + record_measurement_key="name", + record_time_key="timestamp", + record_tag_keys=["location", "version"], + record_field_keys=["pressure", "temperature"]) """ Car "current" speed @@ -61,6 +61,6 @@ class Car: """ write_api.write(bucket="my-bucket", record=car, - dictionary_measurement_key="engine", - dictionary_tag_keys=["engine", "type"], - dictionary_field_keys=["speed"]) + record_measurement_key="engine", + record_tag_keys=["engine", "type"], + record_field_keys=["speed"]) diff --git a/influxdb_client/client/write/point.py b/influxdb_client/client/write/point.py index 29523115..45ea7484 100644 --- a/influxdb_client/client/write/point.py +++ b/influxdb_client/client/write/point.py @@ -97,42 +97,42 @@ def from_dict(dictionary: dict, write_precision: WritePrecision = DEFAULT_WRITE_ } point = Point.from_dict(dictionary, write_precision=WritePrecision.S, - dictionary_measurement_key="name", - dictionary_time_key="created", - dictionary_tag_keys=["location", "version"], - dictionary_field_keys=["pressure", "temperature"]) + record_measurement_key="name", + record_time_key="created", + record_tag_keys=["location", "version"], + record_field_keys=["pressure", "temperature"]) :param dictionary: dictionary for serialize into Point structure :param write_precision: sets the precision for the supplied time values - :key dictionary_measurement_key: key of dictionary with specified measurement - :key dictionary_time_key: key of dictionary with specified timestamp - :key dictionary_tag_keys: list of dictionary keys to use as a tag - :key dictionary_field_keys: list of dictionary keys to use as a field + :key record_measurement_key: key of dictionary with specified measurement + :key record_time_key: key of dictionary with specified timestamp + :key record_tag_keys: list of dictionary keys to use as a tag + :key record_field_keys: list of dictionary keys to use as a field :return: new data point """ - point = Point(dictionary[kwargs.get('dictionary_measurement_key', 'measurement')]) + point = Point(dictionary[kwargs.get('record_measurement_key', 'measurement')]) - dictionary_tag_keys = kwargs.get('dictionary_tag_keys', None) - if dictionary_tag_keys is not None: - for tag_key in dictionary_tag_keys: + record_tag_keys = kwargs.get('record_tag_keys', None) + if record_tag_keys is not None: + for tag_key in record_tag_keys: if tag_key in dictionary: point.tag(tag_key, dictionary[tag_key]) elif 'tags' in dictionary: for tag_key, tag_value in dictionary['tags'].items(): point.tag(tag_key, tag_value) - dictionary_field_keys = kwargs.get('dictionary_field_keys', None) - if dictionary_field_keys is not None: - for field_key in dictionary_field_keys: + record_field_keys = kwargs.get('record_field_keys', None) + if record_field_keys is not None: + for field_key in record_field_keys: if field_key in dictionary: point.field(field_key, dictionary[field_key]) else: for field_key, field_value in dictionary['fields'].items(): point.field(field_key, field_value) - dictionary_time_key = kwargs.get('dictionary_time_key', 'time') - if dictionary_time_key in dictionary: - point.time(dictionary[dictionary_time_key], write_precision=write_precision) + record_time_key = kwargs.get('record_time_key', 'time') + if record_time_key in dictionary: + point.time(dictionary[record_time_key], write_precision=write_precision) return point def __init__(self, measurement_name): diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 7504b1f4..d8d702d6 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -257,14 +257,11 @@ def write(self, bucket: str, org: str = None, :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - ``DataFrame`` :key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields - ``DataFrame`` - :key dictionary_measurement_key: key of record with specified measurement - - ``dictionary``, ``NamedTuple``, ``dataclass`` - :key dictionary_time_key: key of record with specified timestamp - - ``dictionary``, ``NamedTuple``, ``dataclass`` - :key dictionary_tag_keys: list of record keys to use as a tag - - ``dictionary``, ``NamedTuple``, ``dataclass`` - :key dictionary_field_keys: list of record keys to use as a field - - ``dictionary``, ``NamedTuple``, ``dataclass`` + :key record_measurement_key: key of record with specified measurement - + ``dictionary``, ``NamedTuple``, ``dataclass`` + :key record_time_key: key of record with specified timestamp - ``dictionary``, ``NamedTuple``, ``dataclass`` + :key record_tag_keys: list of record keys to use as a tag - ``dictionary``, ``NamedTuple``, ``dataclass`` + :key record_field_keys: list of record keys to use as a field - ``dictionary``, ``NamedTuple``, ``dataclass`` Example: .. code-block:: python diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index 2e608dd7..49f15aca 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -551,9 +551,9 @@ def test_named_tuple(self): factory = Factory(measurement='factory', position="central europe", customers=123456) self.write_client.write("my-bucket", "my-org", factory, - dictionary_measurement_key="measurement", - dictionary_tag_keys=["position"], - dictionary_field_keys=["customers"]) + record_measurement_key="measurement", + record_tag_keys=["position"], + record_field_keys=["customers"]) requests = httpretty.httpretty.latest_requests self.assertEqual(1, len(requests)) @@ -576,9 +576,9 @@ class Car: car = Car('12V-BT', 'sport-cars', 125.25) self.write_client.write("my-bucket", "my-org", record=car, - dictionary_measurement_key="engine", - dictionary_tag_keys=["engine", "type"], - dictionary_field_keys=["speed"]) + record_measurement_key="engine", + record_tag_keys=["engine", "type"], + record_field_keys=["speed"]) requests = httpretty.httpretty.latest_requests self.assertEqual(1, len(requests)) diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py index 9320ca9f..a03b5aa7 100644 --- a/tests/test_WriteApiBatching.py +++ b/tests/test_WriteApiBatching.py @@ -532,9 +532,9 @@ def test_named_tuple(self): factory = Factory(measurement='factory', position="central europe", customers=123456) self._write_client.write("my-bucket", "my-org", factory, - dictionary_measurement_key="measurement", - dictionary_tag_keys=["position"], - dictionary_field_keys=["customers"]) + record_measurement_key="measurement", + record_tag_keys=["position"], + record_field_keys=["customers"]) time.sleep(1) @@ -562,9 +562,9 @@ class Car: car = Car('12V-BT', 'sport-cars', 125.25) self._write_client.write("my-bucket", "my-org", record=car, - dictionary_measurement_key="engine", - dictionary_tag_keys=["engine", "type"], - dictionary_field_keys=["speed"]) + record_measurement_key="engine", + record_tag_keys=["engine", "type"], + record_field_keys=["speed"]) time.sleep(1) diff --git a/tests/test_point.py b/tests/test_point.py index e952408d..1437f3d0 100644 --- a/tests/test_point.py +++ b/tests/test_point.py @@ -415,7 +415,7 @@ def test_from_dictionary_custom_measurement(self): "fields": {"value": 1}, "time": 1, } - point = Point.from_dict(dictionary, dictionary_measurement_key="name") + point = Point.from_dict(dictionary, record_measurement_key="name") self.assertEqual("test,tag=a value=1i 1", point.to_line_protocol()) def test_from_dictionary_custom_time(self): @@ -426,8 +426,8 @@ def test_from_dictionary_custom_time(self): "created": 100250, } point = Point.from_dict(dictionary, - dictionary_measurement_key="name", - dictionary_time_key="created") + record_measurement_key="name", + record_time_key="created") self.assertEqual("test,tag=a value=1i 100250", point.to_line_protocol()) def test_from_dictionary_custom_tags(self): @@ -439,8 +439,8 @@ def test_from_dictionary_custom_tags(self): "time": 1, } point = Point.from_dict(dictionary, - dictionary_measurement_key="name", - dictionary_tag_keys=["tag_a", "tag_b"]) + record_measurement_key="name", + record_tag_keys=["tag_a", "tag_b"]) self.assertEqual("test,tag_a=a,tag_b=b value=1i 1", point.to_line_protocol()) def test_from_dictionary_custom_fields(self): @@ -454,9 +454,9 @@ def test_from_dictionary_custom_fields(self): } point = Point.from_dict(dictionary, write_precision=WritePrecision.S, - dictionary_measurement_key="name", - dictionary_tag_keys=["location", "version"], - dictionary_field_keys=["pressure", "temperature"]) + record_measurement_key="name", + record_tag_keys=["location", "version"], + record_field_keys=["pressure", "temperature"]) self.assertEqual("sensor_pt859,location=warehouse_125,version=2021.06.05.5874 pressure=125i,temperature=10i 1632208639", point.to_line_protocol()) def test_from_dictionary_tolerant_to_missing_tags_and_fields(self): @@ -467,9 +467,9 @@ def test_from_dictionary_tolerant_to_missing_tags_and_fields(self): } point = Point.from_dict(dictionary, write_precision=WritePrecision.S, - dictionary_measurement_key="name", - dictionary_tag_keys=["location", "version"], - dictionary_field_keys=["pressure", "temperature"]) + record_measurement_key="name", + record_tag_keys=["location", "version"], + record_field_keys=["pressure", "temperature"]) self.assertEqual("sensor_pt859,location=warehouse_125 pressure=125i", point.to_line_protocol()) From 1dba030e0eefea818b885a9e8a9814baa9f896fe Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Sep 2021 14:30:20 +0200 Subject: [PATCH 7/8] feat: add `record_measurement_name` for static measurement name --- examples/write_structured_data.py | 2 +- influxdb_client/client/write/point.py | 8 ++++++-- influxdb_client/client/write_api.py | 1 + tests/test_WriteApi.py | 4 ++-- tests/test_WriteApiBatching.py | 4 ++-- tests/test_point.py | 13 +++++++++++++ 6 files changed, 25 insertions(+), 7 deletions(-) diff --git a/examples/write_structured_data.py b/examples/write_structured_data.py index 6829d1ae..26a904f3 100644 --- a/examples/write_structured_data.py +++ b/examples/write_structured_data.py @@ -61,6 +61,6 @@ class Car: """ write_api.write(bucket="my-bucket", record=car, - record_measurement_key="engine", + record_measurement_name="performance", record_tag_keys=["engine", "type"], record_field_keys=["speed"]) diff --git a/influxdb_client/client/write/point.py b/influxdb_client/client/write/point.py index 45ea7484..c9fcbfbc 100644 --- a/influxdb_client/client/write/point.py +++ b/influxdb_client/client/write/point.py @@ -102,15 +102,19 @@ def from_dict(dictionary: dict, write_precision: WritePrecision = DEFAULT_WRITE_ record_tag_keys=["location", "version"], record_field_keys=["pressure", "temperature"]) - :param dictionary: dictionary for serialize into Point structure + :param dictionary: dictionary for serialize into data Point :param write_precision: sets the precision for the supplied time values :key record_measurement_key: key of dictionary with specified measurement + :key record_measurement_name: static measurement name for data Point :key record_time_key: key of dictionary with specified timestamp :key record_tag_keys: list of dictionary keys to use as a tag :key record_field_keys: list of dictionary keys to use as a field :return: new data point """ - point = Point(dictionary[kwargs.get('record_measurement_key', 'measurement')]) + measurement_ = kwargs.get('record_measurement_name', None) + if measurement_ is None: + measurement_ = dictionary[kwargs.get('record_measurement_key', 'measurement')] + point = Point(measurement_) record_tag_keys = kwargs.get('record_tag_keys', None) if record_tag_keys is not None: diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index d8d702d6..c5157070 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -259,6 +259,7 @@ def write(self, bucket: str, org: str = None, rest columns will be fields - ``DataFrame`` :key record_measurement_key: key of record with specified measurement - ``dictionary``, ``NamedTuple``, ``dataclass`` + :key record_measurement_name: static measurement name - ``dictionary``, ``NamedTuple``, ``dataclass`` :key record_time_key: key of record with specified timestamp - ``dictionary``, ``NamedTuple``, ``dataclass`` :key record_tag_keys: list of record keys to use as a tag - ``dictionary``, ``NamedTuple``, ``dataclass`` :key record_field_keys: list of record keys to use as a field - ``dictionary``, ``NamedTuple``, ``dataclass`` diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index 49f15aca..c077ad0b 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -576,13 +576,13 @@ class Car: car = Car('12V-BT', 'sport-cars', 125.25) self.write_client.write("my-bucket", "my-org", record=car, - record_measurement_key="engine", + record_measurement_name="performance", record_tag_keys=["engine", "type"], record_field_keys=["speed"]) requests = httpretty.httpretty.latest_requests self.assertEqual(1, len(requests)) - self.assertEqual("12V-BT,engine=12V-BT,type=sport-cars speed=125.25", requests[0].parsed_body) + self.assertEqual("performance,engine=12V-BT,type=sport-cars speed=125.25", requests[0].parsed_body) class AsynchronousWriteTest(BaseTest): diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py index a03b5aa7..ec10e9cf 100644 --- a/tests/test_WriteApiBatching.py +++ b/tests/test_WriteApiBatching.py @@ -562,7 +562,7 @@ class Car: car = Car('12V-BT', 'sport-cars', 125.25) self._write_client.write("my-bucket", "my-org", record=car, - record_measurement_key="engine", + record_measurement_name="performance", record_tag_keys=["engine", "type"], record_field_keys=["speed"]) @@ -571,7 +571,7 @@ class Car: _requests = httpretty.httpretty.latest_requests self.assertEqual(1, len(_requests)) - self.assertEqual("12V-BT,engine=12V-BT,type=sport-cars speed=125.25", _requests[0].parsed_body) + self.assertEqual("performance,engine=12V-BT,type=sport-cars speed=125.25", _requests[0].parsed_body) if __name__ == '__main__': diff --git a/tests/test_point.py b/tests/test_point.py index 1437f3d0..53fd0cc5 100644 --- a/tests/test_point.py +++ b/tests/test_point.py @@ -472,6 +472,19 @@ def test_from_dictionary_tolerant_to_missing_tags_and_fields(self): record_field_keys=["pressure", "temperature"]) self.assertEqual("sensor_pt859,location=warehouse_125 pressure=125i", point.to_line_protocol()) + def test_static_measurement_name(self): + dictionary = { + "name": "sensor_pt859", + "location": "warehouse_125", + "pressure": 125 + } + point = Point.from_dict(dictionary, + write_precision=WritePrecision.S, + record_measurement_name="custom_sensor_id", + record_tag_keys=["location", "version"], + record_field_keys=["pressure", "temperature"]) + self.assertEqual("custom_sensor_id,location=warehouse_125 pressure=125i", point.to_line_protocol()) + if __name__ == '__main__': unittest.main() From 2405b2e5fe31e073d6544af50e5d3b98209c6986 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Sep 2021 14:33:55 +0200 Subject: [PATCH 8/8] docs: update CHANGELOG.md --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa7d7af6..c1e3811a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 1.22.0 [unreleased] +### Features +1. [#330](https://github.com/influxdata/influxdb-client-python/pull/330): Add supports for write structured data - `NamedTuple`, `Data Classes` + ## 1.21.0 [2021-09-17] ### Features