Commit 69987e3

docs: Dataproc ebay walkthrough update (#405)
* Update python-api-walkthrough.md: Dataproc python walkthrough update.
* Update submit_job_to_cluster.py: Dataproc python walkthrough update.
* Update submit_job_to_cluster.py: docs: fix lint errors.
1 parent fc20d4c commit 69987e3

File tree: 2 files changed, +58 -8 lines


dataproc/snippets/python-api-walkthrough.md

Lines changed: 1 addition & 1 deletion
@@ -107,8 +107,8 @@ an explanation of how the code works.
 
 python submit_job_to_cluster.py \
     --project_id={{project-id}} \
-    --cluster_name=$CLUSTER \
     --region=$REGION \
+    --cluster_name=$CLUSTER \
     --gcs_bucket=$BUCKET
 
 ## Job Output

dataproc/snippets/submit_job_to_cluster.py

Lines changed: 57 additions & 7 deletions
@@ -26,14 +26,59 @@
 """
 
 import argparse
+import os
 import re
 
 from google.cloud import dataproc_v1
 from google.cloud import storage
 
+DEFAULT_FILENAME = "pyspark_sort.py"
+waiting_callback = False
+
+
+def get_pyspark_file(pyspark_file=None):
+    if pyspark_file:
+        f = open(pyspark_file, "rb")
+        return f, os.path.basename(pyspark_file)
+    else:
+        """Gets the PySpark file from current directory."""
+        current_dir = os.path.dirname(os.path.abspath(__file__))
+        f = open(os.path.join(current_dir, DEFAULT_FILENAME), "rb")
+        return f, DEFAULT_FILENAME
+
+
+def get_region_from_zone(zone):
+    try:
+        region_as_list = zone.split("-")[:-1]
+        return "-".join(region_as_list)
+    except (AttributeError, IndexError, ValueError):
+        raise ValueError("Invalid zone provided, please check your input.")
+
+
+def upload_pyspark_file(project, bucket_name, filename, spark_file):
+    """Uploads the PySpark file in this directory to the configured input
+    bucket."""
+    print("Uploading pyspark file to Cloud Storage.")
+    client = storage.Client(project=project)
+    bucket = client.get_bucket(bucket_name)
+    blob = bucket.blob(filename)
+    blob.upload_from_file(spark_file)
+
+
+def download_output(project, cluster_id, output_bucket, job_id):
+    """Downloads the output file from Cloud Storage and returns it as a
+    string."""
+    print("Downloading output file.")
+    client = storage.Client(project=project)
+    bucket = client.get_bucket(output_bucket)
+    output_blob = "google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000".format(
+        cluster_id, job_id
+    )
+    return bucket.blob(output_blob).download_as_string()
+
 
 # [START dataproc_create_cluster]
-def quickstart(project_id, region, cluster_name, job_file_path):
+def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
     # Create the cluster client.
     cluster_client = dataproc_v1.ClusterControllerClient(
         client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
@@ -59,6 +104,9 @@ def quickstart(project_id, region, cluster_name, job_file_path):
 
     # [END dataproc_create_cluster]
 
+    spark_file, spark_filename = get_pyspark_file(pyspark_file)
+    upload_pyspark_file(project_id, gcs_bucket, spark_filename, spark_file)
+
     # [START dataproc_submit_job]
     # Create the job client.
     job_client = dataproc_v1.JobControllerClient(
@@ -68,7 +116,7 @@ def quickstart(project_id, region, cluster_name, job_file_path):
     # Create the job config.
     job = {
         "placement": {"cluster_name": cluster_name},
-        "pyspark_job": {"main_python_file_uri": job_file_path},
+        "pyspark_job": {"main_python_file_uri": "gs://{}/{}".format(gcs_bucket, spark_filename)},
     }
 
     operation = job_client.submit_job_as_operation(
@@ -128,13 +176,15 @@ def quickstart(project_id, region, cluster_name, job_file_path):
         required=True,
         help="Name to use for creating a cluster.",
     )
+
     parser.add_argument(
-        "--job_file_path",
-        type=str,
-        required=True,
-        help="Job in Cloud Storage to run on the cluster.",
+        "--gcs_bucket", help="Bucket to upload Pyspark file to", required=True
+    )
+
+    parser.add_argument(
+        "--pyspark_file", help="Pyspark filename. Defaults to pyspark_sort.py"
     )
 
     args = parser.parse_args()
-    quickstart(args.project_id, args.region, args.cluster_name, args.job_file_path)
+    quickstart(args.project_id, args.region, args.cluster_name, args.gcs_bucket, args.pyspark_file)
     # [END dataproc_quickstart]
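For reference, a minimal usage sketch of the updated script after this commit. All argument values below are placeholders, not values from the commit; --pyspark_file is optional and, per the new get_pyspark_file() helper, falls back to pyspark_sort.py in the script's directory when omitted.

    # Hypothetical invocation; project, region, cluster, and bucket names are placeholders.
    python submit_job_to_cluster.py \
        --project_id=my-project \
        --region=us-central1 \
        --cluster_name=my-cluster \
        --gcs_bucket=my-staging-bucket \
        --pyspark_file=pyspark_sort.py

With these changes the script uploads the local PySpark file to the given bucket and submits the job with main_python_file_uri set to gs://<bucket>/<filename>, so a pre-staged --job_file_path argument is no longer required.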
