Skip to content

Commit 02ddf0e

Browse files
committed
Add time_precision support to line protocol.
1 parent e34acc1 commit 02ddf0e

File tree

5 files changed

+80
-43
lines changed

5 files changed

+80
-43
lines changed

Diff for: influxdb/_dataframe_client.py

+12-5
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,14 @@ def write_points(self, dataframe, measurement, tags=None,
5555
end_index = (batch + 1) * batch_size
5656
points = self._convert_dataframe_to_json(
5757
dataframe.ix[start_index:end_index].copy(),
58-
measurement,
59-
tags
58+
measurement, tags, time_precision
6059
)
6160
super(DataFrameClient, self).write_points(
6261
points, time_precision, database, retention_policy)
6362
return True
6463
else:
6564
points = self._convert_dataframe_to_json(
66-
dataframe, measurement, tags
65+
dataframe, measurement, tags, time_precision
6766
)
6867
super(DataFrameClient, self).write_points(
6968
points, time_precision, database, retention_policy)
@@ -116,7 +115,8 @@ def _to_dataframe(self, rs):
116115
result[key] = df
117116
return result
118117

119-
def _convert_dataframe_to_json(self, dataframe, measurement, tags=None):
118+
def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,
119+
time_precision=None):
120120

121121
if not isinstance(dataframe, pd.DataFrame):
122122
raise TypeError('Must be DataFrame, but type was: {}.'
@@ -136,11 +136,18 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None):
136136
# Convert dtype for json serialization
137137
dataframe = dataframe.astype('object')
138138

139+
precision_factor = {
140+
"n": 1,
141+
"u": 1e3,
142+
"ms": 1e6,
143+
"s": 1e9
144+
}.get(time_precision, 1)
145+
139146
points = [
140147
{'measurement': measurement,
141148
'tags': tags if tags else {},
142149
'fields': rec,
143-
'time': ts.value
150+
'time': int(ts.value / precision_factor)
144151
}
145152
for ts, rec in zip(dataframe.index, dataframe.to_dict('record'))]
146153
return points

Diff for: influxdb/client.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -259,11 +259,16 @@ def write(self, data, params=None, expected_response_code=204):
259259
headers = self._headers
260260
headers['Content-type'] = 'application/octet-stream'
261261

262+
if params:
263+
precision = params.get('precision')
264+
else:
265+
precision = None
266+
262267
self.request(
263268
url="write",
264269
method='POST',
265270
params=params,
266-
data=make_lines(data).encode('utf-8'),
271+
data=make_lines(data, precision).encode('utf-8'),
267272
expected_response_code=expected_response_code,
268273
headers=headers
269274
)

Diff for: influxdb/line_protocol.py

+15-10
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,28 @@
66
from datetime import datetime
77

88
from dateutil.parser import parse
9-
from pytz import utc
109
from six import binary_type, text_type
1110

1211

