Skip to content

Commit bd44acd

Browse files
anguillanneufpradn
authored andcommitted
Pub/Sub: add SubscriberClient.close() to examples [(#3118)](GoogleCloudPlatform/python-docs-samples#3118)
* Add SubscriberClient.close() to examples. Co-authored-by: Prad Nelluru <[email protected]> Co-authored-by: Prad Nelluru <[email protected]>
1 parent f2336e6 commit bd44acd

File tree

7 files changed

+75
-36
lines changed

7 files changed

+75
-36
lines changed

samples/snippets/iam.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ def get_subscription_policy(project, subscription_name):
5151
print("Policy for subscription {}:".format(subscription_path))
5252
for binding in policy.bindings:
5353
print("Role: {}, Members: {}".format(binding.role, binding.members))
54+
55+
client.close()
5456
# [END pubsub_get_subscription_policy]
5557

5658

@@ -101,6 +103,8 @@ def set_subscription_policy(project, subscription_name):
101103
subscription_name, policy
102104
)
103105
)
106+
107+
client.close()
104108
# [END pubsub_set_subscription_policy]
105109

106110

@@ -144,6 +148,8 @@ def check_subscription_permissions(project, subscription_name):
144148
subscription_path, allowed_permissions
145149
)
146150
)
151+
152+
client.close()
147153
# [END pubsub_test_subscription_permissions]
148154

149155

samples/snippets/iam_test.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ def topic(publisher_client):
4949

5050
@pytest.fixture(scope="module")
5151
def subscriber_client():
52-
yield pubsub_v1.SubscriberClient()
52+
subscriber_client = pubsub_v1.SubscriberClient()
53+
yield subscriber_client
54+
subscriber_client.close()
5355

5456

5557
@pytest.fixture

samples/snippets/quickstart/sub.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@ def sub(project_id, subscription_name):
2727
"""Receives messages from a Pub/Sub subscription."""
2828
# [START pubsub_quickstart_sub_client]
2929
# Initialize a Subscriber client
30-
client = pubsub_v1.SubscriberClient()
30+
subscriber_client = pubsub_v1.SubscriberClient()
3131
# [END pubsub_quickstart_sub_client]
3232
# Create a fully qualified identifier in the form of
3333
# `projects/{project_id}/subscriptions/{subscription_name}`
34-
subscription_path = client.subscription_path(project_id, subscription_name)
34+
subscription_path = subscriber_client.subscription_path(
35+
project_id, subscription_name
36+
)
3537

3638
def callback(message):
3739
print(
@@ -43,18 +45,20 @@ def callback(message):
4345
message.ack()
4446
print("Acknowledged message {}\n".format(message.message_id))
4547

46-
streaming_pull_future = client.subscribe(
48+
streaming_pull_future = subscriber_client.subscribe(
4749
subscription_path, callback=callback
4850
)
4951
print("Listening for messages on {}..\n".format(subscription_path))
5052

51-
# Calling result() on StreamingPullFuture keeps the main thread from
52-
# exiting while messages get processed in the callbacks.
5353
try:
54+
# Calling result() on StreamingPullFuture keeps the main thread from
55+
# exiting while messages get processed in the callbacks.
5456
streaming_pull_future.result()
5557
except: # noqa
5658
streaming_pull_future.cancel()
5759

60+
subscriber_client.close()
61+
5862

5963
if __name__ == "__main__":
6064
parser = argparse.ArgumentParser(

samples/snippets/quickstart/sub_test.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ def subscription_path(topic_path):
6262
yield subscription_path
6363

6464
subscriber_client.delete_subscription(subscription_path)
65+
subscriber_client.close()
6566

6667

6768
def _publish_messages(topic_path):
@@ -102,3 +103,5 @@ def mock_result():
102103
out, _ = capsys.readouterr()
103104
assert "Received message" in out
104105
assert "Acknowledged message" in out
106+
107+
real_client.close()

samples/snippets/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
google-cloud-pubsub==1.1.0
1+
google-cloud-pubsub==1.3.0

samples/snippets/subscriber.py

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ def list_subscriptions_in_project(project_id):
5252

5353
for subscription in subscriber.list_subscriptions(project_path):
5454
print(subscription.name)
55+
56+
subscriber.close()
5557
# [END pubsub_list_subscriptions]
5658

5759

@@ -75,6 +77,8 @@ def create_subscription(project_id, topic_name, subscription_name):
7577
)
7678

7779
print("Subscription created: {}".format(subscription))
80+
81+
subscriber.close()
7882
# [END pubsub_create_pull_subscription]
7983

8084

@@ -104,6 +108,8 @@ def create_push_subscription(
104108

105109
print("Push subscription created: {}".format(subscription))
106110
print("Endpoint for subscription is: {}".format(endpoint))
111+
112+
subscriber.close()
107113
# [END pubsub_create_push_subscription]
108114

109115

@@ -123,6 +129,8 @@ def delete_subscription(project_id, subscription_name):
123129
subscriber.delete_subscription(subscription_path)
124130

125131
print("Subscription deleted: {}".format(subscription_path))
132+
133+
subscriber.close()
126134
# [END pubsub_delete_subscription]
127135

128136

@@ -158,6 +166,8 @@ def update_subscription(project_id, subscription_name, endpoint):
158166

159167
print("Subscription updated: {}".format(subscription_path))
160168
print("New endpoint for subscription is: {}".format(result.push_config))
169+
170+
subscriber.close()
161171
# [END pubsub_update_push_configuration]
162172

163173

@@ -188,12 +198,14 @@ def callback(message):
188198
)
189199
print("Listening for messages on {}..\n".format(subscription_path))
190200

191-
# result() in a future will block indefinitely if `timeout` is not set,
192-
# unless an exception is encountered first.
193-
try:
194-
streaming_pull_future.result(timeout=timeout)
195-
except: # noqa
196-
streaming_pull_future.cancel()
201+
# Wrap subscriber in a 'with' block to automatically call close() when done.
202+
with subscriber:
203+
try:
204+
# When `timeout` is not set, result() will block indefinitely,
205+
# unless an exception is encountered first.
206+
streaming_pull_future.result(timeout=timeout)
207+
except: # noqa
208+
streaming_pull_future.cancel()
197209
# [END pubsub_subscriber_async_pull]
198210
# [END pubsub_quickstart_subscriber]
199211

@@ -230,12 +242,14 @@ def callback(message):
230242
)
231243
print("Listening for messages on {}..\n".format(subscription_path))
232244

233-
# result() in a future will block indefinitely if `timeout` is not set,
234-
# unless an exception is encountered first.
235-
try:
236-
streaming_pull_future.result(timeout=timeout)
237-
except: # noqa
238-
streaming_pull_future.cancel()
245+
# Wrap subscriber in a 'with' block to automatically call close() when done.
246+
with subscriber:
247+
try:
248+
# When `timeout` is not set, result() will block indefinitely,
249+
# unless an exception is encountered first.
250+
streaming_pull_future.result(timeout=timeout)
251+
except: # noqa
252+
streaming_pull_future.cancel()
239253
# [END pubsub_subscriber_async_pull_custom_attributes]
240254
# [END pubsub_subscriber_sync_pull_custom_attributes]
241255

@@ -269,12 +283,14 @@ def callback(message):
269283
)
270284
print("Listening for messages on {}..\n".format(subscription_path))
271285

