Skip to content

Commit fef62a3

Browse files
authored
Merge branch 'master' into vision-v1p3beta1-logo-delet-old-beta-snippets
2 parents f24b662 + 373789e commit fef62a3

9 files changed

+204
-36
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
# Copyright 2020 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# This sample walks a user through instantiating an inline
16+
# workflow for Cloud Dataproc using the Python client library.
17+
#
18+
# This script can be run on its own:
19+
# python workflows.py ${PROJECT_ID} ${REGION}
20+
21+
import sys
22+
# [START dataproc_instantiate_inline_workflow_template]
23+
from google.cloud import dataproc_v1 as dataproc
24+
25+
26+
def instantiate_inline_workflow_template(project_id, region):
27+
"""This sample walks a user through submitting a workflow
28+
for a Cloud Dataproc using the Python client library.
29+
30+
Args:
31+
project_id (string): Project to use for running the workflow.
32+
region (string): Region where the workflow resources should live.
33+
"""
34+
35+
# Create a client with the endpoint set to the desired region.
36+
workflow_template_client = dataproc.WorkflowTemplateServiceClient(
37+
client_options={
38+
'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)}
39+
)
40+
41+
parent = workflow_template_client.region_path(project_id, region)
42+
43+
template = {
44+
'jobs': [
45+
{
46+
'hadoop_job': {
47+
'main_jar_file_uri': 'file:///usr/lib/hadoop-mapreduce/'
48+
'hadoop-mapreduce-examples.jar',
49+
'args': [
50+
'teragen',
51+
'1000',
52+
'hdfs:///gen/'
53+
]
54+
},
55+
'step_id': 'teragen'
56+
},
57+
{
58+
'hadoop_job': {
59+
'main_jar_file_uri': 'file:///usr/lib/hadoop-mapreduce/'
60+
'hadoop-mapreduce-examples.jar',
61+
'args': [
62+
'terasort',
63+
'hdfs:///gen/',
64+
'hdfs:///sort/'
65+
]
66+
},
67+
'step_id': 'terasort',
68+
'prerequisite_step_ids': [
69+
'teragen'
70+
]
71+
}],
72+
'placement': {
73+
'managed_cluster': {
74+
'cluster_name': 'my-managed-cluster',
75+
'config': {
76+
'gce_cluster_config': {
77+
# Leave 'zone_uri' empty for 'Auto Zone Placement'
78+
# 'zone_uri': ''
79+
'zone_uri': 'us-central1-a'
80+
}
81+
}
82+
}
83+
}
84+
}
85+
86+
# Submit the request to instantiate the workflow from an inline template.
87+
operation = workflow_template_client.instantiate_inline_workflow_template(
88+
parent, template
89+
)
90+
operation.result()
91+
92+
# Output a success message.
93+
print('Workflow ran successfully.')
94+
# [END dataproc_instantiate_inline_workflow_template]
95+
96+
97+
if __name__ == "__main__":
98+
instantiate_inline_workflow_template(sys.argv[1], sys.argv[2])
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Copyright 2020 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import os
16+
17+
import instantiate_inline_workflow_template
18+
19+
20+
PROJECT_ID = os.environ['GCLOUD_PROJECT']
21+
REGION = 'us-central1'
22+
23+
24+
def test_workflows(capsys):
25+
# Wrapper function for client library function
26+
instantiate_inline_workflow_template.instantiate_inline_workflow_template(
27+
PROJECT_ID, REGION
28+
)
29+
30+
out, _ = capsys.readouterr()
31+
assert "successfully" in out

pubsub/cloud-client/iam.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ def get_subscription_policy(project, subscription_name):
5151
print("Policy for subscription {}:".format(subscription_path))
5252
for binding in policy.bindings:
5353
print("Role: {}, Members: {}".format(binding.role, binding.members))
54+
55+
client.close()
5456
# [END pubsub_get_subscription_policy]
5557

5658

@@ -101,6 +103,8 @@ def set_subscription_policy(project, subscription_name):
101103
subscription_name, policy
102104
)
103105
)
106+
107+
client.close()
104108
# [END pubsub_set_subscription_policy]
105109

106110

@@ -144,6 +148,8 @@ def check_subscription_permissions(project, subscription_name):
144148
subscription_path, allowed_permissions
145149
)
146150
)
151+
152+
client.close()
147153
# [END pubsub_test_subscription_permissions]
148154

149155

