Skip to content

Commit fade9e0

Browse files
authored
fix: creating Tasks with import statements (#491)
1 parent aa0f9a5 commit fade9e0

File tree

3 files changed

+28
-11
lines changed

3 files changed

+28
-11
lines changed

CHANGELOG.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
### Bug Fixes
66
1. [#483](https://github.com/influxdata/influxdb-client-python/pull/483): Querying data if the `debug` is enabled
77
1. [#477](https://github.com/influxdata/influxdb-client-python/pull/477): Parsing date fails due to thread race
8-
1. [#486](https://github.com/influxdata/influxdb-client-python/pull/486): Fix bug when serializing DataFrames that might occur if you're inserting NaN values and have columns starting with digits.
8+
1. [#486](https://github.com/influxdata/influxdb-client-python/pull/486): Serializing DataFrames with columns starting with digits
9+
1. [#491](https://github.com/influxdata/influxdb-client-python/pull/491): Creating `Tasks` with `import` statements
910

1011
### Dependencies
1112
1. [#472](https://github.com/influxdata/influxdb-client-python/pull/472): Update `RxPY` to `4.0.4`

influxdb_client/client/tasks_api.py

+5-6
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
Use tasks (scheduled Flux queries) to input a data stream and then analyze, modify, and act on the data accordingly.
55
"""
66

7-
87
import datetime
98
from typing import List
109

@@ -65,7 +64,7 @@ def _create_task(name: str, flux: str, every, cron, org_id: str) -> Task:
6564
repetition += "cron: "
6665
repetition += '"' + cron + '"'
6766

68-
flux_with_options = 'option task = {{name: "{}", {}}} \n {}'.format(name, repetition, flux)
67+
flux_with_options = '{} \n\noption task = {{name: "{}", {}}}'.format(flux, name, repetition)
6968
task.flux = flux_with_options
7069

7170
return task
@@ -151,10 +150,10 @@ def get_runs(self, task_id, **kwargs) -> List['Run']:
151150
Retrieve list of run records for a task.
152151
153152
:param task_id: task id
154-
:param str after: returns runs after specified ID
155-
:param int limit: the number of runs to return
156-
:param datetime after_time: filter runs to those scheduled after this time, RFC3339
157-
:param datetime before_time: filter runs to those scheduled before this time, RFC3339
153+
:key str after: returns runs after specified ID
154+
:key int limit: the number of runs to return
155+
:key datetime after_time: filter runs to those scheduled after this time, RFC3339
156+
:key datetime before_time: filter runs to those scheduled before this time, RFC3339
158157
"""
159158
return self._service.get_tasks_id_runs(task_id=task_id, **kwargs).runs
160159

tests/test_TasksApi.py

+21-4
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ def test_create_task_every(self):
121121
self.assertEqual(task.status, "active")
122122
self.assertEqual(task.every, "1h")
123123
self.assertEqual(task.cron, None)
124-
self.assertTrue(task.flux.endswith(TASK_FLUX))
124+
self.assertTrue(task.flux.startswith(TASK_FLUX))
125125

126126
def test_create_task_cron(self):
127127
task_name = self.generate_name("it task")
@@ -137,7 +137,7 @@ def test_create_task_cron(self):
137137
self.assertEqual(task.cron, "0 2 * * *")
138138
# self.assertEqualIgnoringWhitespace(task.flux, flux)
139139

140-
self.assertTrue(task.flux.endswith(TASK_FLUX))
140+
self.assertTrue(task.flux.startswith(TASK_FLUX))
141141
# self.assertEqual(task.links, "active")
142142

143143
links = task.links
@@ -151,6 +151,23 @@ def test_create_task_cron(self):
151151
# TODO missing get labels
152152
self.assertEqual(links.labels, "/api/v2/tasks/" + task.id + "/labels")
153153

154+
def test_create_with_import(self):
155+
task_name = self.generate_name("it task")
156+
task_flux = 'import "http"\n\n' \
157+
'from(bucket: "iot_center")\n' \
158+
' |> range(start: -30d)\n' \
159+
' |> filter(fn: (r) => r._measurement == "environment")\n' \
160+
' |> aggregateWindow(every: 1h, fn: mean)'
161+
task = self.tasks_api.create_task_cron(task_name, task_flux, "10 0 * * * *", self.organization.id)
162+
163+
self.assertIsNotNone(task.id)
164+
self.assertEqual(task.name, task_name)
165+
self.assertEqual(task.org_id, self.organization.id)
166+
self.assertEqual(task.status, "active")
167+
self.assertEqual(task.cron, "10 0 * * * *")
168+
self.assertTrue(task.flux.startswith(task_flux))
169+
self.assertTrue(task.flux.splitlines()[-1].startswith('option task = '))
170+
154171
def test_find_task_by_id(self):
155172
task_name = self.generate_name("it task")
156173
task = self.tasks_api.create_task_cron(task_name, TASK_FLUX, "0 2 * * *", self.organization.id)
@@ -182,12 +199,12 @@ def test_update_task(self):
182199
cron_task = self.tasks_api.create_task_cron(task_name, TASK_FLUX, "0 2 * * *", self.organization.id)
183200

184201
flux = '''
202+
{flux}
203+
185204
option task = {{
186205
name: "{task_name}",
187206
every: 3m
188207
}}
189-
190-
{flux}
191208
'''.format(task_name=task_name, flux=TASK_FLUX)
192209

193210
cron_task.cron = None

0 commit comments

Comments
 (0)