13-
def _convert_timestamp(timestamp):
12+
def _convert_timestamp(timestamp, precision=None):
1413
if isinstance(timestamp, int):
15-
return timestamp
14+
return timestamp # assume precision is correct if timestamp is int
1615
if isinstance(_force_text(timestamp), text_type):
1716
timestamp = parse(timestamp)
1817
if isinstance(timestamp, datetime):
19-
if timestamp.tzinfo:
20-
timestamp = timestamp.astimezone(utc)
21-
timestamp.replace(tzinfo=None)
22-
return (
23-
timegm(timestamp.timetuple()) * 1e9 +
18+
ns = (
19+
timegm(timestamp.utctimetuple()) * 1e9 +
2420
timestamp.microsecond * 1e3
2521
)
22+
if precision is None or precision == 'n':
23+
return ns
24+
elif precision == 'u':
25+
return ns / 1e3
26+
elif precision == 'ms':
27+
return ns / 1e6
28+
elif precision == 's':
29+
return ns / 1e9
30+
2631
raise ValueError(timestamp)
2732

2833

@@ -58,7 +63,7 @@ def _force_text(data):
5863
return data
5964

6065

61-
def make_lines(data):
66+
def make_lines(data, precision=None):
6267
"""
6368
Extracts the points from the given dict and returns a Unicode string
6469
matching the line protocol introduced in InfluxDB 0.9.0.
@@ -103,7 +108,7 @@ def make_lines(data):
103108
# add timestamp
104109
if 'time' in point:
105110
timestamp = _force_text(str(int(
106-
_convert_timestamp(point['time'])
111+
_convert_timestamp(point['time'], precision)
107112
)))
108113
elements.append(timestamp)
109114

Diff for: influxdb/tests/client_test.py

+22-4
Original file line numberDiff line numberDiff line change
@@ -275,17 +275,35 @@ def test_write_points_with_precision(self):
275275
)
276276

277277
cli = InfluxDBClient(database='db')
278-
cli.write_points(
279-
self.dummy_points,
280-
time_precision='n'
281-
)
282278

279+
cli.write_points(self.dummy_points, time_precision='n')
283280
self.assertEqual(
284281
b"cpu_load_short,host=server01,region=us-west "
285282
b"value=0.64 1257894000000000000\n",
286283
m.last_request.body,
287284
)
288285

286+
cli.write_points(self.dummy_points, time_precision='u')
287+
self.assertEqual(
288+
b"cpu_load_short,host=server01,region=us-west "
289+
b"value=0.64 1257894000000000\n",
290+
m.last_request.body,
291+
)
292+
293+
cli.write_points(self.dummy_points, time_precision='ms')
294+
self.assertEqual(
295+
b"cpu_load_short,host=server01,region=us-west "
296+
b"value=0.64 1257894000000\n",
297+
m.last_request.body,
298+
)
299+
300+
cli.write_points(self.dummy_points, time_precision='s')
301+
self.assertEqual(
302+
b"cpu_load_short,host=server01,region=us-west "
303+
b"value=0.64 1257894000\n",
304+
m.last_request.body,
305+
)
306+
289307
def test_write_points_bad_precision(self):
290308
cli = InfluxDBClient()
291309
with self.assertRaisesRegexp(

Diff for: influxdb/tests/dataframe_client_test.py

+25-23
Original file line numberDiff line numberDiff line change
@@ -119,38 +119,40 @@ def test_write_points_from_dataframe_with_time_precision(self):
119119
"http://localhost:8086/write",
120120
status_code=204)
121121

122-
points = {
123-
'database': 'db',
124-
'points': [
125-
{'time': '1970-01-01T00:00:00+00:00',
126-
'fields': {
127-
'column_one': '1',
128-
'column_three': 1.0,
129-
'column_two': 1},
130-
'tags': {},
131-
'measurement': 'foo'},
132-
{'time': '1970-01-01T01:00:00+00:00',
133-
'fields': {
134-
'column_one': '2',
135-
'column_three': 2.0,
136-
'column_two': 2},
137-
'tags': {},
138-
'measurement': 'foo'}]
139-
}
140-
141122
cli = DataFrameClient(database='db')
142123
measurement = "foo"
143124

144125
cli.write_points(dataframe, measurement, time_precision='s')
145126
self.assertEqual(m.last_request.qs['precision'], ['s'])
127+
self.assertEqual(
128+
b'foo column_one="1",column_three=1.0,column_two=1 0\nfoo '
129+
b'column_one="2",column_three=2.0,column_two=2 3600\n',
130+
m.last_request.body,
131+
)
146132

147-
cli.write_points(dataframe, measurement, time_precision='m')
148-
points.update(precision='m')
149-
self.assertEqual(m.last_request.qs['precision'], ['m'])
133+
cli.write_points(dataframe, measurement, time_precision='ms')
134+
self.assertEqual(m.last_request.qs['precision'], ['ms'])
135+
self.assertEqual(
136+
b'foo column_one="1",column_three=1.0,column_two=1 0\nfoo '
137+
b'column_one="2",column_three=2.0,column_two=2 3600000\n',
138+
m.last_request.body,
139+
)
150140

151141
cli.write_points(dataframe, measurement, time_precision='u')
152-
points.update(precision='u')
153142
self.assertEqual(m.last_request.qs['precision'], ['u'])
143+
self.assertEqual(
144+
b'foo column_one="1",column_three=1.0,column_two=1 0\nfoo '
145+
b'column_one="2",column_three=2.0,column_two=2 3600000000\n',
146+
m.last_request.body,
147+
)
148+
149+
cli.write_points(dataframe, measurement, time_precision='n')
150+
self.assertEqual(m.last_request.qs['precision'], ['n'])
151+
self.assertEqual(
152+
b'foo column_one="1",column_three=1.0,column_two=1 0\nfoo '
153+
b'column_one="2",column_three=2.0,column_two=2 3600000000000\n',
154+
m.last_request.body,
155+
)
154156

155157
@raises(TypeError)
156158
def test_write_points_from_dataframe_fails_without_time_index(self):

0 commit comments

Comments
 (0)