pubsub/cloud-client/iam_test.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ def topic(publisher_client):
4949

5050
@pytest.fixture(scope="module")
5151
def subscriber_client():
52-
yield pubsub_v1.SubscriberClient()
52+
subscriber_client = pubsub_v1.SubscriberClient()
53+
yield subscriber_client
54+
subscriber_client.close()
5355

5456

5557
@pytest.fixture

pubsub/cloud-client/quickstart/sub.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@ def sub(project_id, subscription_name):
2727
"""Receives messages from a Pub/Sub subscription."""
2828
# [START pubsub_quickstart_sub_client]
2929
# Initialize a Subscriber client
30-
client = pubsub_v1.SubscriberClient()
30+
subscriber_client = pubsub_v1.SubscriberClient()
3131
# [END pubsub_quickstart_sub_client]
3232
# Create a fully qualified identifier in the form of
3333
# `projects/{project_id}/subscriptions/{subscription_name}`
34-
subscription_path = client.subscription_path(project_id, subscription_name)
34+
subscription_path = subscriber_client.subscription_path(
35+
project_id, subscription_name
36+
)
3537

3638
def callback(message):
3739
print(
@@ -43,18 +45,20 @@ def callback(message):
4345
message.ack()
4446
print("Acknowledged message {}\n".format(message.message_id))
4547

46-
streaming_pull_future = client.subscribe(
48+
streaming_pull_future = subscriber_client.subscribe(
4749
subscription_path, callback=callback
4850
)
4951
print("Listening for messages on {}..\n".format(subscription_path))
5052

51-
# Calling result() on StreamingPullFuture keeps the main thread from
52-
# exiting while messages get processed in the callbacks.
5353
try:
54+
# Calling result() on StreamingPullFuture keeps the main thread from
55+
# exiting while messages get processed in the callbacks.
5456
streaming_pull_future.result()
5557
except: # noqa
5658
streaming_pull_future.cancel()
5759

60+
subscriber_client.close()
61+
5862

5963
if __name__ == "__main__":
6064
parser = argparse.ArgumentParser(

pubsub/cloud-client/quickstart/sub_test.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ def subscription_path(topic_path):
6262
yield subscription_path
6363

6464
subscriber_client.delete_subscription(subscription_path)
65+
subscriber_client.close()
6566

6667

6768
def _publish_messages(topic_path):
@@ -102,3 +103,5 @@ def mock_result():
102103
out, _ = capsys.readouterr()
103104
assert "Received message" in out
104105
assert "Acknowledged message" in out
106+
107+
real_client.close()

pubsub/cloud-client/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
google-cloud-pubsub==1.1.0
1+
google-cloud-pubsub==1.3.0

pubsub/cloud-client/subscriber.py

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ def list_subscriptions_in_project(project_id):
5252

5353
for subscription in subscriber.list_subscriptions(project_path):
5454
print(subscription.name)
55+
56+
subscriber.close()
5557
# [END pubsub_list_subscriptions]
5658

5759

@@ -75,6 +77,8 @@ def create_subscription(project_id, topic_name, subscription_name):
7577
)
7678

7779
print("Subscription created: {}".format(subscription))
80+
81+
subscriber.close()
7882
# [END pubsub_create_pull_subscription]
7983

8084

@@ -104,6 +108,8 @@ def create_push_subscription(
104108

105109
print("Push subscription created: {}".format(subscription))
106110
print("Endpoint for subscription is: {}".format(endpoint))
111+
112+
subscriber.close()
107113
# [END pubsub_create_push_subscription]
108114

109115

@@ -123,6 +129,8 @@ def delete_subscription(project_id, subscription_name):
123129
subscriber.delete_subscription(subscription_path)
124130

125131
print("Subscription deleted: {}".format(subscription_path))
132+
133+
subscriber.close()
126134
# [END pubsub_delete_subscription]
127135

128136

@@ -158,6 +166,8 @@ def update_subscription(project_id, subscription_name, endpoint):
158166

159167
print("Subscription updated: {}".format(subscription_path))
160168
print("New endpoint for subscription is: {}".format(result.push_config))
169+
170+
subscriber.close()
161171
# [END pubsub_update_push_configuration]
162172

163173

@@ -188,12 +198,14 @@ def callback(message):
188198
)
189199
print("Listening for messages on {}..\n".format(subscription_path))
190200

191-
# result() in a future will block indefinitely if `timeout` is not set,
192-
# unless an exception is encountered first.
193-
try:
194-
streaming_pull_future.result(timeout=timeout)
195-
except: # noqa
196-
streaming_pull_future.cancel()
201+
# Wrap subscriber in a 'with' block to automatically call close() when done.
202+
with subscriber:
203+
try:
204+
# When `timeout` is not set, result() will block indefinitely,
205+
# unless an exception is encountered first.
206+
streaming_pull_future.result(timeout=timeout)
207+
except: # noqa
208+
streaming_pull_future.cancel()
197209
# [END pubsub_subscriber_async_pull]
198210
# [END pubsub_quickstart_subscriber]
199211

@@ -230,12 +242,14 @@ def callback(message):
230242
)
231243
print("Listening for messages on {}..\n".format(subscription_path))
232244

233-
# result() in a future will block indefinitely if `timeout` is not set,
234-
# unless an exception is encountered first.
235-
try:
236-
streaming_pull_future.result(timeout=timeout)
237-
except: # noqa
238-
streaming_pull_future.cancel()
245+
# Wrap subscriber in a 'with' block to automatically call close() when done.
246+
with subscriber:
247+
try:
248+
# When `timeout` is not set, result() will block indefinitely,
249+
# unless an exception is encountered first.
250+
streaming_pull_future.result(timeout=timeout)
251+
except: # noqa
252+
streaming_pull_future.cancel()
239253
# [END pubsub_subscriber_async_pull_custom_attributes]
240254
# [END pubsub_subscriber_sync_pull_custom_attributes]
241255

@@ -269,12 +283,14 @@ def callback(message):
269283
)
270284
print("Listening for messages on {}..\n".format(subscription_path))
271285