272-
# result() in a future will block indefinitely if `timeout` is not set,
273-
# unless an exception is encountered first.
274-
try:
275-
streaming_pull_future.result(timeout=timeout)
276-
except: # noqa
277-
streaming_pull_future.cancel()
286+
# Wrap subscriber in a 'with' block to automatically call close() when done.
287+
with subscriber:
288+
try:
289+
# When `timeout` is not set, result() will block indefinitely,
290+
# unless an exception is encountered first.
291+
streaming_pull_future.result(timeout=timeout)
292+
except: # noqa
293+
streaming_pull_future.cancel()
278294
# [END pubsub_subscriber_flow_settings]
279295

280296

@@ -309,6 +325,8 @@ def synchronous_pull(project_id, subscription_name):
309325
len(response.received_messages)
310326
)
311327
)
328+
329+
subscriber.close()
312330
# [END pubsub_subscriber_sync_pull]
313331

314332

@@ -398,6 +416,8 @@ def worker(msg):
398416
len(response.received_messages)
399417
)
400418
)
419+
420+
subscriber.close()
401421
# [END pubsub_subscriber_sync_pull_with_lease]
402422

403423

@@ -425,17 +445,19 @@ def callback(message):
425445
)
426446
print("Listening for messages on {}..\n".format(subscription_path))
427447

428-
# result() in a future will block indefinitely if `timeout` is not set,
429-
# unless an exception is encountered first.
430-
try:
431-
streaming_pull_future.result(timeout=timeout)
432-
except Exception as e:
433-
streaming_pull_future.cancel()
434-
print(
435-
"Listening for messages on {} threw an exception: {}.".format(
436-
subscription_name, e
448+
# Wrap subscriber in a 'with' block to automatically call close() when done.
449+
with subscriber:
450+
# When `timeout` is not set, result() will block indefinitely,
451+
# unless an exception is encountered first.
452+
try:
453+
streaming_pull_future.result(timeout=timeout)
454+
except Exception as e:
455+
streaming_pull_future.cancel()
456+
print(
457+
"Listening for messages on {} threw an exception: {}.".format(
458+
subscription_name, e
459+
)
437460
)
438-
)
439461
# [END pubsub_subscriber_error_listener]
440462

441463

samples/snippets/subscriber_test.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ def topic(publisher_client):
5252

5353
@pytest.fixture(scope="module")
5454
def subscriber_client():
55-
yield pubsub_v1.SubscriberClient()
55+
subscriber_client = pubsub_v1.SubscriberClient()
56+
yield subscriber_client
57+
subscriber_client.close()
5658

5759

5860
@pytest.fixture(scope="module")

0 commit comments

Comments
 (0)