|
22 | 22 | """
|
23 | 23 |
|
24 | 24 | import argparse
|
| 25 | +import concurrent.futures |
25 | 26 |
|
26 | 27 | from google.cloud import pubsub_v1
|
27 | 28 |
|
@@ -123,6 +124,38 @@ def publish_messages_with_futures(project, topic_name):
|
123 | 124 | # [END pubsub_publisher_concurrency_control]
|
124 | 125 |
|
125 | 126 |
|
| 127 | +def publish_messages_with_error_handler(project, topic_name): |
| 128 | + """Publishes multiple messages to a Pub/Sub topic with an error handler.""" |
| 129 | + # [START pubsub_publish_messages_error_handler] |
| 130 | + publisher = pubsub_v1.PublisherClient() |
| 131 | + topic_path = publisher.topic_path(project, topic_name) |
| 132 | + |
| 133 | + # When you publish a message, the client returns a Future. This Future |
| 134 | + # can be used to track if an error has occurred. |
| 135 | + futures = [] |
| 136 | + |
| 137 | + def callback(f): |
| 138 | + exc = f.exception() |
| 139 | + if exc: |
| 140 | + print('Publishing message on {} threw an Exception {}.'.format( |
| 141 | + topic_name, exc)) |
| 142 | + |
| 143 | + for n in range(1, 10): |
| 144 | + data = u'Message number {}'.format(n) |
| 145 | + # Data must be a bytestring |
| 146 | + data = data.encode('utf-8') |
| 147 | + message_future = publisher.publish(topic_path, data=data) |
| 148 | + message_future.add_done_callback(callback) |
| 149 | + futures.append(message_future) |
| 150 | + |
| 151 | + # We must keep the main thread from exiting to allow it to process |
| 152 | + # messages in the background. |
| 153 | + concurrent.futures.wait(futures) |
| 154 | + |
| 155 | + print('Published messages.') |
| 156 | + # [END pubsub_publish_messages_error_handler] |
| 157 | + |
| 158 | + |
126 | 159 | def publish_messages_with_batch_settings(project, topic_name):
|
127 | 160 | """Publishes multiple messages to a Pub/Sub topic with batch settings."""
|
128 | 161 | # [START pubsub_publisher_batch_settings]
|
|
0 commit comments