Skip to content

Commit 2d2820c

Browse files
Resolve all futures (#2231)
1 parent b32f7df commit 2d2820c

File tree

3 files changed

+42
-42
lines changed

3 files changed

+42
-42
lines changed

pubsub/cloud-client/publisher.py

Lines changed: 37 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ def publish_messages(project_id, topic_name):
9595
data = data.encode('utf-8')
9696
# When you publish a message, the client returns a future.
9797
future = publisher.publish(topic_path, data=data)
98-
print('Published {} of message ID {}.'.format(data, future.result()))
98+
print(future.result())
9999

100100
print('Published messages.')
101101
# [END pubsub_quickstart_publisher]
@@ -119,8 +119,9 @@ def publish_messages_with_custom_attributes(project_id, topic_name):
119119
# Data must be a bytestring
120120
data = data.encode('utf-8')
121121
# Add two attributes, origin and username, to the message
122-
publisher.publish(
122+
future = publisher.publish(
123123
topic_path, data, origin='python-sample', username='gcp')
124+
print(future.result())
124125

125126
print('Published messages with custom attributes.')
126127
# [END pubsub_publish_custom_attributes]
@@ -138,21 +139,15 @@ def publish_messages_with_futures(project_id, topic_name):
138139
publisher = pubsub_v1.PublisherClient()
139140
topic_path = publisher.topic_path(project_id, topic_name)
140141

141-
# When you publish a message, the client returns a Future. This Future
142-
# can be used to track when the message is published.
143-
futures = []
144-
145142
for n in range(1, 10):
146143
data = u'Message number {}'.format(n)
147144
# Data must be a bytestring
148145
data = data.encode('utf-8')
149-
message_future = publisher.publish(topic_path, data=data)
150-
futures.append(message_future)
151-
152-
print('Published message IDs:')
153-
for future in futures:
154-
# result() blocks until the message is published.
146+
# When you publish a message, the client returns a future.
147+
future = publisher.publish(topic_path, data=data)
155148
print(future.result())
149+
150+
print("Published messages with futures.")
156151
# [END pubsub_publisher_concurrency_control]
157152

158153

@@ -169,28 +164,34 @@ def publish_messages_with_error_handler(project_id, topic_name):
169164
publisher = pubsub_v1.PublisherClient()
170165
topic_path = publisher.topic_path(project_id, topic_name)
171166

172-
def callback(message_future):
173-
if message_future.exception():
174-
print('{} needs handling.'.format(message_future.exception()))
175-
else:
176-
print(message_future.result())
167+
futures = dict()
177168

178-
for n in range(1, 10):
179-
data = u'Message number {}'.format(n)
180-
# Data must be a bytestring
181-
data = data.encode('utf-8')
182-
# When you publish a message, the client returns a Future.
183-
message_future = publisher.publish(topic_path, data=data)
184-
# If you wish to handle publish failures, do it in the callback.
185-
# Otherwise, it's okay to call `message_future.result()` directly.
186-
message_future.add_done_callback(callback)
187-
188-
print('Published message IDs:')
189-
190-
# We keep the main thread from exiting so message futures can be
191-
# resolved in the background.
192-
while True:
193-
time.sleep(60)
169+
def get_callback(f, data):
170+
def callback(f):
171+
try:
172+
print(f.result())
173+
futures.pop(data)
174+
except: # noqa
175+
print("Please handle {} for {}.".format(f.exception(), data))
176+
return callback
177+
178+
for i in range(10):
179+
data = str(i)
180+
futures.update({data: None})
181+
# When you publish a message, the client returns a future.
182+
future = publisher.publish(
183+
topic_path,
184+
data=data.encode("utf-8"), # data must be a bytestring.
185+
)
186+
futures[data] = future
187+
# Publish failures shall be handled in the callback function.
188+
future.add_done_callback(get_callback(future, data))
189+
190+
# Wait for all the publish futures to resolve before exiting.
191+
while futures:
192+
time.sleep(5)
193+
194+
print("Published message with error handler.")
194195
# [END pubsub_publish_messages_error_handler]
195196

196197

@@ -215,9 +216,10 @@ def publish_messages_with_batch_settings(project_id, topic_name):
215216
data = u'Message number {}'.format(n)
216217
# Data must be a bytestring
217218
data = data.encode('utf-8')
218-
publisher.publish(topic_path, data=data)
219+
future = publisher.publish(topic_path, data=data)
220+
print(future.result())
219221

220-
print('Published messages.')
222+
print('Published messages with batch settings.')
221223
# [END pubsub_publisher_batch_settings]
222224

223225

pubsub/cloud-client/publisher_test.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,7 @@ def test_publish_with_batch_settings(topic, capsys):
111111

112112

113113
def test_publish_with_error_handler(topic, capsys):
114-
115-
with _make_sleep_patch():
116-
with pytest.raises(RuntimeError, match='sigil'):
117-
publisher.publish_messages_with_error_handler(
118-
PROJECT, TOPIC)
114+
publisher.publish_messages_with_error_handler(PROJECT, TOPIC)
119115

120116
out, _ = capsys.readouterr()
121117
assert 'Published' in out

pubsub/cloud-client/subscriber_test.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,13 +169,15 @@ def test_update(subscriber_client, subscription, capsys):
169169
def _publish_messages(publisher_client, topic):
170170
for n in range(5):
171171
data = u'Message {}'.format(n).encode('utf-8')
172-
publisher_client.publish(
172+
future = publisher_client.publish(
173173
topic, data=data)
174+
future.result()
174175

175176

176177
def _publish_messages_with_custom_attributes(publisher_client, topic):
177178
data = u'Test message'.encode('utf-8')
178-
publisher_client.publish(topic, data=data, origin='python-sample')
179+
future = publisher_client.publish(topic, data=data, origin='python-sample')
180+
future.result()
179181

180182

181183
def _make_sleep_patch():

0 commit comments

Comments
 (0)