This repository was archived by the owner on Nov 29, 2023. It is now read-only.

Commit 07c9dfb

bradmiro and leahecole authored
feat: dataproc quickstart sample added and create_cluster updated [(#2629)](GoogleCloudPlatform/python-docs-samples#2629)
* Adding quickstart sample
* Added new quickstart sample and updated create_cluster sample
* Fix to create_cluster.py
* Deleted dataproc quickstart files not under dataproc/quickstart/
* Added quickstart test
* Linting and formatting fixes
* Revert "Linting and formatting fixes" (reverts commit c5afcbcdf9deccbb7a21ddd82ae0fc305e79c008)
* Added bucket cleanup to quickstart test
* Changes to samples and tests
* Linting fixes
* Removed todos in favor of clearer docstring
* Fixed lint error

Co-authored-by: Leah E. Cole <[email protected]>
1 parent 9edbb0e commit 07c9dfb

File tree

4 files changed: +223 −15 lines

samples/snippets/create_cluster.py

Lines changed: 19 additions & 12 deletions
@@ -14,22 +14,29 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# This sample walks a user through creating a Cloud Dataproc cluster using
+# the Python client library.
+
+# [START dataproc_create_cluster]
+from google.cloud import dataproc_v1 as dataproc
+
 
 def create_cluster(project_id, region, cluster_name):
-    # [START dataproc_create_cluster]
-    from google.cloud import dataproc_v1 as dataproc
+    """This sample walks a user through creating a Cloud Dataproc cluster
+    using the Python client library.
 
-    # TODO(developer): Uncomment and set the following variables
-    # project_id = 'YOUR_PROJECT_ID'
-    # region = 'YOUR_CLUSTER_REGION'
-    # cluster_name = 'YOUR_CLUSTER_NAME'
+    Args:
+        project_id (string): Project to use for creating resources.
+        region (string): Region where the resources should live.
+        cluster_name (string): Name to use for creating a cluster.
+    """
 
-    # Create a client with the endpoint set to the desired cluster region
-    client = dataproc.ClusterControllerClient(client_options={
+    # Create a client with the endpoint set to the desired cluster region.
+    cluster_client = dataproc.ClusterControllerClient(client_options={
         'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
     })
 
-    # Create the cluster config
+    # Create the cluster config.
     cluster = {
         'project_id': project_id,
         'cluster_name': cluster_name,
@@ -45,10 +52,10 @@ def create_cluster(project_id, region, cluster_name):
         }
     }
 
-    # Create the cluster
-    operation = client.create_cluster(project_id, region, cluster)
+    # Create the cluster.
+    operation = cluster_client.create_cluster(project_id, region, cluster)
     result = operation.result()
 
-    # Output a success message
+    # Output a success message.
     print('Cluster created successfully: {}'.format(result.cluster_name))
     # [END dataproc_create_cluster]
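
For reference, a minimal sketch of how the updated function might be invoked; the project ID, region, and cluster name below are placeholders, not values from this commit:

    import create_cluster

    # Placeholder values; substitute a real project, region, and cluster name.
    create_cluster.create_cluster(
        project_id='your-project-id',
        region='us-central1',
        cluster_name='example-cluster')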

samples/snippets/create_cluster_test.py

Lines changed: 6 additions & 3 deletions
@@ -20,20 +20,23 @@
 
 import create_cluster
 
+
 PROJECT_ID = os.environ['GCLOUD_PROJECT']
 REGION = 'us-central1'
-CLUSTER_NAME = 'test-cluster-{}'.format(str(uuid.uuid4()))
+CLUSTER_NAME = 'py-cc-test-{}'.format(str(uuid.uuid4()))
 
 
 @pytest.fixture(autouse=True)
 def teardown():
     yield
 
-    client = dataproc.ClusterControllerClient(client_options={
+    cluster_client = dataproc.ClusterControllerClient(client_options={
         'api_endpoint': '{}-dataproc.googleapis.com:443'.format(REGION)
     })
     # Client library function
-    client.delete_cluster(PROJECT_ID, REGION, CLUSTER_NAME)
+    operation = cluster_client.delete_cluster(PROJECT_ID, REGION, CLUSTER_NAME)
+    # Wait for cluster to delete
+    operation.result()
 
 
 def test_cluster_create(capsys):
quickstart.py

Lines changed: 128 additions & 0 deletions (new file)
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START dataproc_quickstart]
import time

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage


def quickstart(project_id, region, cluster_name, job_file_path):
    """This quickstart sample walks a user through creating a Cloud Dataproc
    cluster, submitting a PySpark job from Google Cloud Storage to the
    cluster, reading the output of the job and deleting the cluster, all
    using the Python client library.

    Args:
        project_id (string): Project to use for creating resources.
        region (string): Region where the resources should live.
        cluster_name (string): Name to use for creating a cluster.
        job_file_path (string): Job in GCS to execute against the cluster.
    """

    # Create the cluster client.
    cluster_client = dataproc.ClusterControllerClient(client_options={
        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
    })

    # Create the cluster config.
    cluster = {
        'project_id': project_id,
        'cluster_name': cluster_name,
        'config': {
            'master_config': {
                'num_instances': 1,
                'machine_type_uri': 'n1-standard-1'
            },
            'worker_config': {
                'num_instances': 2,
                'machine_type_uri': 'n1-standard-1'
            }
        }
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(project_id, region, cluster)
    result = operation.result()

    print('Cluster created successfully: {}'.format(result.cluster_name))

    # Create the job client.
    job_client = dataproc.JobControllerClient(client_options={
        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
    })

    # Create the job config.
    job = {
        'placement': {
            'cluster_name': cluster_name
        },
        'pyspark_job': {
            'main_python_file_uri': job_file_path
        }
    }

    job_response = job_client.submit_job(project_id, region, job)
    job_id = job_response.reference.job_id

    print('Submitted job "{}".'.format(job_id))

    # Terminal states for a job.
    terminal_states = {
        dataproc.types.JobStatus.ERROR,
        dataproc.types.JobStatus.CANCELLED,
        dataproc.types.JobStatus.DONE
    }

    # Create a timeout such that the job gets cancelled if not in a
    # terminal state after a fixed period of time.
    timeout_seconds = 600
    time_start = time.time()

    # Wait for the job to complete.
    while job_response.status.state not in terminal_states:
        if time.time() > time_start + timeout_seconds:
            job_client.cancel_job(project_id, region, job_id)
            print('Job {} timed out after threshold of {} seconds.'.format(
                job_id, timeout_seconds))

        # Poll for job termination once a second.
        time.sleep(1)
        job_response = job_client.get_job(project_id, region, job_id)

    # Cloud Dataproc job output gets saved to a GCS bucket allocated to it.
    cluster_info = cluster_client.get_cluster(
        project_id, region, cluster_name)

    storage_client = storage.Client()
    bucket = storage_client.get_bucket(cluster_info.config.config_bucket)
    output_blob = (
        'google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000'
        .format(cluster_info.cluster_uuid, job_id))
    output = bucket.blob(output_blob).download_as_string()

    print('Job {} finished with state {}:\n{}'.format(
        job_id,
        job_response.status.State.Name(job_response.status.state),
        output))

    # Delete the cluster once the job has terminated.
    operation = cluster_client.delete_cluster(project_id, region, cluster_name)
    operation.result()

    print('Cluster {} successfully deleted.'.format(cluster_name))
# [END dataproc_quickstart]
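
For orientation, a minimal sketch of how this quickstart function might be driven from a caller's own script; the project, cluster name, bucket, and job path below are placeholders, and the PySpark script is assumed to already exist in Cloud Storage:

    import quickstart

    # Placeholder values; the job file must already be uploaded to GCS.
    quickstart.quickstart(
        project_id='your-project-id',
        region='us-central1',
        cluster_name='example-quickstart-cluster',
        job_file_path='gs://your-bucket/your_pyspark_job.py')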
quickstart_test.py

Lines changed: 70 additions & 0 deletions (new file)
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import uuid
import pytest

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage

import quickstart


PROJECT_ID = os.environ['GCLOUD_PROJECT']
REGION = 'us-central1'
CLUSTER_NAME = 'py-qs-test-{}'.format(str(uuid.uuid4()))
STAGING_BUCKET = 'py-dataproc-qs-bucket-{}'.format(str(uuid.uuid4()))
JOB_FILE_NAME = 'sum.py'
JOB_FILE_PATH = 'gs://{}/{}'.format(STAGING_BUCKET, JOB_FILE_NAME)
SORT_CODE = (
    "import pyspark\n"
    "sc = pyspark.SparkContext()\n"
    "rdd = sc.parallelize((1,2,3,4,5))\n"
    "sum = rdd.reduce(lambda x, y: x + y)\n"
)


@pytest.fixture(autouse=True)
def setup_teardown():
    storage_client = storage.Client()
    bucket = storage_client.create_bucket(STAGING_BUCKET)
    blob = bucket.blob(JOB_FILE_NAME)
    blob.upload_from_string(SORT_CODE)

    yield

    cluster_client = dataproc.ClusterControllerClient(client_options={
        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(REGION)
    })

    # The quickstart sample deletes the cluster, but if the test fails
    # before cluster deletion occurs, it can be manually deleted here.
    clusters = cluster_client.list_clusters(PROJECT_ID, REGION)

    for cluster in clusters:
        if cluster.cluster_name == CLUSTER_NAME:
            cluster_client.delete_cluster(PROJECT_ID, REGION, CLUSTER_NAME)

    blob.delete()


def test_quickstart(capsys):
    quickstart.quickstart(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH)

    out, _ = capsys.readouterr()
    assert 'Cluster created successfully' in out
    assert 'Submitted job' in out
    assert 'finished with state DONE:' in out
    assert 'successfully deleted' in out
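
As a rough sketch of how this test might be run locally, assuming the google-cloud-dataproc and google-cloud-storage packages are installed and application-default credentials are configured; the project ID below is a placeholder:

    import os
    import pytest

    # Placeholder project; the test reads it from the environment.
    os.environ['GCLOUD_PROJECT'] = 'your-project-id'
    pytest.main(['-v', 'quickstart_test.py'])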
