Skip to content

Commit 2b6e80f

Browse files
anguillanneufchenyumic
authored and
chenyumic
committed
Modified publisher with error handling [(#1568)](GoogleCloudPlatform/python-docs-samples#1568)
1 parent 43200bf commit 2b6e80f

File tree

3 files changed

+25
-18
lines changed

3 files changed

+25
-18
lines changed

samples/snippets/publisher.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
"""
2323

2424
import argparse
25-
import concurrent.futures
25+
import time
2626

2727
from google.cloud import pubsub_v1
2828

@@ -130,29 +130,27 @@ def publish_messages_with_error_handler(project, topic_name):
130130
publisher = pubsub_v1.PublisherClient()
131131
topic_path = publisher.topic_path(project, topic_name)
132132

133-
# When you publish a message, the client returns a Future. This Future
134-
# can be used to track if an error has occurred.
135-
futures = []
136-
137-
def callback(f):
138-
exc = f.exception()
139-
if exc:
133+
def callback(message_future):
134+
if message_future.exception():
140135
print('Publishing message on {} threw an Exception {}.'.format(
141-
topic_name, exc))
136+
topic_name, message_future.exception()))
137+
else:
138+
print(message_future.result())
142139

143140
for n in range(1, 10):
144141
data = u'Message number {}'.format(n)
145142
# Data must be a bytestring
146143
data = data.encode('utf-8')
144+
# When you publish a message, the client returns a Future.
147145
message_future = publisher.publish(topic_path, data=data)
148146
message_future.add_done_callback(callback)
149-
futures.append(message_future)
147+
148+
print('Published message IDs:')
150149

151150
# We must keep the main thread from exiting to allow it to process
152151
# messages in the background.
153-
concurrent.futures.wait(futures)
154-
155-
print('Published messages.')
152+
while True:
153+
time.sleep(60)
156154
# [END pubsub_publish_messages_error_handler]
157155

158156

@@ -208,6 +206,11 @@ def publish_messages_with_batch_settings(project, topic_name):
208206
help=publish_messages_with_futures.__doc__)
209207
publish_with_futures_parser.add_argument('topic_name')
210208

209+
publish_with_error_handler_parser = subparsers.add_parser(
210+
'publish-with-error-handler',
211+
help=publish_messages_with_error_handler.__doc__)
212+
publish_with_error_handler_parser.add_argument('topic_name')
213+
211214
publish_with_batch_settings_parser = subparsers.add_parser(
212215
'publish-with-batch-settings',
213216
help=publish_messages_with_batch_settings.__doc__)
@@ -227,5 +230,7 @@ def publish_messages_with_batch_settings(project, topic_name):
227230
publish_messages_with_custom_attributes(args.project, args.topic_name)
228231
elif args.command == 'publish-with-futures':
229232
publish_messages_with_futures(args.project, args.topic_name)
233+
elif args.command == 'publish-with-error-handler':
234+
publish_messages_with_error_handler(args.project, args.topic_name)
230235
elif args.command == 'publish-with-batch-settings':
231236
publish_messages_with_batch_settings(args.project, args.topic_name)

samples/snippets/publisher_test.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,13 @@ def test_publish_with_batch_settings(topic, capsys):
9595
assert 'Published' in out
9696

9797

98+
def test_publish_with_error_handler(topic, capsys):
99+
publisher.publish_messages_with_error_handler(PROJECT, TOPIC)
100+
101+
out, _ = capsys.readouterr()
102+
assert 'Published' in out
103+
104+
98105
def test_publish_with_futures(topic, capsys):
99106
publisher.publish_messages_with_futures(PROJECT, TOPIC)
100107

samples/snippets/requirements.txt

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1 @@
1-
<<<<<<< HEAD
21
google-cloud-pubsub==0.33.0
3-
=======
4-
google-cloud-pubsub==0.32.1
5-
futures==3.1.1; python_version < '3'
6-
>>>>>>> master

0 commit comments

Comments
 (0)