Skip to content

Commit e571c48

Browse files
committed
update readme
1 parent a608bcd commit e571c48

File tree

5 files changed

+217
-170
lines changed

5 files changed

+217
-170
lines changed

README.md

+57
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,68 @@ print(scores)
120120

121121
```
122122

123+
123124
## Example Library
124125

125126
* [Minimal example](https://github.com/d6t/d6tflow/blob/master/docs/example-minimal.py)
126127
* [Rapid Prototyping for Quantitative Investing with d6tflow](https://github.com/d6tdev/d6tflow-binder-interactive/blob/master/example-trading.ipynb)
127128
* Chain together functions into a workflow and get the power of d6tflow with only little change in code. **[Jupyter notebook example](https://github.com/d6t/d6tflow/blob/master/docs/example-functional.ipynb)**
129+
Alternatively, chain together functions into a workflow and get the power of d6tflow with only little change in code. **[Jupyter notebook example](https://github.com/d6t/d6tflow/blob/master/docs/example-functional.ipynb)**
130+
131+
## Example: Functional Workflow
132+
``` python
133+
import pandas as pd
134+
135+
import d6tflow
136+
from d6tflow.functional import Workflow
137+
138+
flow = Workflow()
139+
140+
141+
@flow.task(d6tflow.tasks.TaskPqPandas)
142+
def Task1(task):
143+
df = pd.DataFrame({'a': range(3)})
144+
task.save(df)
145+
146+
147+
@flow.task(d6tflow.tasks.TaskPqPandas)
148+
def Task2(task):
149+
df = pd.DataFrame({'b': range(3)})
150+
task.save(df)
151+
152+
153+
@flow.task(d6tflow.tasks.TaskPqPandas)
154+
@flow.params(multiplier=d6tflow.IntParameter(default=2))
155+
@flow.requires({'input1': Task1, 'input2': Task2})
156+
def Task3(task):
157+
df1 = task.input()['input1'].load() # quickly load input data
158+
df2 = task.input()['input2'].load() # quickly load input data
159+
df = df1.join(df2, lsuffix='1', rsuffix='2')
160+
df['c'] = df['a'] * task.multiplier # use task parameter
161+
task.save(df)
162+
163+
164+
flow.run(Task3)
165+
flow.outputLoad(Task3)
166+
'''
167+
a b c
168+
0 0 0 0
169+
1 1 1 2
170+
2 2 2 4
171+
'''
172+
173+
# You can rerun the flow with different parameters
174+
flow.run(Task3, params={'multiplier': 3})
175+
flow.outputLoad(Task3)
176+
177+
'''
178+
a b c
179+
0 0 0 0
180+
1 1 1 3
181+
2 2 2 6
182+
'''
183+
```
184+
128185

129186
## Documentation
130187

d6tflow/functional.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import d6tflow
33
from functools import wraps
44
import pathlib
5+
import d6tcollect
56

67

78
class Workflow:
@@ -39,6 +40,7 @@ def wrapper(*args, **kwargs):
3940
return wrapper
4041
self.common_decorator = common_decorator
4142

43+
@d6tcollect._collectClass
4244
def task(self, task_type: d6tflow.tasks.TaskData):
4345
"""
4446
Flow step decorator.
@@ -62,6 +64,7 @@ def task(self, task_type: d6tflow.tasks.TaskData):
6264
self.object_count += 1
6365
return self.common_decorator
6466

67+
@d6tcollect._collectClass
6568
def requires(self, *args, **kwargs):
6669
"""
6770
Flow requires decorator.
@@ -94,6 +97,7 @@ def requires(self, *args, **kwargs):
9497

9598
return self.common_decorator
9699

100+
@d6tcollect._collectClass
97101
def params(self, **params):
98102
"""
99103
Flow parameters decorator.
@@ -111,6 +115,7 @@ def params(self, **params):
111115
setattr(self.steps[self.current_step], param, params[param])
112116
return self.common_decorator
113117

118+
@d6tcollect._collectClass
114119
def persists(self, to_persist: list):
115120
"""
116121
Flow persists decorator.
@@ -138,6 +143,7 @@ def preview(self, func_to_preview, params: dict):
138143
else:
139144
d6tflow.preview(self.steps[name]())
140145

146+
@d6tcollect._collectClass
141147
def run(self, funcs_to_run, params: dict = None, multi_params: dict = None, *args, **kwargs):
142148
"""
143149
Runs flow steps locally. See luigi.build for additional details
@@ -294,7 +300,7 @@ def reset(self, func_to_reset, params=None, *args, **kwargs):
294300
]
295301

296302
def resetAll(self, *args, **kwargs):
297-
"""Resets all functions that are attached to the workflow object that have run atleast once."""
303+
"""Resets all functions that are attached to the workflow object that have run at least once."""
298304
for name in self.steps:
299305
self.reset(name, params=None, *args, **kwargs)
300306

0 commit comments

Comments
 (0)