Skip to content

Commit 84c2cb2

Browse files
Added sample for Pub/Sub synchronous pull subscriber (#1673)
* Added sample for synchronous pull
1 parent eedc6d2 commit 84c2cb2

File tree

2 files changed

+60
-0
lines changed

2 files changed

+60
-0
lines changed

pubsub/cloud-client/subscriber.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ def create_push_subscription(project,
9090
def delete_subscription(project, subscription_name):
9191
"""Deletes an existing Pub/Sub topic."""
9292
# [START pubsub_delete_subscription]
93+
# project = "Your Google Cloud Project ID"
94+
# subscription_name = "Your Pubsub subscription name"
9395
subscriber = pubsub_v1.SubscriberClient()
9496
subscription_path = subscriber.subscription_path(
9597
project, subscription_name)
@@ -138,6 +140,8 @@ def receive_messages(project, subscription_name):
138140
"""Receives messages from a pull subscription."""
139141
# [START pubsub_subscriber_async_pull]
140142
# [START pubsub_quickstart_subscriber]
143+
# project = "Your Google Cloud Project ID"
144+
# subscription_name = "Your Pubsub subscription name"
141145
subscriber = pubsub_v1.SubscriberClient()
142146
subscription_path = subscriber.subscription_path(
143147
project, subscription_name)
@@ -160,6 +164,8 @@ def callback(message):
160164
def receive_messages_with_custom_attributes(project, subscription_name):
161165
"""Receives messages from a pull subscription."""
162166
# [START pubsub_subscriber_sync_pull_custom_attributes]
167+
# project = "Your Google Cloud Project ID"
168+
# subscription_name = "Your Pubsub subscription name"
163169
subscriber = pubsub_v1.SubscriberClient()
164170
subscription_path = subscriber.subscription_path(
165171
project, subscription_name)
@@ -186,6 +192,8 @@ def callback(message):
186192
def receive_messages_with_flow_control(project, subscription_name):
187193
"""Receives messages from a pull subscription with flow control."""
188194
# [START pubsub_subscriber_flow_settings]
195+
# project = "Your Google Cloud Project ID"
196+
# subscription_name = "Your Pubsub subscription name"
189197
subscriber = pubsub_v1.SubscriberClient()
190198
subscription_path = subscriber.subscription_path(
191199
project, subscription_name)
@@ -207,9 +215,38 @@ def callback(message):
207215
# [END pubsub_subscriber_flow_settings]
208216

209217

218+
def receive_messages_synchronously(project, subscription_name):
219+
"""Pulling messages synchronously."""
220+
# [START pubsub_subscriber_sync_pull]
221+
# project = "Your Google Cloud Project ID"
222+
# subscription_name = "Your Pubsub subscription name"
223+
subscriber = pubsub_v1.SubscriberClient()
224+
subscription_path = subscriber.subscription_path(
225+
project, subscription_name)
226+
227+
# Builds a pull request with a specific number of messages to return.
228+
# `return_immediately` is set to False so that the system waits (for a
229+
# bounded amount of time) until at lease one message is available.
230+
response = subscriber.pull(
231+
subscription_path,
232+
max_messages=3,
233+
return_immediately=False)
234+
235+
ack_ids = []
236+
for received_message in response.received_messages:
237+
print("Received: {}".format(received_message.message.data))
238+
ack_ids.append(received_message.ack_id)
239+
240+
# Acknowledges the received messages so they will not be sent again.
241+
subscriber.acknowledge(subscription_path, ack_ids)
242+
# [END pubsub_subscriber_sync_pull]
243+
244+
210245
def listen_for_errors(project, subscription_name):
211246
"""Receives messages and catches errors from a pull subscription."""
212247
# [START pubsub_subscriber_error_listener]
248+
# project = "Your Google Cloud Project ID"
249+
# subscription_name = "Your Pubsub subscription name"
213250
subscriber = pubsub_v1.SubscriberClient()
214251
subscription_path = subscriber.subscription_path(
215252
project, subscription_name)
@@ -281,6 +318,11 @@ def callback(message):
281318
help=receive_messages_with_flow_control.__doc__)
282319
receive_with_flow_control_parser.add_argument('subscription_name')
283320

321+
receive_messages_synchronously_parser = subparsers.add_parser(
322+
'receive-synchronously',
323+
help=receive_messages_synchronously.__doc__)
324+
receive_messages_synchronously_parser.add_argument('subscription_name')
325+
284326
listen_for_errors_parser = subparsers.add_parser(
285327
'listen_for_errors', help=listen_for_errors.__doc__)
286328
listen_for_errors_parser.add_argument('subscription_name')
@@ -314,5 +356,8 @@ def callback(message):
314356
elif args.command == 'receive-flow-control':
315357
receive_messages_with_flow_control(
316358
args.project, args.subscription_name)
359+
elif args.command == 'receive-synchronously':
360+
receive_messages_synchronously(
361+
args.project, args.subscription_name)
317362
elif args.command == 'listen_for_errors':
318363
listen_for_errors(args.project, args.subscription_name)

pubsub/cloud-client/subscriber_test.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,3 +188,18 @@ def test_receive_with_flow_control(
188188
assert 'Listening' in out
189189
assert subscription in out
190190
assert 'Message 1' in out
191+
192+
193+
def test_receive_synchronously(
194+
publisher_client, topic, subscription, capsys):
195+
_publish_messages(publisher_client, topic)
196+
197+
with _make_sleep_patch():
198+
with pytest.raises(RuntimeError, match='sigil'):
199+
subscriber.receive_messages_with_flow_control(
200+
PROJECT, SUBSCRIPTION)
201+
202+
out, _ = capsys.readouterr()
203+
assert 'Message 1' in out
204+
assert 'Message 2' in out
205+
assert 'Message 3' in out

0 commit comments

Comments
 (0)