272-
# result() in a future will block indefinitely if `timeout` is not set,
273-
# unless an exception is encountered first.
274-
try:
275-
streaming_pull_future.result(timeout=timeout)
276-
except: # noqa
277-
streaming_pull_future.cancel()
286+
# Wrap subscriber in a 'with' block to automatically call close() when done.
287+
with subscriber:
288+
try:
289+
# When `timeout` is not set, result() will block indefinitely,
290+
# unless an exception is encountered first.
291+
streaming_pull_future.result(timeout=timeout)
292+
except: # noqa
293+
streaming_pull_future.cancel()
278294
# [END pubsub_subscriber_flow_settings]
279295

280296

@@ -309,6 +325,8 @@ def synchronous_pull(project_id, subscription_name):
309325
len(response.received_messages)
310326
)
311327
)
328+
329+
subscriber.close()
312330
# [END pubsub_subscriber_sync_pull]
313331

314332

@@ -398,6 +416,8 @@ def worker(msg):
398416
len(response.received_messages)
399417
)
400418
)
419+
420+
subscriber.close()
401421
# [END pubsub_subscriber_sync_pull_with_lease]
402422

403423

@@ -425,17 +445,19 @@ def callback(message):
425445
)
426446
print("Listening for messages on {}..\n".format(subscription_path))
427447

428-
# result() in a future will block indefinitely if `timeout` is not set,
429-
# unless an exception is encountered first.
430-
try:
431-
streaming_pull_future.result(timeout=timeout)
432-
except Exception as e:
433-
streaming_pull_future.cancel()
434-
print(
435-
"Listening for messages on {} threw an exception: {}.".format(
436-
subscription_name, e
448+
# Wrap subscriber in a 'with' block to automatically call close() when done.
449+
with subscriber:
450+
# When `timeout` is not set, result() will block indefinitely,
451+
# unless an exception is encountered first.
452+
try:
453+
streaming_pull_future.result(timeout=timeout)
454+
except Exception as e:
455+
streaming_pull_future.cancel()
456+
print(
457+
"Listening for messages on {} threw an exception: {}.".format(
458+
subscription_name, e
459+
)
437460
)
438-
)
439461
# [END pubsub_subscriber_error_listener]
440462

441463

pubsub/cloud-client/subscriber_test.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ def topic(publisher_client):
5252

5353
@pytest.fixture(scope="module")
5454
def subscriber_client():
55-
yield pubsub_v1.SubscriberClient()
55+
subscriber_client = pubsub_v1.SubscriberClient()
56+
yield subscriber_client
57+
subscriber_client.close()
5658

5759

5860
@pytest.fixture(scope="module")

0 commit comments

Comments
 (0)