Commit ba89f77

sofisl and bradmiro authored
feat: add Dataproc-Composer tutorial (#4485)
* feat: initial composer dataproc DAG for composer-dataproc tutorial
* fix: run linting
* fix: add code region tags
* fix: linting
* chore: add more descriptive workflow name
* chore: more descriptive dag title

Co-authored-by: Brad Miro <[email protected]>
1 parent 04891c7 commit ba89f77

2 files changed (+91, -0 lines)
Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_dataproc_workflow_instantiate_operator_tutorial]

"""Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
Spark Pi Job.

This DAG relies on an Airflow variable
https://airflow.apache.org/concepts.html#variables
* project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
"""

import datetime

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils.dates import days_ago

project_id = models.Variable.get("project_id")


default_args = {
    # Tell Airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": project_id,
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "dataproc_workflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:

    start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
        # The task id of your job
        task_id="dataproc_workflow_dag",
        # The template id of your workflow
        template_id="sparkpi",
        project_id=project_id,
        # The region for the template
        region="us-central1",
    )

# [END composer_dataproc_workflow_instantiate_operator_tutorial]
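
The DAG above assumes a Dataproc workflow template with the id "sparkpi" already exists in the project; the template definition itself is not part of this commit. The following is a minimal sketch of how such a template could be created with the google-cloud-dataproc client library, assuming a 2.x version of the library; the project id, cluster name, machine types, and jar path are illustrative placeholders, not values taken from this change.

# Hypothetical setup sketch (not part of this commit): create a "sparkpi"
# workflow template that the DAG above can instantiate. Assumes
# google-cloud-dataproc 2.x; names and sizes below are illustrative.
from google.cloud import dataproc_v1

project_id = "example-project"  # assumed placeholder
region = "us-central1"

client = dataproc_v1.WorkflowTemplateServiceClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

template = {
    "id": "sparkpi",
    "placement": {
        "managed_cluster": {
            "cluster_name": "sparkpi-cluster",  # assumed cluster name
            "config": {
                "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
                "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
            },
        }
    },
    "jobs": [
        {
            "step_id": "compute-pi",
            "spark_job": {
                "main_class": "org.apache.spark.examples.SparkPi",
                "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
                "args": ["1000"],
            },
        }
    ],
}

client.create_workflow_template(
    parent=f"projects/{project_id}/regions/{region}", template=template
)

However the template is created, its id must match the template_id passed to DataprocWorkflowTemplateInstantiateOperator, and its region must match the operator's region argument.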
Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from airflow import models

from . import unit_testing


def test_dag_import():
    """Test that the DAG file can be successfully imported.

    This tests that the DAG can be parsed, but does not run it in an Airflow
    environment. This is a sanity check recommended by the official Airflow
    docs: https://airflow.incubator.apache.org/tutorial.html#testing
    """
    models.Variable.set("project_id", "example-project")
    from . import dataproc_workflow_template_instantiate_operator_tutorial as module

    unit_testing.assert_has_valid_dag(module)
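
The unit_testing helper imported by this test is not included in the diff. As a rough guide to what the assertion checks, here is a minimal sketch of an assert_has_valid_dag implementation, assuming it only needs to confirm that the imported module defines at least one Airflow DAG with tasks; the repository's actual helper may differ.

# Hypothetical sketch of the unit_testing helper used above; the real helper
# lives elsewhere in the repository and is not part of this commit.
from airflow import models


def assert_has_valid_dag(module):
    """Assert that the given module defines at least one DAG with tasks."""
    dags = [value for value in vars(module).values() if isinstance(value, models.DAG)]
    assert dags, "module does not define any Airflow DAG"
    for dag in dags:
        assert dag.tasks, "DAG {} has no tasks".format(dag.dag_id)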
