
Commit fb50c6d

Merge pull request #20 from InfluxCommunity/19-expose-write-precision-and-other-write-api-functionality
exposed write api functions. added batching sample
2 parents 7cbe32b + 18c06cd commit fb50c6d

2 files changed: +87, −12 lines

Examples/batching-example.py (new file, +66 lines)
```python
import random
import datetime
from bson import ObjectId
import influxdb_client_3 as InfluxDBClient3
from influxdb_client_3 import write_options, WritePrecision


# Creating 100 gatewayId values as MongoDB ObjectIDs
gatewayIds = [ObjectId() for x in range(0, 100)]

# Setting decimal precision to 2
precision = 2

# Setting timestamp for first sensor reading to one year in the past
now = datetime.datetime.now()
now = now - datetime.timedelta(days=366)
# Recording wall-clock start time of the test run
teststart = datetime.datetime.now()

# InfluxDB connection details
token = ""
org = ""
bucket = ""
url = ""

# Opening InfluxDB client with a batch size of 5k points or flush interval
# of 10k ms and gzip compression
with InfluxDBClient3.InfluxDBClient3(
        token=token,
        host=url,
        org=org,
        database="solarmanager",
        enable_gzip=True,
        write_options=write_options(
            batch_size=5_000,
            flush_interval=10_000,
            jitter_interval=2_000,
            retry_interval=5_000,
            max_retries=5,
            max_retry_delay=30_000,
            exponential_base=2,
            write_type='batching')) as _client:

    # Generating sensor readings every 10 seconds (6 per minute) over 525,600 steps
    for i in range(0, 525600):
        # Adding 10 seconds to timestamp of previous sensor reading
        now = now + datetime.timedelta(seconds=10)
        # Iterating over gateways
        for gatewayId in gatewayIds:
            # Creating random test data for 12 fields to be stored in the
            # time series database
            bcW = random.randrange(1501)
            bcWh = round(random.uniform(0, 4.17), precision)
            bdW = random.randrange(71)
            bdWh = round(random.uniform(0, 0.12), precision)
            cPvWh = round(random.uniform(0.51, 27.78), precision)
            cW = random.randrange(172, 10001)
            cWh = round(random.uniform(0.51, 27.78), precision)
            eWh = round(random.uniform(0, 41.67), precision)
            iWh = round(random.uniform(0, 16.67), precision)
            pW = random.randrange(209, 20001)
            pWh = round(random.uniform(0.58, 55.56), precision)
            scWh = round(random.uniform(0.58, 55.56), precision)
            # Creating point to be ingested into InfluxDB
            p = (InfluxDBClient3.Point("stream")
                 .tag("gatewayId", str(gatewayId))
                 .field("bcW", bcW).field("bcWh", bcWh)
                 .field("bdW", bdW).field("bdWh", bdWh)
                 .field("cPvWh", cPvWh).field("cW", cW).field("cWh", cWh)
                 .field("eWh", eWh).field("iWh", iWh)
                 .field("pW", pW).field("pWh", pWh).field("scWh", scWh)
                 .time(now.strftime('%Y-%m-%dT%H:%M:%SZ'), WritePrecision.S))

            # Writing point (the client batches writes into sets of 5k points)
            _client.write(record=p)
```
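Because the sample opens the client in a `with` block, the `close()` method added to `InfluxDBClient3` in this commit runs automatically when the block exits. A minimal sketch of the equivalent manual teardown, reusing the placeholder connection variables from the sample above and assuming the underlying write API flushes pending batches on close:

```python
# Manual teardown: without a context manager, call close() explicitly so
# the write API, Flight client, and HTTP client are cleaned up.
client = InfluxDBClient3.InfluxDBClient3(
    token=token, host=url, org=org, database="solarmanager",
    enable_gzip=True,
    write_options=write_options(batch_size=5_000, write_type='batching'))
try:
    client.write(record=p)  # p: an InfluxDBClient3.Point, as built above
finally:
    client.close()
```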

influxdb_client_3/__init__.py (+21, −12 lines)
```diff
@@ -6,6 +6,8 @@
 from influxdb_client import WriteOptions as _WriteOptions
 from influxdb_client.client.write_api import WriteApi as _WriteApi
 from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
+from influxdb_client.client.write_api import PointSettings
+from influxdb_client.domain.write_precision import WritePrecision
 from influxdb_client import Point
 import json
 
@@ -14,6 +16,7 @@ def write_options(**kwargs):
 
 
 
+
 class InfluxDBClient3:
     def __init__(self, host=None, org=None, database=None, token=None, write_options=None, **kwargs):
         """
@@ -27,15 +30,9 @@ def __init__(self, host=None, org=None, database=None, token=None, write_options
         """
         self._org = org
         self._database = database
-        self.write_options = write_options
-
-        if self.write_options == None:
-            self.write_options = SYNCHRONOUS
-
-
+        self.write_options = write_options if write_options is not None else SYNCHRONOUS
         self._client = _InfluxDBClient(url=f"https://{host}", token=token, org=self._org, **kwargs )
         self._write_api = _WriteApi(self._client, write_options=self.write_options )
-
         self._flight_client = FlightClient(f"grpc+tls://{host}:443")
         # create an authorization header
         self._options = FlightCallOptions(headers=[(b"authorization",f"Bearer {token}".encode('utf-8'))])
@@ -66,7 +63,6 @@ def write_csv(self, csv_file, measurement_name=None, tag_columns = [],timestamp_
         atable = csv.read_csv(csv_file, **kwargs)
 
         df = atable.to_pandas()
-        print(df)
         self._write_api.write(bucket=self._database, record=df,
                               data_frame_measurement_name=measurement_name,
                               data_frame_tag_columns=tag_columns,
@@ -96,8 +92,21 @@ def query(self, query, language="sql"):
         # which is useful if you have huge data sets
         return flight_reader.read_all()
 
-    def __del__(self):
-        self._write_api.__del__()
-        return self._client.__del__()
 
-__all__ = ["InfluxDBClient3", "Point"]
+    def close(self):
+        # Clean up resources here.
+        # Call close method of _write_api and _flight_client, if they exist.
+        if hasattr(self._write_api, 'close'):
+            self._write_api.close()
+        if hasattr(self._flight_client, 'close'):
+            self._flight_client.close()
+        if hasattr(self._client, 'close'):
+            self._client.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+__all__ = ["InfluxDBClient3", "Point", "PointSettings", "SYNCHRONOUS", "ASYNCHRONOUS", "write_options", "WritePrecision"]
```
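With these names re-exported, downstream code can import everything it needs for writing from `influxdb_client_3` instead of reaching into the underlying `influxdb_client` package. A minimal sketch of a synchronous write using the exposed names (the host, token, org, and database values are placeholders):

```python
import influxdb_client_3 as InfluxDBClient3
from influxdb_client_3 import SYNCHRONOUS, WritePrecision

# SYNCHRONOUS is also the default when write_options is omitted; passing it
# explicitly makes each write() block until the point has been sent.
with InfluxDBClient3.InfluxDBClient3(token="my-token", host="my-host",
                                     org="my-org", database="my-db",
                                     write_options=SYNCHRONOUS) as client:
    p = (InfluxDBClient3.Point("demo")
         .tag("source", "example")
         .field("value", 1.0)
         .time("2023-05-01T00:00:00Z", WritePrecision.S))
    client.write(record=p)
```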
