Skip to content

exposed write api functions. added batching sample #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions Examples/batching-example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Example: writing simulated sensor data to InfluxDB 3 using batched writes.

Generates random readings for a set of gateways and writes them as points;
the client batches writes (5k points or 10s flush interval) with gzip
compression enabled.
"""
import datetime
import random

from bson import ObjectId

import influxdb_client_3 as InfluxDBClient3
from influxdb_client_3 import write_options, WritePrecision


# Create 100 gatewayId values as MongoDB ObjectIDs
gatewayIds = [ObjectId() for _ in range(100)]

# Decimal precision (number of digits) for the simulated float readings
precision = 2

# Timestamp of the first sensor reading: 366 days in the past
now = datetime.datetime.now() - datetime.timedelta(days=366)

# InfluxDB connection details (fill in before running)
token = ""
org = ""
bucket = ""
url = ""

# Open an InfluxDB client with a batch size of 5k points or a flush
# interval of 10k ms, and gzip compression enabled.
with InfluxDBClient3.InfluxDBClient3(
        token=token,
        host=url,
        org=org,
        database="solarmanager",
        enable_gzip=True,
        write_options=write_options(batch_size=5_000,
                                    flush_interval=10_000,
                                    jitter_interval=2_000,
                                    retry_interval=5_000,
                                    max_retries=5,
                                    max_retry_delay=30_000,
                                    exponential_base=2,
                                    write_type='batching')) as _client:

    # 525,600 iterations, advancing the timestamp by 10 seconds each time
    # (~61 days of simulated readings)
    for i in range(525600):
        # Advance the timestamp 10 seconds past the previous reading
        now = now + datetime.timedelta(seconds=10)
        # Iterate over gateways
        for gatewayId in gatewayIds:
            # Create random test data for 12 fields to be stored in the
            # time-series database
            bcW = random.randrange(1501)
            bcWh = round(random.uniform(0, 4.17), precision)
            bdW = random.randrange(71)
            bdWh = round(random.uniform(0, 0.12), precision)
            cPvWh = round(random.uniform(0.51, 27.78), precision)
            cW = random.randrange(172, 10001)
            cWh = round(random.uniform(0.51, 27.78), precision)
            eWh = round(random.uniform(0, 41.67), precision)
            iWh = round(random.uniform(0, 16.67), precision)
            pW = random.randrange(209, 20001)
            pWh = round(random.uniform(0.58, 55.56), precision)
            scWh = round(random.uniform(0.58, 55.56), precision)
            # Create the point to be ingested into InfluxDB
            p = (InfluxDBClient3.Point("stream")
                 .tag("gatewayId", str(gatewayId))
                 .field("bcW", bcW).field("bcWh", bcWh)
                 .field("bdW", bdW).field("bdWh", bdWh)
                 .field("cPvWh", cPvWh)
                 .field("cW", cW).field("cWh", cWh)
                 .field("eWh", eWh).field("iWh", iWh)
                 .field("pW", pW).field("pWh", pWh)
                 .field("scWh", scWh)
                 .time(now.strftime('%Y-%m-%dT%H:%M:%SZ'), WritePrecision.S))

            # Write the point (the client automatically batches writes
            # into sets of 5k points)
            _client.write(record=p)

33 changes: 21 additions & 12 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from influxdb_client import WriteOptions as _WriteOptions
from influxdb_client.client.write_api import WriteApi as _WriteApi
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
from influxdb_client.client.write_api import PointSettings
from influxdb_client.domain.write_precision import WritePrecision
from influxdb_client import Point
import json

Expand All @@ -14,6 +16,7 @@ def write_options(**kwargs):




class InfluxDBClient3:
def __init__(self, host=None, org=None, database=None, token=None, write_options=None, **kwargs):
"""
Expand All @@ -27,15 +30,9 @@ def __init__(self, host=None, org=None, database=None, token=None, write_options
"""
self._org = org
self._database = database
self.write_options = write_options

if self.write_options == None:
self.write_options = SYNCHRONOUS


self.write_options = write_options if write_options is not None else SYNCHRONOUS
self._client = _InfluxDBClient(url=f"https://{host}", token=token, org=self._org, **kwargs )
self._write_api = _WriteApi(self._client, write_options=self.write_options )

self._flight_client = FlightClient(f"grpc+tls://{host}:443")
# create an authorization header
self._options = FlightCallOptions(headers=[(b"authorization",f"Bearer {token}".encode('utf-8'))])
Expand Down Expand Up @@ -66,7 +63,6 @@ def write_csv(self, csv_file, measurement_name=None, tag_columns = [],timestamp_
atable = csv.read_csv(csv_file, **kwargs)

df = atable.to_pandas()
print(df)
self._write_api.write(bucket=self._database, record=df,
data_frame_measurement_name=measurement_name,
data_frame_tag_columns=tag_columns,
Expand Down Expand Up @@ -96,8 +92,21 @@ def query(self, query, language="sql"):
# which is useful if you have huge data sets
return flight_reader.read_all()

def __del__(self):
self._write_api.__del__()
return self._client.__del__()

__all__ = ["InfluxDBClient3", "Point"]
def close(self):
    """Release the client's underlying resources.

    Closes the write API, the Flight client, and the HTTP client, in that
    order, skipping any of them that does not expose a ``close`` method.
    """
    for resource in (self._write_api, self._flight_client, self._client):
        if hasattr(resource, 'close'):
            resource.close()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

__all__ = ["InfluxDBClient3", "Point", "PointSettings", "SYNCHRONOUS", "ASYNCHRONOUS", "write_options", "WritePrecision"]