Skip to content

Commit 68a4622

Browse files
Pub/Sub: synchronous pull with lease management (#1701)
* Synchronous pull with lease management * Updated library version
1 parent 9fee707 commit 68a4622

File tree

3 files changed

+137
-28
lines changed

3 files changed

+137
-28
lines changed

pubsub/cloud-client/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
google-cloud-pubsub==0.37.2
1+
google-cloud-pubsub==0.38.0

pubsub/cloud-client/subscriber.py

Lines changed: 83 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323

2424
import argparse
2525
import time
26+
import logging
27+
import random
28+
import multiprocessing
2629

2730
from google.cloud import pubsub_v1
2831

@@ -215,7 +218,7 @@ def callback(message):
215218
# [END pubsub_subscriber_flow_settings]
216219

217220

218-
def receive_messages_synchronously(project, subscription_name):
221+
def synchronous_pull(project, subscription_name):
219222
"""Pulling messages synchronously."""
220223
# [START pubsub_subscriber_sync_pull]
221224
# project = "Your Google Cloud Project ID"
@@ -224,13 +227,10 @@ def receive_messages_synchronously(project, subscription_name):
224227
subscription_path = subscriber.subscription_path(
225228
project, subscription_name)
226229

227-
# Builds a pull request with a specific number of messages to return.
228-
# `return_immediately` is set to False so that the system waits (for a
229-
# bounded amount of time) until at lease one message is available.
230-
response = subscriber.pull(
231-
subscription_path,
232-
max_messages=3,
233-
return_immediately=False)
230+
NUM_MESSAGES=3
231+
232+
# The subscriber pulls a specific number of messages.
233+
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)
234234

235235
ack_ids = []
236236
for received_message in response.received_messages:
@@ -239,9 +239,72 @@ def receive_messages_synchronously(project, subscription_name):
239239

240240
# Acknowledges the received messages so they will not be sent again.
241241
subscriber.acknowledge(subscription_path, ack_ids)
242+
243+
print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES))
242244
# [END pubsub_subscriber_sync_pull]
243245

244246

247+
def synchronous_pull_with_lease_management(project, subscription_name):
248+
"""Pulling messages synchronously with lease management"""
249+
# [START pubsub_subscriber_sync_pull_with_lease]
250+
# project = "Your Google Cloud Project ID"
251+
# subscription_name = "Your Pubsub subscription name"
252+
subscriber = pubsub_v1.SubscriberClient()
253+
subscription_path = subscriber.subscription_path(
254+
project, subscription_name)
255+
256+
NUM_MESSAGES=2
257+
ACK_DEADLINE=30
258+
SLEEP_TIME=10
259+
260+
# The subscriber pulls a specific number of messages.
261+
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)
262+
263+
multiprocessing.log_to_stderr()
264+
logger = multiprocessing.get_logger()
265+
logger.setLevel(logging.INFO)
266+
267+
def worker(msg):
268+
"""Simulates a long-running process."""
269+
RUN_TIME = random.randint(1,60)
270+
logger.info('{}: Running {} for {}s'.format(
271+
time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME))
272+
time.sleep(RUN_TIME)
273+
274+
# `processes` stores process as key and ack id and message as values.
275+
processes = dict()
276+
for message in response.received_messages:
277+
process = multiprocessing.Process(target=worker, args=(message,))
278+
processes[process] = (message.ack_id, message.message.data)
279+
process.start()
280+
281+
while processes:
282+
for process, (ack_id, msg_data) in processes.items():
283+
# If the process is still running, reset the ack deadline as
284+
# specified by ACK_DEADLINE once every while as specified
285+
# by SLEEP_TIME.
286+
if process.is_alive():
287+
# `ack_deadline_seconds` must be between 10 to 600.
288+
subscriber.modify_ack_deadline(subscription_path,
289+
[ack_id], ack_deadline_seconds=ACK_DEADLINE)
290+
logger.info('{}: Reset ack deadline for {} for {}s'.format(
291+
time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE))
292+
293+
# If the processs is finished, acknowledges using `ack_id`.
294+
else:
295+
subscriber.acknowledge(subscription_path, [ack_id])
296+
logger.info("{}: Acknowledged {}".format(
297+
time.strftime("%X", time.gmtime()), msg_data))
298+
processes.pop(process)
299+
300+
# If there are still processes running, sleeps the thread.
301+
if processes:
302+
time.sleep(SLEEP_TIME)
303+
304+
print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES))
305+
# [END pubsub_subscriber_sync_pull_with_lease]
306+
307+
245308
def listen_for_errors(project, subscription_name):
246309
"""Receives messages and catches errors from a pull subscription."""
247310
# [START pubsub_subscriber_error_listener]
@@ -318,10 +381,15 @@ def callback(message):
318381
help=receive_messages_with_flow_control.__doc__)
319382
receive_with_flow_control_parser.add_argument('subscription_name')
320383

