 from google.cloud import storage
 import googleapiclient.discovery

-# Currently only the "global" region is supported
-REGION = 'global'
 DEFAULT_FILENAME = 'pyspark_sort.py'


@@ -36,6 +34,14 @@ def get_pyspark_file(filename):
     return f, os.path.basename(filename)


+def get_region_from_zone(zone):
+    try:
+        region_as_list = zone.split('-')[:-1]
+        return '-'.join(region_as_list)
+    except (AttributeError, IndexError, ValueError):
+        raise ValueError('Invalid zone provided, please check your input.')
+
+
 def upload_pyspark_file(project_id, bucket_name, filename, file):
     """Uploads the PySpark file in this directory to the configured
     input bucket."""
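A quick sanity check of the new helper (illustrative only; the zone strings below are hypothetical example values, and the snippet assumes the get_region_from_zone definition added above):

    print(get_region_from_zone('us-central1-a'))   # -> 'us-central1'
    print(get_region_from_zone('europe-west1-b'))  # -> 'europe-west1'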
@@ -59,8 +65,8 @@ def download_output(project_id, cluster_id, output_bucket, job_id):


 # [START create_cluster]
-def create_cluster(dataproc, project, cluster_name, zone):
-    print('Creating cluster.')
+def create_cluster(dataproc, project, zone, region, cluster_name):
+    print('Creating cluster...')
     zone_uri = \
         'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
             project, zone)
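Since create_cluster now takes both the zone and its derived region (and the parameter order changed), existing callers need updating. A minimal sketch of the new call, assuming dataproc came from get_client() and using placeholder project, zone, and cluster names:

    # Derive the region from the zone instead of hard-coding 'global'.
    region = get_region_from_zone('us-central1-a')  # -> 'us-central1'
    create_cluster(dataproc, 'my-project', 'us-central1-a', region, 'my-cluster')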
@@ -75,19 +81,19 @@ def create_cluster(dataproc, project, cluster_name, zone):
     }
     result = dataproc.projects().regions().clusters().create(
         projectId=project,
-        region=REGION,
+        region=region,
         body=cluster_data).execute()
     return result
 # [END create_cluster]


-def wait_for_cluster_creation(dataproc, project_id, cluster_name, zone):
-    print('Waiting for cluster creation')
+def wait_for_cluster_creation(dataproc, project_id, region, cluster_name):
+    print('Waiting for cluster creation...')

     while True:
         result = dataproc.projects().regions().clusters().list(
             projectId=project_id,
-            region=REGION).execute()
+            region=region).execute()
         cluster_list = result['clusters']
         cluster = [c
                    for c in cluster_list
@@ -100,10 +106,10 @@ def wait_for_cluster_creation(dataproc, project_id, cluster_name, zone):


 # [START list_clusters_with_detail]
-def list_clusters_with_details(dataproc, project):
+def list_clusters_with_details(dataproc, project, region):
     result = dataproc.projects().regions().clusters().list(
         projectId=project,
-        region=REGION).execute()
+        region=region).execute()
     cluster_list = result['clusters']
     for cluster in cluster_list:
         print("{} - {}"
@@ -120,7 +126,8 @@ def get_cluster_id_by_name(cluster_list, cluster_name):


 # [START submit_pyspark_job]
-def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):
+def submit_pyspark_job(dataproc, project, region,
+                       cluster_name, bucket_name, filename):
     """Submits the Pyspark job to the cluster, assuming `filename` has
     already been uploaded to `bucket_name`"""
     job_details = {
@@ -136,7 +143,7 @@ def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):
     }
     result = dataproc.projects().regions().jobs().submit(
         projectId=project,
-        region=REGION,
+        region=region,
         body=job_details).execute()
     job_id = result['reference']['jobId']
     print('Submitted job ID {}'.format(job_id))
@@ -145,29 +152,29 @@ def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):


 # [START delete]
-def delete_cluster(dataproc, project, cluster):
+def delete_cluster(dataproc, project, region, cluster):
     print('Tearing down cluster')
     result = dataproc.projects().regions().clusters().delete(
         projectId=project,
-        region=REGION,
+        region=region,
         clusterName=cluster).execute()
     return result
 # [END delete]


 # [START wait]
-def wait_for_job(dataproc, project, job_id):
+def wait_for_job(dataproc, project, region, job_id):
     print('Waiting for job to finish...')
     while True:
         result = dataproc.projects().regions().jobs().get(
             projectId=project,
-            region=REGION,
+            region=region,
             jobId=job_id).execute()
         # Handle exceptions
         if result['status']['state'] == 'ERROR':
             raise Exception(result['status']['details'])
         elif result['status']['state'] == 'DONE':
-            print('Job finished')
+            print('Job finished.')
             return result
 # [END wait]

@@ -181,34 +188,44 @@ def get_client():
 # [END get_client]


-def main(project_id, zone, cluster_name, bucket_name, pyspark_file=None):
+def main(project_id, zone, cluster_name, bucket_name,
+         pyspark_file=None, create_new_cluster=True):
     dataproc = get_client()
+    region = get_region_from_zone(zone)
     try:
         if pyspark_file:
             spark_file, spark_filename = get_pyspark_file(pyspark_file)
         else:
             spark_file, spark_filename = get_default_pyspark_file()

-        create_cluster(dataproc, project_id, cluster_name, zone)
-        wait_for_cluster_creation(dataproc, project_id, cluster_name, zone)
-        upload_pyspark_file(project_id, bucket_name,
-                            spark_filename, spark_file)
+        if create_new_cluster:
+            create_cluster(
+                dataproc, project_id, zone, region, cluster_name)
+            wait_for_cluster_creation(
+                dataproc, project_id, region, cluster_name)
+
+        upload_pyspark_file(
+            project_id, bucket_name, spark_filename, spark_file)
+
         cluster_list = list_clusters_with_details(
-            dataproc, project_id)['clusters']
+            dataproc, project_id, region)['clusters']

         (cluster_id, output_bucket) = (
             get_cluster_id_by_name(cluster_list, cluster_name))
+
         # [START call_submit_pyspark_job]
         job_id = submit_pyspark_job(
-            dataproc, project_id, cluster_name, bucket_name, spark_filename)
+            dataproc, project_id, region,
+            cluster_name, bucket_name, spark_filename)
         # [END call_submit_pyspark_job]
-        wait_for_job(dataproc, project_id, job_id)
+        wait_for_job(dataproc, project_id, region, job_id)

         output = download_output(project_id, cluster_id, output_bucket, job_id)
         print('Received job output {}'.format(output))
         return output
     finally:
-        delete_cluster(dataproc, project_id, cluster_name)
+        if create_new_cluster:
+            delete_cluster(dataproc, project_id, region, cluster_name)
         spark_file.close()


@@ -220,15 +237,19 @@ def main(project_id, zone, cluster_name, bucket_name, pyspark_file=None):
     parser.add_argument(
         '--project_id', help='Project ID you want to access.', required=True),
     parser.add_argument(
-        '--zone', help='Region to create clusters in', required=True)
+        '--zone', help='Zone to create clusters in/connect to', required=True)
     parser.add_argument(
-        '--cluster_name', help='Name of the cluster to create', required=True)
+        '--cluster_name',
+        help='Name of the cluster to create/connect to', required=True)
     parser.add_argument(
         '--gcs_bucket', help='Bucket to upload Pyspark file to', required=True)
     parser.add_argument(
         '--pyspark_file', help='Pyspark filename. Defaults to pyspark_sort.py')
+    parser.add_argument(
+        '--create_new_cluster',
+        action='store_true', help='States if the cluster should be created')

     args = parser.parse_args()
     main(
-        args.project_id, args.zone,
-        args.cluster_name, args.gcs_bucket, args.pyspark_file)
+        args.project_id, args.zone, args.cluster_name,
+        args.gcs_bucket, args.pyspark_file, args.create_new_cluster)
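Because --create_new_cluster uses action='store_true', its value is False when the flag is omitted, so running the script from the command line without it now submits the job to an existing cluster (main() itself still defaults create_new_cluster to True when called directly). A hypothetical direct call against an existing cluster, with placeholder project, zone, cluster, and bucket names:

    # Reuse an existing cluster; nothing is created or torn down.
    main('my-project', 'us-central1-a', 'my-existing-cluster', 'my-bucket',
         pyspark_file=None, create_new_cluster=False)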