
Commit f52fd90

committed docs
1 parent 010ff1c commit f52fd90

7 files changed: +412 −146 lines changed

README.md

+76 −76
@@ -12,140 +12,140 @@ The workflow involves chaining together parameterized tasks which pass multiple
`d6tflow` to the rescue! **With d6tflow you can easily chain together complex data flows and execute them. You can quickly load input and output data for each task.** It makes your workflow very clear and intuitive.
#### Read more at:
[4 Reasons Why Your Machine Learning Code is Probably Bad](https://github.com/d6t/d6t-python/blob/master/blogs/reasons-why-bad-ml-code.rst)

[How d6tflow is different from airflow/luigi](https://github.com/d6t/d6t-python/blob/master/blogs/datasci-dags-airflow-meetup.md)

![Badge](https://www.kdnuggets.com/images/tkb-1904-p.png "Badge")
![Badge](https://www.kdnuggets.com/images/tkb-1902-g.png "Badge")

## When to use d6tflow?

* Data science: you want to build better models faster. Your workflow is EDA, feature engineering, model training and evaluation. d6tflow works with ANY ML library, including sklearn, pytorch and keras.
* Data engineering: you want to build robust data pipelines using a lightweight yet powerful library. Your workflow is to load, filter, transform and join data in pandas, dask or pyspark.

## What can d6tflow do for you?

* Data science
    * Experiment management: easily manage workflows that compare different models to find the best one
    * Scalable workflows: build an efficient data workflow that supports rapid prototyping and iteration
    * Cache data: easily save/load intermediary calculations to reduce model training time
    * Model deployment: d6tflow workflows are easier to deploy to production
* Data engineering
    * Build a data workflow made up of tasks with dependencies and parameters (see the sketch below)
    * Visualize task dependencies and their execution status
    * Execute tasks including their dependencies
    * Intelligently continue workflows after failed tasks
    * Intelligently rerun workflows after changing parameters, code or data
    * Quickly share and hand off output data to others

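The task classes used in the examples below differ mainly in where they persist output (Parquet, pickle or in-memory). A minimal sketch of the distinction, using only calls that appear in this README; the task names are made up for illustration and the save targets are inferred from the class names:

```python
import d6tflow
import pandas as pd

# hypothetical task names for illustration:
# TaskPqPandas persists a pandas dataframe as a Parquet file,
# TaskCache keeps the output in memory only
class SaveToParquet(d6tflow.tasks.TaskPqPandas):
    def run(self):
        self.save(pd.DataFrame({'a': range(3)}))  # written to the d6tflow data directory

class SaveInMemory(d6tflow.tasks.TaskCache):
    def run(self):
        self.save(pd.DataFrame({'a': range(3)}))  # gone when the process exits

d6tflow.Workflow(SaveToParquet).run()
```
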
## Installation

Install with `pip install d6tflow`. To update, run `pip install d6tflow -U`.

If you are behind an enterprise firewall, you can also clone/download the repo and run `pip install .`

**Python3 only** You might need to call `pip3 install d6tflow` if you have not set python 3 as default.

To install the latest DEV version, run `pip install git+git://github.com/d6t/d6tflow.git`; to upgrade it, run `pip install git+git://github.com/d6t/d6tflow.git -U --no-deps`.
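A quick way to confirm the install picked up the version you expect; a sketch, assuming the package exposes `__version__` as most Python packages do:

```python
# sanity-check the installed version (assumes d6tflow exposes __version__)
import d6tflow
print(d6tflow.__version__)
```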

## Example: Model Comparison

Below is an introductory example that gets training data, trains two models and compares their performance.

**[See the full ML workflow example here](http://tiny.cc/d6tflow-start-example)**
**[Interactive mybinder jupyter notebook](http://tiny.cc/d6tflow-start-interactive)**

```python
import d6tflow
import sklearn.datasets, sklearn.ensemble, sklearn.linear_model
import pandas as pd


# get training data and save it
class GetData(d6tflow.tasks.TaskPqPandas):
    persist = ['x', 'y']

    def run(self):
        ds = sklearn.datasets.load_boston()
        df_trainX = pd.DataFrame(ds.data, columns=ds.feature_names)
        df_trainY = pd.DataFrame(ds.target, columns=['target'])
        self.save({'x': df_trainX, 'y': df_trainY})  # persist/cache training data


# train models to compare
@d6tflow.requires(GetData)  # define dependency
class ModelTrain(d6tflow.tasks.TaskPickle):
    model = d6tflow.Parameter()  # parameter for model selection

    def run(self):
        df_trainX, df_trainY = self.inputLoad()  # quickly load input data

        if self.model == 'ols':  # select model based on parameter
            model = sklearn.linear_model.LinearRegression()
        elif self.model == 'gbm':
            model = sklearn.ensemble.GradientBoostingRegressor()

        # fit and save model with training score
        model.fit(df_trainX, df_trainY)
        self.save(model)
        self.saveMeta({'score': model.score(df_trainX, df_trainY)})


# goal: compare performance of two models
# define workflow manager
flow = d6tflow.WorkflowMulti(ModelTrain, {'model1': {'model': 'ols'}, 'model2': {'model': 'gbm'}})
flow.reset_upstream(confirm=False)  # DEMO ONLY: force re-run
flow.run()  # execute model training including all dependencies

'''
Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 GetData()
    - 1 ModelTrain(model=ols)

# To run the 2nd model, d6tflow doesn't re-run all tasks, only the ones that changed
Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 GetData()
* 1 ran successfully:
    - 1 ModelTrain(model=gbm)
'''

scores = flow.outputLoadMeta()  # load model scores
print(scores)
# {'model1': {'score': 0.7406426641094095}, 'model2': {'score': 0.9761405838418584}}
```
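To grab one of the fitted models directly, the task-level loader noted in the minimal example linked below (`Task1().outputLoad()` also works there) should apply to parameterized tasks too; a sketch, not verbatim from the README:

```python
# load the fitted ols model straight from the ModelTrain task output
model_ols = ModelTrain(model='ols').outputLoad()
print(type(model_ols))  # the sklearn LinearRegression, unpickled as-is
```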

## Example Library

* [Minimal example](https://github.com/d6t/d6tflow/blob/master/docs/example-minimal.py)
* [Rapid Prototyping for Quantitative Investing with d6tflow](https://github.com/d6tdev/d6tflow-binder-interactive/blob/master/example-trading.ipynb)
* Chain together functions into a workflow and get the power of d6tflow with only a little change in code: **[Jupyter notebook example](https://github.com/d6t/d6tflow/blob/master/docs/example-functional.ipynb)**

## Documentation

Library usage and reference: https://d6tflow.readthedocs.io

## Getting started resources

Transition to d6tflow from typical scripts: [5 Step Guide to Scalable Deep Learning Pipelines with d6tflow](https://htmlpreview.github.io/?https://github.com/d6t/d6t-python/blob/master/blogs/blog-20190813-d6tflow-pytorch.html)

Real-life project template: https://github.com/d6t/d6tflow-template

## Pro version

Additional features:
* Team sharing of workflows and data
* Integrations for enterprise and cloud storage (SQL, S3)
* Integrations for distributed compute (dask, pyspark)
* Integrations for cloud execution
* Workflow deployment and scheduling

[Schedule demo](https://calendly.com/databolt/30min)

## Accelerate Data Science

docs/example-minimal.py

+56
@@ -0,0 +1,56 @@
```python
import d6tflow
import pandas as pd


# define 2 tasks that load raw data
class Task1(d6tflow.tasks.TaskCache):

    def run(self):
        df = pd.DataFrame({'a': range(3)})
        self.save(df)  # quickly save dataframe


class Task2(Task1):
    pass


# define another task that depends on data from task1 and task2
@d6tflow.requires({'input1': Task1, 'input2': Task2})
class Task3(d6tflow.tasks.TaskCache):
    multiplier = d6tflow.IntParameter(default=2)

    def run(self):
        df1 = self.input()['input1'].load()  # quickly load input data
        df2 = self.input()['input2'].load()  # quickly load input data
        df = df1.join(df2, lsuffix='1', rsuffix='2')
        df['b'] = df['a1'] * self.multiplier  # use task parameter
        self.save(df)


# Execute task including all its dependencies
flow = d6tflow.Workflow(Task3)
flow.run()
'''
* 3 ran successfully:
    - 1 Task1()
    - 1 Task2()
    - 1 Task3(multiplier=2)
'''

# quickly load output data. Task1().outputLoad() also works
flow.outputLoad()
'''
   a1  a2  b
0   0   0  0
1   1   1  2
2   2   2  4
'''

# Intelligently rerun workflow after changing parameters
flow2 = d6tflow.Workflow(Task3, {'multiplier': 3})
flow2.preview()
'''
└─--[Task3-{'multiplier': '3'} (PENDING)] => this changed and needs to run
 |--[Task1-{} (COMPLETE)] => this doesn't change and doesn't need to rerun
 └─--[Task2-{} (COMPLETE)] => this doesn't change and doesn't need to rerun
'''
```
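A natural follow-on, not part of the committed file: running the changed flow executes only the stale task, as the preview above indicates. A sketch using the same API:

```python
# Task1/Task2 outputs are reused; only Task3 reruns with multiplier=3
flow2.run()
print(flow2.outputLoad())  # column b now equals a1 * 3
```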

docs/example-ml-compare-short.py

+52
@@ -0,0 +1,52 @@
```python
import d6tflow
import sklearn.datasets, sklearn.ensemble, sklearn.linear_model
import pandas as pd


# get training data and save it
class GetData(d6tflow.tasks.TaskPqPandas):
    persist = ['x', 'y']

    def run(self):
        ds = sklearn.datasets.load_boston()
        df_trainX = pd.DataFrame(ds.data, columns=ds.feature_names)
        df_trainY = pd.DataFrame(ds.target, columns=['target'])
        self.save({'x': df_trainX, 'y': df_trainY})  # persist/cache training data


# train different models to compare
@d6tflow.requires(GetData)  # define dependency
class ModelTrain(d6tflow.tasks.TaskPickle):
    model = d6tflow.Parameter()  # parameter for model selection

    def run(self):
        df_trainX, df_trainY = self.inputLoad()  # quickly load input data

        if self.model == 'ols':  # select model based on parameter
            model = sklearn.linear_model.LinearRegression()
        elif self.model == 'gbm':
            model = sklearn.ensemble.GradientBoostingRegressor()

        # fit and save model with training score
        model.fit(df_trainX, df_trainY)
        self.save(model)
        self.saveMeta({'score': model.score(df_trainX, df_trainY)})


# goal: compare performance of two models
# define workflow manager
flow = d6tflow.WorkflowMulti(ModelTrain, {'model1': {'model': 'ols'}, 'model2': {'model': 'gbm'}})
flow.reset_upstream(confirm=False)  # DEMO ONLY: force re-run
flow.run()  # execute model training including all dependencies

'''
===== Execution Summary =====
Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 GetData()
    - 1 ModelTrain(model=ols)
This progress looks :) because there were no failed tasks or missing dependencies
'''

scores = flow.outputLoadMeta()  # load model scores
print(scores)
# {'model1': {'score': 0.7406426641094095}, 'model2': {'score': 0.9761405838418584}}
```
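Since `outputLoadMeta()` returns a plain dict keyed by flow name, comparing the two models takes one line; a small follow-on sketch over the scores shown above:

```python
# pick the flow with the highest training score
best = max(scores, key=lambda name: scores[name]['score'])
print(best)  # 'model2' (the gbm) given the scores above
```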