321-
receive_messages_synchronously_parser = subparsers.add_parser(
384+
synchronous_pull_parser = subparsers.add_parser(
322385
'receive-synchronously',
323-
help=receive_messages_synchronously.__doc__)
324-
receive_messages_synchronously_parser.add_argument('subscription_name')
386+
help=synchronous_pull.__doc__)
387+
synchronous_pull_parser.add_argument('subscription_name')
388+
389+
synchronous_pull_with_lease_management_parser = subparsers.add_parser(
390+
'receive-synchronously-with-lease',
391+
help=synchronous_pull_with_lease_management.__doc__)
392+
synchronous_pull_with_lease_management_parser.add_argument('subscription_name')
325393

326394
listen_for_errors_parser = subparsers.add_parser(
327395
'listen_for_errors', help=listen_for_errors.__doc__)
@@ -357,7 +425,10 @@ def callback(message):
357425
receive_messages_with_flow_control(
358426
args.project, args.subscription_name)
359427
elif args.command == 'receive-synchronously':
360-
receive_messages_synchronously(
428+
synchronous_pull(
429+
args.project, args.subscription_name)
430+
elif args.command == 'receive-synchronously-with-lease':
431+
synchronous_pull_with_lease_management(
361432
args.project, args.subscription_name)
362433
elif args.command == 'listen_for_errors':
363434
listen_for_errors(args.project, args.subscription_name)

pubsub/cloud-client/subscriber_test.py

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
PROJECT = os.environ['GCLOUD_PROJECT']
2626
TOPIC = 'subscription-test-topic'
2727
SUBSCRIPTION = 'subscription-test-subscription'
28+
SUBSCRIPTION_SYNC1 = 'subscription-test-subscription-sync1'
29+
SUBSCRIPTION_SYNC2 = 'subscription-test-subscription-sync2'
2830
ENDPOINT = 'https://{}.appspot.com/push'.format(PROJECT)
2931

3032

@@ -67,6 +69,36 @@ def subscription(subscriber_client, topic):
6769
yield subscription_path
6870

6971

72+
@pytest.fixture
73+
def subscription_sync1(subscriber_client, topic):
74+
subscription_sync_path = subscriber_client.subscription_path(
75+
PROJECT, SUBSCRIPTION_SYNC1)
76+
77+
try:
78+
subscriber_client.delete_subscription(subscription_sync_path)
79+
except Exception:
80+
pass
81+
82+
subscriber_client.create_subscription(subscription_sync_path, topic=topic)
83+
84+
yield subscription_sync_path
85+
86+
87+
@pytest.fixture
88+
def subscription_sync2(subscriber_client, topic):
89+
subscription_sync_path = subscriber_client.subscription_path(
90+
PROJECT, SUBSCRIPTION_SYNC2)
91+
92+
try:
93+
subscriber_client.delete_subscription(subscription_sync_path)
94+
except Exception:
95+
pass
96+
97+
subscriber_client.create_subscription(subscription_sync_path, topic=topic)
98+
99+
yield subscription_sync_path
100+
101+
70102
def test_list_in_topic(subscription, capsys):
71103
@eventually_consistent.call
72104
def _():
@@ -160,6 +192,27 @@ def test_receive(publisher_client, topic, subscription, capsys):
160192
assert 'Message 1' in out
161193

162194

195+
def test_receive_synchronously(
196+
publisher_client, topic, subscription_sync1, capsys):
197+
_publish_messages(publisher_client, topic)
198+
199+
subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_SYNC1)
200+
201+
out, _ = capsys.readouterr()
202+
assert 'Done.' in out
203+
204+
205+
def test_receive_synchronously_with_lease(
206+
publisher_client, topic, subscription_sync2, capsys):
207+
_publish_messages(publisher_client, topic)
208+
209+
subscriber.synchronous_pull_with_lease_management(
210+
PROJECT, SUBSCRIPTION_SYNC2)
211+
212+
out, _ = capsys.readouterr()
213+
assert 'Done.' in out
214+
215+
163216
def test_receive_with_custom_attributes(
164217
publisher_client, topic, subscription, capsys):
165218
_publish_messages_with_custom_attributes(publisher_client, topic)
@@ -188,18 +241,3 @@ def test_receive_with_flow_control(
188241
assert 'Listening' in out
189242
assert subscription in out
190243
assert 'Message 1' in out
191-
192-
193-
def test_receive_synchronously(
194-
publisher_client, topic, subscription, capsys):
195-
_publish_messages(publisher_client, topic)
196-
197-
with _make_sleep_patch():
198-
with pytest.raises(RuntimeError, match='sigil'):
199-
subscriber.receive_messages_with_flow_control(
200-
PROJECT, SUBSCRIPTION)
201-
202-
out, _ = capsys.readouterr()
203-
assert 'Message 1' in out
204-
assert 'Message 2' in out
205-
assert 'Message 3' in out

0 commit comments

Comments
 (0)