diff --git a/Examples/batching-example.py b/Examples/batching-example.py
new file mode 100644
index 0000000..31ab004
--- /dev/null
+++ b/Examples/batching-example.py
@@ -0,0 +1,78 @@
+import random
+import datetime
+
+from bson import ObjectId
+
+import influxdb_client_3 as InfluxDBClient3
+from influxdb_client_3 import write_options, WritePrecision
+
+
+# Creating 100 gatewayId values as MongoDB ObjectIds
+gatewayIds = [ObjectId() for x in range(100)]
+
+# Setting decimal precision to 2
+precision = 2
+
+# Setting the timestamp of the first sensor reading to 366 days in the past
+now = datetime.datetime.now() - datetime.timedelta(days=366)
+
+# InfluxDB connection details
+token = ""
+org = ""
+url = ""
+
+# Opening the InfluxDB client: batches of up to 5k points, flushed at least every 10k ms, with gzip compression
+with InfluxDBClient3.InfluxDBClient3(
+        token=token,
+        host=url,
+        org=org,
+        database="solarmanager",
+        enable_gzip=True,
+        write_options=write_options(batch_size=5_000,
+                                    flush_interval=10_000,
+                                    jitter_interval=2_000,
+                                    retry_interval=5_000,
+                                    max_retries=5,
+                                    max_retry_delay=30_000,
+                                    exponential_base=2,
+                                    write_type='batching')) as _client:
+
+    # Iterating 525,600 times, advancing the timestamp by 10 seconds per
+    # iteration (6 sensor readings per minute and gateway)
+    for i in range(525600):
+        # Adding 10 seconds to the timestamp of the previous sensor reading
+        now = now + datetime.timedelta(seconds=10)
+        # Iterating over gateways
+        for gatewayId in gatewayIds:
+            # Creating random test data for 12 fields to be stored in the time series database
+            bcW = random.randrange(1501)
+            bcWh = round(random.uniform(0, 4.17), precision)
+            bdW = random.randrange(71)
+            bdWh = round(random.uniform(0, 0.12), precision)
+            cPvWh = round(random.uniform(0.51, 27.78), precision)
+            cW = random.randrange(172, 10001)
+            cWh = round(random.uniform(0.51, 27.78), precision)
+            eWh = round(random.uniform(0, 41.67), precision)
+            iWh = round(random.uniform(0, 16.67), precision)
+            pW = random.randrange(209, 20001)
+            pWh = round(random.uniform(0.58, 55.56), precision)
+            scWh = round(random.uniform(0.58, 55.56), precision)
+            # Creating the point to be ingested into InfluxDB
+            p = (InfluxDBClient3.Point("stream")
+                 .tag("gatewayId", str(gatewayId))
+                 .field("bcW", bcW)
+                 .field("bcWh", bcWh)
+                 .field("bdW", bdW)
+                 .field("bdWh", bdWh)
+                 .field("cPvWh", cPvWh)
+                 .field("cW", cW)
+                 .field("cWh", cWh)
+                 .field("eWh", eWh)
+                 .field("iWh", iWh)
+                 .field("pW", pW)
+                 .field("pWh", pWh)
+                 .field("scWh", scWh)
+                 .time(now.strftime('%Y-%m-%dT%H:%M:%SZ'), WritePrecision.S))
+
+            # Writing the point (the client automatically batches writes into sets of 5k points)
+            _client.write(record=p)
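For context, the example above boils down to a short batching sketch. The connection values below are placeholders, and the flush-on-close behaviour assumes the underlying `WriteApi` flushes any pending batch when it is closed:

```python
import influxdb_client_3 as InfluxDBClient3
from influxdb_client_3 import write_options

# Placeholder connection details -- substitute real values before running.
token = "my-token"
host = "my-host.cloud2.influxdata.com"
org = "my-org"

# write_type='batching' makes the client queue points and flush them in
# batches of batch_size, or after flush_interval ms, whichever comes first.
opts = write_options(batch_size=100, flush_interval=10_000, write_type='batching')

with InfluxDBClient3.InfluxDBClient3(token=token, host=host, org=org,
                                     database="solarmanager",
                                     write_options=opts) as client:
    for i in range(1_000):
        p = InfluxDBClient3.Point("stream").tag("gatewayId", "demo").field("bcW", i)
        client.write(record=p)
    # Leaving the with-block closes the client, which should flush any
    # partially filled batch still sitting in the queue.
```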
diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py
index a5676cf..8c8cb3d 100644
--- a/influxdb_client_3/__init__.py
+++ b/influxdb_client_3/__init__.py
@@ -6,6 +6,8 @@
 from influxdb_client import WriteOptions as _WriteOptions
 from influxdb_client.client.write_api import WriteApi as _WriteApi
 from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
+from influxdb_client.client.write_api import PointSettings
+from influxdb_client.domain.write_precision import WritePrecision
 from influxdb_client import Point
 import json
@@ -14,6 +16,7 @@ def write_options(**kwargs):
 
+
 class InfluxDBClient3:
     def __init__(self, host=None, org=None, database=None, token=None, write_options=None, **kwargs):
         """
@@ -27,15 +30,9 @@ def __init__(self, host=None, org=None, database=None, token=None, write_options
         """
         self._org = org
         self._database = database
-        self.write_options = write_options
-
-        if self.write_options == None:
-            self.write_options = SYNCHRONOUS
-
-
+        self.write_options = write_options if write_options is not None else SYNCHRONOUS
         self._client = _InfluxDBClient(url=f"https://{host}", token=token, org=self._org, **kwargs )
         self._write_api = _WriteApi(self._client, write_options=self.write_options )
-
         self._flight_client = FlightClient(f"grpc+tls://{host}:443")
         # create an authorization header
         self._options = FlightCallOptions(headers=[(b"authorization",f"Bearer {token}".encode('utf-8'))])
@@ -66,7 +63,6 @@ def write_csv(self, csv_file, measurement_name=None, tag_columns = [],timestamp_
         atable = csv.read_csv(csv_file, **kwargs)
 
         df = atable.to_pandas()
-        print(df)
         self._write_api.write(bucket=self._database, record=df,
                               data_frame_measurement_name=measurement_name,
                               data_frame_tag_columns=tag_columns,
@@ -96,8 +92,21 @@ def query(self, query, language="sql"):
         # which is useful if you have huge data sets
         return flight_reader.read_all()
 
-    def __del__(self):
-        self._write_api.__del__()
-        return self._client.__del__()
-__all__ = ["InfluxDBClient3", "Point"]
+    def close(self):
+        # Clean up client resources.
+        # Call the close method of _write_api, _flight_client and _client, if they have one.
+        if hasattr(self._write_api, 'close'):
+            self._write_api.close()
+        if hasattr(self._flight_client, 'close'):
+            self._flight_client.close()
+        if hasattr(self._client, 'close'):
+            self._client.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+__all__ = ["InfluxDBClient3", "Point", "PointSettings", "SYNCHRONOUS", "ASYNCHRONOUS", "write_options", "WritePrecision"]
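With `__del__` replaced by an explicit `close()` plus context-manager support, callers that don't use a `with` block should close the client themselves. A minimal sketch, again with placeholder connection details:

```python
import influxdb_client_3 as InfluxDBClient3

# Placeholder connection details -- substitute real values before running.
client = InfluxDBClient3.InfluxDBClient3(token="my-token",
                                         host="my-host.cloud2.influxdata.com",
                                         org="my-org",
                                         database="solarmanager")
try:
    # query() returns a pyarrow table via flight_reader.read_all()
    table = client.query("SELECT * FROM stream LIMIT 10", language="sql")
    print(table)
finally:
    # Releases the write API, the Flight client and the HTTP client --
    # the same cleanup __exit__ performs for the with-statement form.
    client.close()
```

Relying on `__del__` tied resource cleanup to garbage collection, which is non-deterministic; the explicit `close()` plus `__enter__`/`__exit__` makes the lifetime of the HTTP and Flight connections predictable.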