diff --git a/.circleci/config.yml b/.circleci/config.yml index 89c685e4..561d3bb3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -108,7 +108,7 @@ jobs: pydocstyle --count influxdb_client check-examples: docker: - - image: *default-python + - image: "cimg/python:3.8" environment: PIPENV_VENV_IN_PROJECT: true - image: *default-influxdb @@ -123,6 +123,7 @@ jobs: export PYTHONPATH="$PWD" python examples/monitoring_and_alerting.py python examples/buckets_management.py + python examples/write_structured_data.py check-sphinx: docker: - image: *default-python diff --git a/CHANGELOG.md b/CHANGELOG.md index 1eaaf3bc..60a44887 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 1.22.0 [unreleased] +### Features +1. [#330](https://github.com/influxdata/influxdb-client-python/pull/330): Add support for writing structured data - `NamedTuple`, `Data Classes` + ### Documentation 1. [#331](https://github.com/influxdata/influxdb-client-python/pull/331): Add [Migration Guide](MIGRATION_GUIDE.rst) diff --git a/README.rst b/README.rst index 2c805461..73580246 100644 --- a/README.rst +++ b/README.rst @@ -362,11 +362,14 @@ The data could be written as 1. ``string`` or ``bytes`` that is formatted as a InfluxDB's line protocol 2. `Data Point `__ structure -3. Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time`` -4. List of above items -5. A ``batching`` type of write also supports an ``Observable`` that produce one of an above item +3. Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time`` or custom structure +4. `NamedTuple `_ +5. `Data Classes `_ 6. `Pandas DataFrame `_ +7. List of above items +8. A ``batching`` type of write also supports an ``Observable`` that produces one of the above items +You can find write examples at GitHub: `influxdb-client-python/examples `__. 
Batching """""""" @@ -532,7 +535,7 @@ In a `init `_ configuration customer = California Miner data_center = ${env.data_center} -You could also use a `TOML `_ format for the configuration file. +You can also use a `TOML `_ format for the configuration file. Via Environment Properties __________________________ @@ -1048,7 +1051,7 @@ The second example shows how to use client capabilities to realtime visualizatio Other examples """""""""""""" -You could find all examples at GitHub: `influxdb-client-python/examples `_. +You can find all examples at GitHub: `influxdb-client-python/examples `__. .. marker-examples-end diff --git a/docs/api.rst b/docs/api.rst index bbf9d9a7..92a161a5 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -19,6 +19,12 @@ WriteApi .. autoclass:: influxdb_client.WriteApi :members: +.. autoclass:: influxdb_client.client.write.point.Point + :members: + +.. autoclass:: influxdb_client.domain.write_precision.WritePrecision + :members: + BucketsApi """""""""" .. autoclass:: influxdb_client.BucketsApi diff --git a/examples/README.md b/examples/README.md index 29efa6dd..a4ed5210 100644 --- a/examples/README.md +++ b/examples/README.md @@ -7,6 +7,7 @@ - [ingest_large_dataframe.py](ingest_large_dataframe.py) - How to ingest large DataFrame - [iot_sensor.py](iot_sensor.py) - How to write sensor data every minute by [RxPY](https://rxpy.readthedocs.io/en/latest/) - [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB +- [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple), [Data Classes](https://docs.python.org/3/library/dataclasses.html) - (_requires Python v3.8+_) ## Queries - [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV` diff --git a/examples/example.py b/examples/example.py 
index 73ed813a..0082ade1 100644 --- a/examples/example.py +++ b/examples/example.py @@ -37,4 +37,4 @@ print("val count: ", val_count) response = query_api.query_raw('from(bucket:"my-bucket") |> range(start: -10m)') - print (codecs.decode(response.data)) + print(codecs.decode(response.data)) diff --git a/examples/write_structured_data.py b/examples/write_structured_data.py new file mode 100644 index 00000000..26a904f3 --- /dev/null +++ b/examples/write_structured_data.py @@ -0,0 +1,66 @@ +from collections import namedtuple +from dataclasses import dataclass +from datetime import datetime + +from influxdb_client import InfluxDBClient +from influxdb_client.client.write_api import SYNCHRONOUS + + +class Sensor(namedtuple('Sensor', ['name', 'location', 'version', 'pressure', 'temperature', 'timestamp'])): + """ + Named structure - Sensor + """ + pass + + +@dataclass +class Car: + """ + DataClass structure - Car + """ + engine: str + type: str + speed: float + + +""" +Initialize client +""" +with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + write_api = client.write_api(write_options=SYNCHRONOUS) + + """ + Sensor "current" state + """ + sensor = Sensor(name="sensor_pt859", + location="warehouse_125", + version="2021.06.05.5874", + pressure=125, + temperature=10, + timestamp=datetime.utcnow()) + print(sensor) + + """ + Synchronous write + """ + write_api.write(bucket="my-bucket", + record=sensor, + record_measurement_key="name", + record_time_key="timestamp", + record_tag_keys=["location", "version"], + record_field_keys=["pressure", "temperature"]) + + """ + Car "current" speed + """ + car = Car('12V-BT', 'sport-cars', 125.25) + print(car) + + """ + Synchronous write + """ + write_api.write(bucket="my-bucket", + record=car, + record_measurement_name="performance", + record_tag_keys=["engine", "type"], + record_field_keys=["speed"]) diff --git a/influxdb_client/client/write/point.py b/influxdb_client/client/write/point.py index 
f20ce953..c9fcbfbc 100644 --- a/influxdb_client/client/write/point.py +++ b/influxdb_client/client/write/point.py @@ -61,16 +61,82 @@ def measurement(measurement): return p @staticmethod - def from_dict(dictionary: dict, write_precision: WritePrecision = DEFAULT_WRITE_PRECISION): - """Initialize point from 'dict' structure.""" - point = Point(dictionary['measurement']) - if 'tags' in dictionary: + def from_dict(dictionary: dict, write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs): + """ + Initialize point from 'dict' structure. + + The expected dict structure is: + - measurement + - tags + - fields + - time + + Example: + .. code-block:: python + + # Use default dictionary structure + dict_structure = { + "measurement": "h2o_feet", + "tags": {"location": "coyote_creek"}, + "fields": {"water_level": 1.0}, + "time": 1 + } + point = Point.from_dict(dict_structure, WritePrecision.NS) + + Example: + .. code-block:: python + + # Use custom dictionary structure + dictionary = { + "name": "sensor_pt859", + "location": "warehouse_125", + "version": "2021.06.05.5874", + "pressure": 125, + "temperature": 10, + "created": 1632208639, + } + point = Point.from_dict(dictionary, + write_precision=WritePrecision.S, + record_measurement_key="name", + record_time_key="created", + record_tag_keys=["location", "version"], + record_field_keys=["pressure", "temperature"]) + + :param dictionary: dictionary for serialize into data Point + :param write_precision: sets the precision for the supplied time values + :key record_measurement_key: key of dictionary with specified measurement + :key record_measurement_name: static measurement name for data Point + :key record_time_key: key of dictionary with specified timestamp + :key record_tag_keys: list of dictionary keys to use as a tag + :key record_field_keys: list of dictionary keys to use as a field + :return: new data point + """ + measurement_ = kwargs.get('record_measurement_name', None) + if measurement_ is None: + 
measurement_ = dictionary[kwargs.get('record_measurement_key', 'measurement')] + point = Point(measurement_) + + record_tag_keys = kwargs.get('record_tag_keys', None) + if record_tag_keys is not None: + for tag_key in record_tag_keys: + if tag_key in dictionary: + point.tag(tag_key, dictionary[tag_key]) + elif 'tags' in dictionary: for tag_key, tag_value in dictionary['tags'].items(): point.tag(tag_key, tag_value) - for field_key, field_value in dictionary['fields'].items(): - point.field(field_key, field_value) - if 'time' in dictionary: - point.time(dictionary['time'], write_precision=write_precision) + + record_field_keys = kwargs.get('record_field_keys', None) + if record_field_keys is not None: + for field_key in record_field_keys: + if field_key in dictionary: + point.field(field_key, dictionary[field_key]) + else: + for field_key, field_value in dictionary['fields'].items(): + point.field(field_key, field_value) + + record_time_key = kwargs.get('record_time_key', 'time') + if record_time_key in dictionary: + point.time(dictionary[record_time_key], write_precision=write_precision) return point def __init__(self, measurement_name): diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 3380224c..c5157070 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -8,7 +8,7 @@ from enum import Enum from random import random from time import sleep -from typing import Union, Any, Iterable +from typing import Union, Any, Iterable, NamedTuple import rx from rx import operators as ops, Observable @@ -24,6 +24,15 @@ logger = logging.getLogger(__name__) +try: + import dataclasses + from dataclasses import dataclass + + _HAS_DATACLASS = True +except ModuleNotFoundError: + _HAS_DATACLASS = False + + class WriteType(Enum): """Configuration which type of writes will client use.""" @@ -173,7 +182,20 @@ def _body_reduce(batch_items): class WriteApi: - """Implementation for '/api/v2/write' endpoint.""" + 
""" + Implementation for '/api/v2/write' endpoint. + + Example: + .. code-block:: python + + from influxdb_client import InfluxDBClient + from influxdb_client.client.write_api import SYNCHRONOUS + + + # Initialize SYNCHRONOUS instance of WriteApi + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + write_api = client.write_api(write_options=SYNCHRONOUS) + """ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions(), point_settings: PointSettings = PointSettings()) -> None: @@ -217,21 +239,51 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions() def write(self, bucket: str, org: str = None, record: Union[ str, Iterable['str'], Point, Iterable['Point'], dict, Iterable['dict'], bytes, Iterable['bytes'], - Observable] = None, + Observable, NamedTuple, Iterable['NamedTuple'], 'dataclass', Iterable['dataclass'] + ] = None, write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> Any: """ Write time-series data into InfluxDB. + :param str bucket: specifies the destination bucket for writes (required) :param str, Organization org: specifies the destination organization for writes; take the ID, Name or Organization; if it's not specified then is used default from client.org. - :param str bucket: specifies the destination bucket for writes (required) :param WritePrecision write_precision: specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point has precedes and is use for write. 
- :param record: Points, line protocol, Pandas DataFrame, RxPY Observable to write - :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - :key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields + :param record: Point, Line Protocol, Dictionary, NamedTuple, Data Classes, Pandas DataFrame or + RxPY Observable to write + :key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - ``DataFrame`` + :key data_frame_tag_columns: list of DataFrame columns which are tags, + rest columns will be fields - ``DataFrame`` + :key record_measurement_key: key of record with specified measurement - + ``dictionary``, ``NamedTuple``, ``dataclass`` + :key record_measurement_name: static measurement name - ``dictionary``, ``NamedTuple``, ``dataclass`` + :key record_time_key: key of record with specified timestamp - ``dictionary``, ``NamedTuple``, ``dataclass`` + :key record_tag_keys: list of record keys to use as a tag - ``dictionary``, ``NamedTuple``, ``dataclass`` + :key record_field_keys: list of record keys to use as a field - ``dictionary``, ``NamedTuple``, ``dataclass`` + + Example: + .. 
code-block:: python + + # Record as Line Protocol + write_api.write("my-bucket", "my-org", "h2o_feet,location=us-west level=125i 1") + + # Record as Dictionary + dictionary = { + "measurement": "h2o_feet", + "tags": {"location": "us-west"}, + "fields": {"level": 125}, + "time": 1 + } + write_api.write("my-bucket", "my-org", dictionary) + + # Record as Point + from influxdb_client import Point + point = Point("h2o_feet").tag("location", "us-west").field("level", 125).time(1) + write_api.write("my-bucket", "my-org", point) + """ org = get_org_query_param(org=org, client=self._influxdb_client) @@ -309,12 +361,16 @@ def _serialize(self, record, write_precision, payload, **kwargs): self._serialize(record.to_line_protocol(), record.write_precision, payload, **kwargs) elif isinstance(record, dict): - self._serialize(Point.from_dict(record, write_precision=write_precision), + self._serialize(Point.from_dict(record, write_precision=write_precision, **kwargs), write_precision, payload, **kwargs) elif 'DataFrame' in type(record).__name__: serializer = DataframeSerializer(record, self._point_settings, write_precision, **kwargs) self._serialize(serializer.serialize(), write_precision, payload, **kwargs) - + elif hasattr(record, "_asdict"): + # noinspection PyProtectedMember + self._serialize(record._asdict(), write_precision, payload, **kwargs) + elif _HAS_DATACLASS and dataclasses.is_dataclass(record): + self._serialize(dataclasses.asdict(record), write_precision, payload, **kwargs) elif isinstance(record, Iterable): for item in record: self._serialize(item, write_precision, payload, **kwargs) @@ -334,7 +390,7 @@ def _write_batching(self, bucket, org, data, self._write_batching(bucket, org, data.to_line_protocol(), data.write_precision, **kwargs) elif isinstance(data, dict): - self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision), + self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision, **kwargs), precision, **kwargs) 
elif 'DataFrame' in type(data).__name__: @@ -344,6 +400,12 @@ def _write_batching(self, bucket, org, data, self._write_batching(bucket, org, serializer.serialize(chunk_idx), precision, **kwargs) + elif hasattr(data, "_asdict"): + # noinspection PyProtectedMember + self._write_batching(bucket, org, data._asdict(), precision, **kwargs) + + elif _HAS_DATACLASS and dataclasses.is_dataclass(data): + self._write_batching(bucket, org, dataclasses.asdict(data), precision, **kwargs) elif isinstance(data, Iterable): for item in data: diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index 757e4845..d715fe83 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -4,11 +4,14 @@ import datetime import os +import sys import unittest +from collections import namedtuple from datetime import timedelta from multiprocessing.pool import ApplyResult import httpretty +import pytest import influxdb_client from influxdb_client import Point, WritePrecision, InfluxDBClient @@ -542,6 +545,48 @@ def test_redirect(self): from urllib3 import Retry Retry.DEFAULT.remove_headers_on_redirect = Retry.DEFAULT_REMOVE_HEADERS_ON_REDIRECT + def test_named_tuple(self): + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) + + self.write_client = self.influxdb_client.write_api(write_options=SYNCHRONOUS) + + Factory = namedtuple('Factory', ['measurement', 'position', 'customers']) + factory = Factory(measurement='factory', position="central europe", customers=123456) + + self.write_client.write("my-bucket", "my-org", factory, + record_measurement_key="measurement", + record_tag_keys=["position"], + record_field_keys=["customers"]) + + requests = httpretty.httpretty.latest_requests + self.assertEqual(1, len(requests)) + self.assertEqual("factory,position=central\\ europe customers=123456i", requests[0].parsed_body) + + @pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8 or higher") + def test_data_class(self): + 
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) + + self.write_client = self.influxdb_client.write_api(write_options=SYNCHRONOUS) + + from dataclasses import dataclass + + @dataclass + class Car: + engine: str + type: str + speed: float + + car = Car('12V-BT', 'sport-cars', 125.25) + self.write_client.write("my-bucket", "my-org", + record=car, + record_measurement_name="performance", + record_tag_keys=["engine", "type"], + record_field_keys=["speed"]) + + requests = httpretty.httpretty.latest_requests + self.assertEqual(1, len(requests)) + self.assertEqual("performance,engine=12V-BT,type=sport-cars speed=125.25", requests[0].parsed_body) + class AsynchronousWriteTest(BaseTest): diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py index e7175543..ec10e9cf 100644 --- a/tests/test_WriteApiBatching.py +++ b/tests/test_WriteApiBatching.py @@ -2,10 +2,13 @@ from __future__ import absolute_import +import sys import time import unittest +from collections import namedtuple import httpretty +import pytest import rx from rx import operators as ops @@ -244,7 +247,7 @@ def test_retry_disabled_max_retries(self): self._write_client.close() self._write_client = WriteApi(influxdb_client=self.influxdb_client, - write_options=WriteOptions(max_retries=0,batch_size=2, flush_interval=1_000)) + write_options=WriteOptions(max_retries=0, batch_size=2, flush_interval=1_000)) self._write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek level\\ water_level=1 1", @@ -518,6 +521,58 @@ def test_batching_data_frame(self): self.assertEqual(_request1, _requests[0].parsed_body) self.assertEqual(_request2, _requests[1].parsed_body) + def test_named_tuple(self): + self._write_client.close() + self._write_client = WriteApi(influxdb_client=self.influxdb_client, + write_options=WriteOptions(batch_size=1)) + + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) + + Factory = 
namedtuple('Factory', ['measurement', 'position', 'customers']) + factory = Factory(measurement='factory', position="central europe", customers=123456) + + self._write_client.write("my-bucket", "my-org", factory, + record_measurement_key="measurement", + record_tag_keys=["position"], + record_field_keys=["customers"]) + + time.sleep(1) + + _requests = httpretty.httpretty.latest_requests + + self.assertEqual(1, len(_requests)) + self.assertEqual("factory,position=central\\ europe customers=123456i", _requests[0].parsed_body) + + @pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8 or higher") + def test_data_class(self): + self._write_client.close() + self._write_client = WriteApi(influxdb_client=self.influxdb_client, + write_options=WriteOptions(batch_size=1)) + + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) + + from dataclasses import dataclass + + @dataclass + class Car: + engine: str + type: str + speed: float + + car = Car('12V-BT', 'sport-cars', 125.25) + self._write_client.write("my-bucket", "my-org", + record=car, + record_measurement_name="performance", + record_tag_keys=["engine", "type"], + record_field_keys=["speed"]) + + time.sleep(1) + + _requests = httpretty.httpretty.latest_requests + + self.assertEqual(1, len(_requests)) + self.assertEqual("performance,engine=12V-BT,type=sport-cars speed=125.25", _requests[0].parsed_body) + if __name__ == '__main__': unittest.main() diff --git a/tests/test_point.py b/tests/test_point.py index 8d2de231..53fd0cc5 100644 --- a/tests/test_point.py +++ b/tests/test_point.py @@ -408,6 +408,83 @@ def test_numpy_types(self): self.assertEqual("h2o,location=europe np.float1=1.123,np.float2=2.123,np.float3=3.123,np.float4=4.123,np.int1=1i,np.int2=2i,np.int3=3i,np.int4=4i,np.uint1=5i,np.uint2=6i,np.uint3=7i,np.uint4=8i", point.to_line_protocol()) + def test_from_dictionary_custom_measurement(self): + dictionary = { + "name": "test", + "tags": {"tag": "a"}, + 
"fields": {"value": 1}, + "time": 1, + } + point = Point.from_dict(dictionary, record_measurement_key="name") + self.assertEqual("test,tag=a value=1i 1", point.to_line_protocol()) + + def test_from_dictionary_custom_time(self): + dictionary = { + "name": "test", + "tags": {"tag": "a"}, + "fields": {"value": 1}, + "created": 100250, + } + point = Point.from_dict(dictionary, + record_measurement_key="name", + record_time_key="created") + self.assertEqual("test,tag=a value=1i 100250", point.to_line_protocol()) + + def test_from_dictionary_custom_tags(self): + dictionary = { + "name": "test", + "tag_a": "a", + "tag_b": "b", + "fields": {"value": 1}, + "time": 1, + } + point = Point.from_dict(dictionary, + record_measurement_key="name", + record_tag_keys=["tag_a", "tag_b"]) + self.assertEqual("test,tag_a=a,tag_b=b value=1i 1", point.to_line_protocol()) + + def test_from_dictionary_custom_fields(self): + dictionary = { + "name": "sensor_pt859", + "location": "warehouse_125", + "version": "2021.06.05.5874", + "pressure": 125, + "temperature": 10, + "time": 1632208639, + } + point = Point.from_dict(dictionary, + write_precision=WritePrecision.S, + record_measurement_key="name", + record_tag_keys=["location", "version"], + record_field_keys=["pressure", "temperature"]) + self.assertEqual("sensor_pt859,location=warehouse_125,version=2021.06.05.5874 pressure=125i,temperature=10i 1632208639", point.to_line_protocol()) + + def test_from_dictionary_tolerant_to_missing_tags_and_fields(self): + dictionary = { + "name": "sensor_pt859", + "location": "warehouse_125", + "pressure": 125 + } + point = Point.from_dict(dictionary, + write_precision=WritePrecision.S, + record_measurement_key="name", + record_tag_keys=["location", "version"], + record_field_keys=["pressure", "temperature"]) + self.assertEqual("sensor_pt859,location=warehouse_125 pressure=125i", point.to_line_protocol()) + + def test_static_measurement_name(self): + dictionary = { + "name": "sensor_pt859", + "location": 
"warehouse_125", + "pressure": 125 + } + point = Point.from_dict(dictionary, + write_precision=WritePrecision.S, + record_measurement_name="custom_sensor_id", + record_tag_keys=["location", "version"], + record_field_keys=["pressure", "temperature"]) + self.assertEqual("custom_sensor_id,location=warehouse_125 pressure=125i", point.to_line_protocol()) + if __name__ == '__main__': unittest.main()