
Commit 162b852

feat!: migrate to use microgen (#71)
* feat!: migrate to use microgen
* update
* update
* update
* update
1 parent 4c4563b commit 162b852

12 files changed: +350 −299 lines
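Every file in the diff applies the same breaking change: with the microgenerated client, calls take a single request dict keyed by field name instead of positional arguments, strings are reformatted to double quotes, and removed path helpers are replaced by hand-built resource names. Below is a minimal sketch of the before/after call shape, using placeholder project, region, and cluster values that are not taken from the commit:

    from google.cloud import dataproc_v1 as dataproc

    # Placeholder values for illustration only.
    project_id, region, cluster_name = "my-project", "us-central1", "my-cluster"

    # Regional endpoint, as in the updated snippets.
    client = dataproc.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-1"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-1"},
        },
    }

    # Old (pre-microgen) call shape, positional arguments:
    #   client.create_cluster(project_id, region, cluster)
    # New call shape, a single request dict keyed by field name:
    operation = client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()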

dataproc/snippets/create_cluster.py

Lines changed: 14 additions & 18 deletions

@@ -38,38 +38,34 @@ def create_cluster(project_id, region, cluster_name):
     """

     # Create a client with the endpoint set to the desired cluster region.
-    cluster_client = dataproc.ClusterControllerClient(client_options={
-        'api_endpoint': f'{region}-dataproc.googleapis.com:443',
-    })
+    cluster_client = dataproc.ClusterControllerClient(
+        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
+    )

     # Create the cluster config.
     cluster = {
-        'project_id': project_id,
-        'cluster_name': cluster_name,
-        'config': {
-            'master_config': {
-                'num_instances': 1,
-                'machine_type_uri': 'n1-standard-1'
-            },
-            'worker_config': {
-                'num_instances': 2,
-                'machine_type_uri': 'n1-standard-1'
-            }
-        }
+        "project_id": project_id,
+        "cluster_name": cluster_name,
+        "config": {
+            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-1"},
+            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-1"},
+        },
     }

     # Create the cluster.
-    operation = cluster_client.create_cluster(project_id, region, cluster)
+    operation = cluster_client.create_cluster(
+        request={"project_id": project_id, "region": region, "cluster": cluster}
+    )
     result = operation.result()

     # Output a success message.
-    print(f'Cluster created successfully: {result.cluster_name}')
+    print(f"Cluster created successfully: {result.cluster_name}")
     # [END dataproc_create_cluster]


 if __name__ == "__main__":
     if len(sys.argv) < 4:
-        sys.exit('python create_cluster.py project_id region cluster_name')
+        sys.exit("python create_cluster.py project_id region cluster_name")

     project_id = sys.argv[1]
     region = sys.argv[2]
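The request dict above keeps a plain Python dict for the cluster; the microgenerated library also exposes typed message classes. A hedged sketch of the same configuration using those classes, assuming Cluster, ClusterConfig, and InstanceGroupConfig are exported from google.cloud.dataproc_v1 and reusing project_id, region, cluster_name, and cluster_client from the snippet:

    from google.cloud import dataproc_v1 as dataproc

    # Same shape as the dict in create_cluster.py, expressed with generated
    # message types (assumed to be exported at the dataproc_v1 top level).
    cluster = dataproc.Cluster(
        project_id=project_id,
        cluster_name=cluster_name,
        config=dataproc.ClusterConfig(
            master_config=dataproc.InstanceGroupConfig(
                num_instances=1, machine_type_uri="n1-standard-1"
            ),
            worker_config=dataproc.InstanceGroupConfig(
                num_instances=2, machine_type_uri="n1-standard-1"
            ),
        ),
    )

    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )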

dataproc/snippets/create_cluster_test.py

Lines changed: 13 additions & 7 deletions

@@ -21,20 +21,26 @@
 import create_cluster


-PROJECT_ID = os.environ['GOOGLE_CLOUD_PROJECT']
-REGION = 'us-central1'
-CLUSTER_NAME = 'py-cc-test-{}'.format(str(uuid.uuid4()))
+PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
+REGION = "us-central1"
+CLUSTER_NAME = "py-cc-test-{}".format(str(uuid.uuid4()))


 @pytest.fixture(autouse=True)
 def teardown():
     yield

-    cluster_client = dataproc.ClusterControllerClient(client_options={
-        'api_endpoint': f'{REGION}-dataproc.googleapis.com:443'
-    })
+    cluster_client = dataproc.ClusterControllerClient(
+        client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
+    )
     # Client library function
-    operation = cluster_client.delete_cluster(PROJECT_ID, REGION, CLUSTER_NAME)
+    operation = cluster_client.delete_cluster(
+        request={
+            "project_id": PROJECT_ID,
+            "region": REGION,
+            "cluster_name": CLUSTER_NAME,
+        }
+    )
     # Wait for cluster to delete
     operation.result()
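The test body itself lies outside this hunk. For orientation, here is a purely illustrative sketch, not the repository's actual test, of how a test might drive the updated snippet while relying on the autouse teardown fixture above:

    def test_cluster_create(capsys):
        # Calls the updated request-style snippet; the autouse teardown
        # fixture deletes the cluster afterwards.
        create_cluster.create_cluster(PROJECT_ID, REGION, CLUSTER_NAME)

        out, _ = capsys.readouterr()
        assert CLUSTER_NAME in out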

dataproc/snippets/dataproc_e2e_donttest.py

Lines changed: 5 additions & 6 deletions

@@ -20,13 +20,12 @@

 import submit_job_to_cluster

-PROJECT = os.environ['GOOGLE_CLOUD_PROJECT']
-BUCKET = os.environ['CLOUD_STORAGE_BUCKET']
-CLUSTER_NAME = 'testcluster3'
-ZONE = 'us-central1-b'
+PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"]
+BUCKET = os.environ["CLOUD_STORAGE_BUCKET"]
+CLUSTER_NAME = "testcluster3"
+ZONE = "us-central1-b"


 def test_e2e():
-    output = submit_job_to_cluster.main(
-        PROJECT, ZONE, CLUSTER_NAME, BUCKET)
+    output = submit_job_to_cluster.main(PROJECT, ZONE, CLUSTER_NAME, BUCKET)
     assert b"['Hello,', 'dog', 'elephant', 'panther', 'world!']" in output

dataproc/snippets/instantiate_inline_workflow_template.py

Lines changed: 29 additions & 39 deletions

@@ -36,71 +36,61 @@ def instantiate_inline_workflow_template(project_id, region):

     # Create a client with the endpoint set to the desired region.
     workflow_template_client = dataproc.WorkflowTemplateServiceClient(
-        client_options={
-            'api_endpoint': f'{region}-dataproc.googleapis.com:443'
-        }
+        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
     )

-    parent = workflow_template_client.region_path(project_id, region)
+    parent = "projects/{}/regions/{}".format(project_id, region)

     template = {
-        'jobs': [
+        "jobs": [
             {
-                'hadoop_job': {
-                    'main_jar_file_uri': 'file:///usr/lib/hadoop-mapreduce/'
-                    'hadoop-mapreduce-examples.jar',
-                    'args': [
-                        'teragen',
-                        '1000',
-                        'hdfs:///gen/'
-                    ]
+                "hadoop_job": {
+                    "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/"
+                    "hadoop-mapreduce-examples.jar",
+                    "args": ["teragen", "1000", "hdfs:///gen/"],
                 },
-                'step_id': 'teragen'
+                "step_id": "teragen",
             },
             {
-                'hadoop_job': {
-                    'main_jar_file_uri': 'file:///usr/lib/hadoop-mapreduce/'
-                    'hadoop-mapreduce-examples.jar',
-                    'args': [
-                        'terasort',
-                        'hdfs:///gen/',
-                        'hdfs:///sort/'
-                    ]
+                "hadoop_job": {
+                    "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/"
+                    "hadoop-mapreduce-examples.jar",
+                    "args": ["terasort", "hdfs:///gen/", "hdfs:///sort/"],
                 },
-                'step_id': 'terasort',
-                'prerequisite_step_ids': [
-                    'teragen'
-                ]
-            }],
-        'placement': {
-            'managed_cluster': {
-                'cluster_name': 'my-managed-cluster',
-                'config': {
-                    'gce_cluster_config': {
+                "step_id": "terasort",
+                "prerequisite_step_ids": ["teragen"],
+            },
+        ],
+        "placement": {
+            "managed_cluster": {
+                "cluster_name": "my-managed-cluster",
+                "config": {
+                    "gce_cluster_config": {
                         # Leave 'zone_uri' empty for 'Auto Zone Placement'
                         # 'zone_uri': ''
-                        'zone_uri': 'us-central1-a'
+                        "zone_uri": "us-central1-a"
                     }
-                }
+                },
             }
-        }
+        },
     }

     # Submit the request to instantiate the workflow from an inline template.
     operation = workflow_template_client.instantiate_inline_workflow_template(
-        parent, template
+        request={"parent": parent, "template": template}
     )
     operation.result()

     # Output a success message.
-    print('Workflow ran successfully.')
+    print("Workflow ran successfully.")
     # [END dataproc_instantiate_inline_workflow_template]


 if __name__ == "__main__":
     if len(sys.argv) < 3:
-        sys.exit('python instantiate_inline_workflow_template.py '
-                 + 'project_id region')
+        sys.exit(
+            "python instantiate_inline_workflow_template.py " + "project_id region"
+        )

     project_id = sys.argv[1]
     region = sys.argv[2]
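Instantiating an inline workflow creates the managed cluster, runs both Hadoop jobs, and then deletes the cluster, so the operation can run for several minutes. A small optional sketch of waiting with an explicit timeout and surfacing API errors; the 1800-second value is illustrative, and GoogleAPICallError comes from google.api_core.exceptions:

    from google.api_core.exceptions import GoogleAPICallError

    operation = workflow_template_client.instantiate_inline_workflow_template(
        request={"parent": parent, "template": template}
    )
    try:
        # Block until the workflow finishes or the (illustrative) timeout expires.
        operation.result(timeout=1800)
    except GoogleAPICallError as err:
        print(f"Workflow failed: {err}")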

dataproc/snippets/instantiate_inline_workflow_template_test.py

Lines changed: 2 additions & 2 deletions

@@ -17,8 +17,8 @@
 import instantiate_inline_workflow_template


-PROJECT_ID = os.environ['GOOGLE_CLOUD_PROJECT']
-REGION = 'us-central1'
+PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
+REGION = "us-central1"


 def test_workflows(capsys):

dataproc/snippets/list_clusters.py

Lines changed: 24 additions & 19 deletions

@@ -19,45 +19,50 @@
 import argparse

 from google.cloud import dataproc_v1
-from google.cloud.dataproc_v1.gapic.transports import (
-    cluster_controller_grpc_transport)
+from google.cloud.dataproc_v1.gapic.transports import cluster_controller_grpc_transport


 # [START dataproc_list_clusters]
 def list_clusters(dataproc, project, region):
     """List the details of clusters in the region."""
-    for cluster in dataproc.list_clusters(project, region):
-        print(('{} - {}'.format(cluster.cluster_name,
-                                cluster.status.State.Name(
-                                    cluster.status.state))))
+    for cluster in dataproc.list_clusters(
+        request={"project_id": project, "region": region}
+    ):
+        print(
+            (
+                "{} - {}".format(
+                    cluster.cluster_name,
+                    cluster.status.State.Name(cluster.status.state),
+                )
+            )
+        )
+
+
 # [END dataproc_list_clusters]


 def main(project_id, region):

-    if region == 'global':
+    if region == "global":
         # Use the default gRPC global endpoints.
         dataproc_cluster_client = dataproc_v1.ClusterControllerClient()
     else:
         # Use a regional gRPC endpoint. See:
         # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
-        client_transport = (
-            cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
-                address='{}-dataproc.googleapis.com:443'.format(region)))
-        dataproc_cluster_client = dataproc_v1.ClusterControllerClient(
-            client_transport)
+        client_transport = cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
+            address="{}-dataproc.googleapis.com:443".format(region)
+        )
+        dataproc_cluster_client = dataproc_v1.ClusterControllerClient(client_transport)

     list_clusters(dataproc_cluster_client, project_id, region)


-if __name__ == '__main__':
+if __name__ == "__main__":
     parser = argparse.ArgumentParser(
-        description=__doc__, formatter_class=(
-            argparse.RawDescriptionHelpFormatter))
-    parser.add_argument(
-        '--project_id', help='Project ID to access.', required=True)
-    parser.add_argument(
-        '--region', help='Region of clusters to list.', required=True)
+        description=__doc__, formatter_class=(argparse.RawDescriptionHelpFormatter)
+    )
+    parser.add_argument("--project_id", help="Project ID to access.", required=True)
+    parser.add_argument("--region", help="Region of clusters to list.", required=True)

     args = parser.parse_args()
     main(args.project_id, args.region)
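Note that list_clusters.py keeps the legacy gapic transport import for regional endpoints, while the other snippets in this commit switch to client_options. A sketch of the client_options style applied here as well, reusing region and project_id from main() and the list_clusters function above:

    from google.cloud import dataproc_v1

    # Equivalent regional-endpoint setup via client_options, matching the
    # pattern used in create_cluster.py in this same commit.
    dataproc_cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    list_clusters(dataproc_cluster_client, project_id, region)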

dataproc/snippets/pyspark_sort.py

Lines changed: 1 addition & 1 deletion

@@ -22,7 +22,7 @@
 import pyspark

 sc = pyspark.SparkContext()
-rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
+rdd = sc.parallelize(["Hello,", "world!", "dog", "elephant", "panther"])
 words = sorted(rdd.collect())
 print(words)
 # [END dataproc_pyspark_sort]

dataproc/snippets/pyspark_sort_gcs.py

Lines changed: 1 addition & 1 deletion

@@ -25,6 +25,6 @@
 import pyspark

 sc = pyspark.SparkContext()
-rdd = sc.textFile('gs://path-to-your-GCS-file')
+rdd = sc.textFile("gs://path-to-your-GCS-file")
 print(sorted(rdd.collect()))
 # [END dataproc_pyspark_sort_gcs]
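The two PySpark files are job payloads rather than client code. For context, here is a hedged sketch of submitting such a file with the request-dict style job client, assuming JobControllerClient.submit_job accepts the same request shape; the bucket path, project, region, and cluster name are placeholders:

    from google.cloud import dataproc_v1 as dataproc

    job_client = dataproc.JobControllerClient(
        client_options={"api_endpoint": "us-central1-dataproc.googleapis.com:443"}
    )

    job = {
        "placement": {"cluster_name": "my-cluster"},  # placeholder cluster
        "pyspark_job": {"main_python_file_uri": "gs://my-bucket/pyspark_sort.py"},
    }

    # submit_job returns the Job resource without waiting for it to finish.
    submitted = job_client.submit_job(
        request={"project_id": "my-project", "region": "us-central1", "job": job}
    )
    print(submitted.reference.job_id)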
