@@ -64,7 +64,7 @@ def download_output(project_id, cluster_id, output_bucket, job_id):
     return bucket.blob(output_blob).download_as_string()


-# [START create_cluster]
+# [START dataproc_create_cluster]
 def create_cluster(dataproc, project, zone, region, cluster_name):
     print('Creating cluster...')
     zone_uri = \
@@ -92,7 +92,7 @@ def create_cluster(dataproc, project, zone, region, cluster_name):
         region=region,
         body=cluster_data).execute()
     return result
-# [END create_cluster]
+# [END dataproc_create_cluster]


 def wait_for_cluster_creation(dataproc, project_id, region, cluster_name):
@@ -113,7 +113,7 @@ def wait_for_cluster_creation(dataproc, project_id, region, cluster_name):
             break


-# [START list_clusters_with_detail]
+# [START dataproc_list_clusters_with_detail]
 def list_clusters_with_details(dataproc, project, region):
     result = dataproc.projects().regions().clusters().list(
         projectId=project,
@@ -123,7 +123,7 @@ def list_clusters_with_details(dataproc, project, region):
         print("{} - {}"
               .format(cluster['clusterName'], cluster['status']['state']))
     return result
-# [END list_clusters_with_detail]
+# [END dataproc_list_clusters_with_detail]


 def get_cluster_id_by_name(cluster_list, cluster_name):
@@ -133,7 +133,7 @@ def get_cluster_id_by_name(cluster_list, cluster_name):
    return cluster['clusterUuid'], cluster['config']['configBucket']


-# [START submit_pyspark_job]
+# [START dataproc_submit_pyspark_job]
 def submit_pyspark_job(dataproc, project, region,
                        cluster_name, bucket_name, filename):
     """Submits the Pyspark job to the cluster, assuming `filename` has
@@ -156,21 +156,21 @@ def submit_pyspark_job(dataproc, project, region,
     job_id = result['reference']['jobId']
     print('Submitted job ID {}'.format(job_id))
     return job_id
-# [END submit_pyspark_job]
+# [END dataproc_submit_pyspark_job]


-# [START delete]
+# [START dataproc_delete]
 def delete_cluster(dataproc, project, region, cluster):
     print('Tearing down cluster')
     result = dataproc.projects().regions().clusters().delete(
         projectId=project,
         region=region,
         clusterName=cluster).execute()
     return result
-# [END delete]
+# [END dataproc_delete]


-# [START wait]
+# [START dataproc_wait]
 def wait_for_job(dataproc, project, region, job_id):
     print('Waiting for job to finish...')
     while True:
@@ -184,16 +184,16 @@ def wait_for_job(dataproc, project, region, job_id):
         elif result['status']['state'] == 'DONE':
             print('Job finished.')
             return result
-# [END wait]
+# [END dataproc_wait]


-# [START get_client]
+# [START dataproc_get_client]
 def get_client():
     """Builds an http client authenticated with the service account
     credentials."""
     dataproc = googleapiclient.discovery.build('dataproc', 'v1')
     return dataproc
-# [END get_client]
+# [END dataproc_get_client]


 def main(project_id, zone, cluster_name, bucket_name,
@@ -221,11 +221,11 @@ def main(project_id, zone, cluster_name, bucket_name,
         (cluster_id, output_bucket) = (
             get_cluster_id_by_name(cluster_list, cluster_name))

-        # [START call_submit_pyspark_job]
+        # [START dataproc_call_submit_pyspark_job]
         job_id = submit_pyspark_job(
             dataproc, project_id, region,
             cluster_name, bucket_name, spark_filename)
-        # [END call_submit_pyspark_job]
+        # [END dataproc_call_submit_pyspark_job]
         wait_for_job(dataproc, project_id, region, job_id)

         output = download_output(project_id, cluster_id, output_bucket, job_id)
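
Note: the `# [START ...]`/`# [END ...]` comments this change touches are region tags, markers that documentation tooling uses to pull the enclosed lines into published samples; prefixing each tag with `dataproc_` keeps the names unique across the samples repository. A minimal sketch of how such an extractor might work, as hypothetical illustration only (this is not the actual tooling):

def extract_region(source, tag):
    """Collect the lines between '# [START <tag>]' and '# [END <tag>]'."""
    region = []
    inside = False
    for line in source.splitlines():
        # A well-formed region opens with START and closes with END.
        if '[END {}]'.format(tag) in line:
            break
        if inside:
            region.append(line)
        if '[START {}]'.format(tag) in line:
            inside = True
    return '\n'.join(region)

For example, extract_region(open('submit_job_to_cluster.py').read(), 'dataproc_create_cluster') would return just the create_cluster function after this rename; note that the exact-match markers are why an un-prefixed tag like 'wait' could collide with an identical tag in another sample file, while 'dataproc_wait' cannot.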