diff --git a/composer/workflows/dataproc_workflow_template_instantiate_operator_tutorial.py b/composer/workflows/dataproc_workflow_template_instantiate_operator_tutorial.py
new file mode 100644
index 00000000000..b970e3de6ae
--- /dev/null
+++ b/composer/workflows/dataproc_workflow_template_instantiate_operator_tutorial.py
@@ -0,0 +1,61 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# [START composer_dataproc_workflow_instantiate_operator_tutorial]
+
+"""Example Airflow DAG that kicks off a Cloud Dataproc workflow template that
+runs a Spark Pi job.
+
+This DAG relies on an Airflow variable
+(https://airflow.apache.org/concepts.html#variables):
+* project_id - Google Cloud project ID to use for the Cloud Dataproc template.
+"""
+
+import datetime
+
+from airflow import models
+from airflow.contrib.operators import dataproc_operator
+from airflow.utils.dates import days_ago
+
+project_id = models.Variable.get("project_id")
+
+
+default_args = {
+    # Tell Airflow to start one day ago, so that it runs as soon as you upload it
+    "start_date": days_ago(1),
+    "project_id": project_id,
+}
+
+# Define a DAG (directed acyclic graph) of tasks.
+# Any task you create within the context manager is automatically added to the
+# DAG object.
+with models.DAG(
+    # The id you will see in the Airflow DAG page
+    "dataproc_workflow_dag",
+    default_args=default_args,
+    # The interval with which to schedule the DAG
+    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
+) as dag:
+
+    start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
+        # The task id of your job
+        task_id="dataproc_workflow_dag",
+        # The template id of your workflow
+        template_id="sparkpi",
+        project_id=project_id,
+        # The region for the template
+        region="us-central1",
+    )
+
+# [END composer_dataproc_workflow_instantiate_operator_tutorial]
diff --git a/composer/workflows/dataproc_workflow_template_instantiate_operator_tutorial_test.py b/composer/workflows/dataproc_workflow_template_instantiate_operator_tutorial_test.py
new file mode 100644
index 00000000000..3cf8acf7f9b
--- /dev/null
+++ b/composer/workflows/dataproc_workflow_template_instantiate_operator_tutorial_test.py
@@ -0,0 +1,30 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow import models
+
+from . import unit_testing
+
+
+def test_dag_import():
+    """Test that the DAG file can be successfully imported.
+
+    This verifies that the DAG can be parsed, but does not run it in an
+    Airflow environment. This is a sanity check recommended by the official
+    Airflow docs: https://airflow.incubator.apache.org/tutorial.html#testing
+    """
+    models.Variable.set("project_id", "example-project")
+    from . import dataproc_workflow_template_instantiate_operator_tutorial as module
+
+    unit_testing.assert_has_valid_dag(module)
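
Note: the DAG above assumes that a Dataproc workflow template named "sparkpi"
already exists in us-central1 and that the Airflow variable "project_id" has
been set. Below is a minimal sketch of creating such a template with the
google-cloud-dataproc client library; the cluster shape and Spark Pi job
arguments are illustrative assumptions, not taken from this patch.

    # Sketch: create the "sparkpi" workflow template that the DAG instantiates.
    from google.cloud import dataproc_v1

    project_id = "example-project"  # assumption: your Google Cloud project ID
    region = "us-central1"          # must match the region used in the DAG

    # Workflow template requests must go to the regional Dataproc endpoint.
    client = dataproc_v1.WorkflowTemplateServiceClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    template = {
        "id": "sparkpi",
        # Run the job on an ephemeral cluster that is created for the workflow
        # and deleted when it finishes; name and machine shapes are assumptions.
        "placement": {
            "managed_cluster": {
                "cluster_name": "sparkpi-cluster",
                "config": {
                    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
                    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
                },
            }
        },
        "jobs": [
            {
                "step_id": "compute",
                "spark_job": {
                    "main_class": "org.apache.spark.examples.SparkPi",
                    "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
                    "args": ["1000"],  # number of partitions used to estimate Pi
                },
            }
        ],
    }
    client.create_workflow_template(
        parent=f"projects/{project_id}/regions/{region}", template=template
    )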