From a97a4a31cdc8c389a6375e05f556b3368522ef5e Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Tue, 11 Sep 2018 18:00:55 -0700 Subject: [PATCH 1/5] Synchronous pull with lease management --- pubsub/cloud-client/subscriber.py | 78 ++++++++++++++++++++++++++ pubsub/cloud-client/subscriber_test.py | 26 ++++----- 2 files changed, 89 insertions(+), 15 deletions(-) diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index e3156072207..87cf38c13e3 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -23,6 +23,10 @@ import argparse import time +import logging +import random +import multiprocessing +from collections import defaultdict from google.cloud import pubsub_v1 @@ -239,9 +243,75 @@ def receive_messages_synchronously(project, subscription_name): # Acknowledges the received messages so they will not be sent again. subscriber.acknowledge(subscription_path, ack_ids) + + print("Received and acknowledged all messages. Done.") # [END pubsub_subscriber_sync_pull] +def synchronous_pull_with_lease_management(project, subscription_name): + """Pulling messages synchronously with lease management""" + # [START pubsub_subscriber_sync_pull_with_lease] + # project = "Your Google Cloud Project ID" + # subscription_name = "Your Pubsub subscription name" + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path( + project, subscription_name) + + NUM_MESSAGES = 2 + # Builds a pull request with a specific number of messages to return. + # `return_immediately` is set to False so that the system waits (for a + # bounded amount of time) until at lease one message is available. 
+ response = subscriber.pull( + subscription_path, + max_messages=NUM_MESSAGES, + return_immediately=False) + + multiprocessing.log_to_stderr() + logger = multiprocessing.get_logger() + logger.setLevel(logging.INFO) + + def worker(msg): + """Simulates a long-running process.""" + RUN_TIME = random.randint(1,60) + logger.info('{}: Running {} for {}s'.format( + time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME)) + time.sleep(RUN_TIME) + + # `d` stores process as key and ack id and message as values. + d = defaultdict(lambda: (str, str)) + for received_message in response.received_messages: + process = multiprocessing.Process(target=worker, + args=(received_message,)) + d[process] = (received_message.ack_id, received_message.message.data) + process.start() + + ACK_DEADLINE=60 + + while d: + for process, (ack_id, msg_data) in d.items(): + # If the process is still running, reset the ack deadline. + if process.is_alive(): + # `ack_deadline_seconds` must be between 10s to 600s. + subscriber.modify_ack_deadline(subscription_path, + [ack_id], ack_deadline_seconds=ACK_DEADLINE) + logger.info('{}: Reset ack deadline for {} for {}s'.format( + time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE)) + + # Otherwise, acknowledges using `ack_id`. + else: + subscriber.acknowledge(subscription_path, [ack_id]) + logger.info("{}: Acknowledged {}".format( + time.strftime("%X", time.gmtime()), msg_data)) + d.pop(process) + + # Sleeps the thread for 10s to save resources. + if d: + time.sleep(10) + + print("Received and acknowledged all messages. 
Done.") + # [END pubsub_subscriber_sync_pull_with_lease] + + def listen_for_errors(project, subscription_name): """Receives messages and catches errors from a pull subscription.""" # [START pubsub_subscriber_error_listener] @@ -323,6 +393,11 @@ def callback(message): help=receive_messages_synchronously.__doc__) receive_messages_synchronously_parser.add_argument('subscription_name') + receive_messages_synchronously_with_lease_parser = subparsers.add_parser( + 'receive-synchronously-with-lease', + help=synchronous_pull_with_lease_management.__doc__) + receive_messages_synchronously_with_lease_parser.add_argument('subscription_name') + listen_for_errors_parser = subparsers.add_parser( 'listen_for_errors', help=listen_for_errors.__doc__) listen_for_errors_parser.add_argument('subscription_name') @@ -359,5 +434,8 @@ def callback(message): elif args.command == 'receive-synchronously': receive_messages_synchronously( args.project, args.subscription_name) + elif args.command == 'receive-synchronously-with-lease': + synchronous_pull_with_lease_management( + args.project, args.subscription_name) elif args.command == 'listen_for_errors': listen_for_errors(args.project, args.subscription_name) diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 728f971f2e7..693fe739d88 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -160,6 +160,17 @@ def test_receive(publisher_client, topic, subscription, capsys): assert 'Message 1' in out +def test_receive_synchronously( + publisher_client, topic, subscription, capsys): + _publish_messages(publisher_client, topic) + + subscriber.receive_messages_synchronously(PROJECT, SUBSCRIPTION) + subscriber.synchronous_pull_with_lease_management(PROJECT, SUBSCRIPTION) + + out, _ = capsys.readouterr() + assert 'Received and acknowledged all messages. Done.' 
in out + + def test_receive_with_custom_attributes( publisher_client, topic, subscription, capsys): _publish_messages_with_custom_attributes(publisher_client, topic) @@ -188,18 +199,3 @@ def test_receive_with_flow_control( assert 'Listening' in out assert subscription in out assert 'Message 1' in out - - -def test_receive_synchronously( - publisher_client, topic, subscription, capsys): - _publish_messages(publisher_client, topic) - - with _make_sleep_patch(): - with pytest.raises(RuntimeError, match='sigil'): - subscriber.receive_messages_with_flow_control( - PROJECT, SUBSCRIPTION) - - out, _ = capsys.readouterr() - assert 'Message 1' in out - assert 'Message 2' in out - assert 'Message 3' in out From 4402e8b381101541bdb3a6d2f69ba71b26170265 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 12 Sep 2018 15:54:13 -0700 Subject: [PATCH 2/5] Renamed variables, rewrote comments --- pubsub/cloud-client/subscriber.py | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index 87cf38c13e3..0ab1d8f274f 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -26,7 +26,6 @@ import logging import random import multiprocessing -from collections import defaultdict from google.cloud import pubsub_v1 @@ -277,19 +276,21 @@ def worker(msg): time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME)) time.sleep(RUN_TIME) - # `d` stores process as key and ack id and message as values. - d = defaultdict(lambda: (str, str)) - for received_message in response.received_messages: - process = multiprocessing.Process(target=worker, - args=(received_message,)) - d[process] = (received_message.ack_id, received_message.message.data) + # `processes` stores process as key and ack id and message as values. 
+ processes = dict() + for message in response.received_messages: + process = multiprocessing.Process(target=worker, args=(message,)) + processes[process] = (message.ack_id, message.message.data) process.start() - ACK_DEADLINE=60 + ACK_DEADLINE=30 + SLEEP_TIME=10 - while d: - for process, (ack_id, msg_data) in d.items(): - # If the process is still running, reset the ack deadline. + while processes: + for process, (ack_id, msg_data) in processes.items(): + # If the process is still running, reset the ack deadline as + # specified by ACK_DEADLINE, and once every while as specified + # by SLEEP_TIME. if process.is_alive(): # `ack_deadline_seconds` must be between 10s to 600s. subscriber.modify_ack_deadline(subscription_path, @@ -302,11 +303,11 @@ def worker(msg): subscriber.acknowledge(subscription_path, [ack_id]) logger.info("{}: Acknowledged {}".format( time.strftime("%X", time.gmtime()), msg_data)) - d.pop(process) + processes.pop(process) - # Sleeps the thread for 10s to save resources. - if d: - time.sleep(10) + # Sleeps the thread to save resources. + if processes: + time.sleep(SLEEP_TIME) print("Received and acknowledged all messages. 
Done.") # [END pubsub_subscriber_sync_pull_with_lease] From b9dbaf105c5873240a9125ee30616e7d04e9a932 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Fri, 14 Sep 2018 11:48:27 -0700 Subject: [PATCH 3/5] Rewrote test with separate subscriptions, specified timeout and rewrote comments --- pubsub/cloud-client/subscriber.py | 29 ++++++++------- pubsub/cloud-client/subscriber_test.py | 50 +++++++++++++++++++++++--- 2 files changed, 62 insertions(+), 17 deletions(-) diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index 0ab1d8f274f..ddb9679517b 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -227,13 +227,15 @@ def receive_messages_synchronously(project, subscription_name): subscription_path = subscriber.subscription_path( project, subscription_name) + NUM_MESSAGES=3 # Builds a pull request with a specific number of messages to return. - # `return_immediately` is set to False so that the system waits (for a - # bounded amount of time) until at lease one message is available. + # `return_immediately` is set to False so that the system waits until + # at lease one message is available before it times out. response = subscriber.pull( subscription_path, - max_messages=3, - return_immediately=False) + max_messages=NUM_MESSAGES, + return_immediately=False, + timeout=900) ack_ids = [] for received_message in response.received_messages: @@ -243,7 +245,7 @@ def receive_messages_synchronously(project, subscription_name): # Acknowledges the received messages so they will not be sent again. subscriber.acknowledge(subscription_path, ack_ids) - print("Received and acknowledged all messages. Done.") + print("Received and acknowledged {} messages. 
Done.".format(NUM_MESSAGES)) # [END pubsub_subscriber_sync_pull] @@ -256,14 +258,18 @@ def synchronous_pull_with_lease_management(project, subscription_name): subscription_path = subscriber.subscription_path( project, subscription_name) - NUM_MESSAGES = 2 + NUM_MESSAGES=2 + ACK_DEADLINE=30 + SLEEP_TIME=10 + # Builds a pull request with a specific number of messages to return. - # `return_immediately` is set to False so that the system waits (for a - # bounded amount of time) until at lease one message is available. + # `return_immediately` is set to False so that the system waits until + # at lease one message is available before it times out. response = subscriber.pull( subscription_path, max_messages=NUM_MESSAGES, - return_immediately=False) + return_immediately=False, + timeout=900) multiprocessing.log_to_stderr() logger = multiprocessing.get_logger() @@ -283,9 +289,6 @@ def worker(msg): processes[process] = (message.ack_id, message.message.data) process.start() - ACK_DEADLINE=30 - SLEEP_TIME=10 - while processes: for process, (ack_id, msg_data) in processes.items(): # If the process is still running, reset the ack deadline as @@ -309,7 +312,7 @@ def worker(msg): if processes: time.sleep(SLEEP_TIME) - print("Received and acknowledged all messages. Done.") + print("Received and acknowledged {} messages. 
Done.".format(NUM_MESSAGES)) # [END pubsub_subscriber_sync_pull_with_lease] diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 693fe739d88..db103cbed71 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -25,6 +25,8 @@ PROJECT = os.environ['GCLOUD_PROJECT'] TOPIC = 'subscription-test-topic' SUBSCRIPTION = 'subscription-test-subscription' +SUBSCRIPTION_SYNC1 = 'subscription-test-subscription-sync1' +SUBSCRIPTION_SYNC2 = 'subscription-test-subscription-sync2' ENDPOINT = 'https://{}.appspot.com/push'.format(PROJECT) @@ -67,6 +69,36 @@ def subscription(subscriber_client, topic): yield subscription_path +@pytest.fixture +def subscription_sync1(subscriber_client, topic): + subscription_sync_path = subscriber_client.subscription_path( + PROJECT, SUBSCRIPTION_SYNC1) + + try: + subscriber_client.delete_subscription(subscription_sync_path) + except Exception: + pass + + subscriber_client.create_subscription(subscription_sync_path, topic=topic) + + yield subscription_sync_path + + +@pytest.fixture +def subscription_sync2(subscriber_client, topic): + subscription_sync_path = subscriber_client.subscription_path( + PROJECT, SUBSCRIPTION_SYNC2) + + try: + subscriber_client.delete_subscription(subscription_sync_path) + except Exception: + pass + + subscriber_client.create_subscription(subscription_sync_path, topic=topic) + + yield subscription_sync_path + + def test_list_in_topic(subscription, capsys): @eventually_consistent.call def _(): @@ -161,14 +193,24 @@ def test_receive(publisher_client, topic, subscription, capsys): def test_receive_synchronously( - publisher_client, topic, subscription, capsys): + publisher_client, topic, subscription_sync1, capsys): + _publish_messages(publisher_client, topic) + + subscriber.receive_messages_synchronously(PROJECT, SUBSCRIPTION_SYNC1) + + out, _ = capsys.readouterr() + assert 'Done.' 
in out + + +def test_receive_synchronously_with_lease( + publisher_client, topic, subscription_sync2, capsys): _publish_messages(publisher_client, topic) - subscriber.receive_messages_synchronously(PROJECT, SUBSCRIPTION) - subscriber.synchronous_pull_with_lease_management(PROJECT, SUBSCRIPTION) + subscriber.synchronous_pull_with_lease_management( + PROJECT, SUBSCRIPTION_SYNC2) out, _ = capsys.readouterr() - assert 'Received and acknowledged all messages. Done.' in out + assert 'Done.' in out def test_receive_with_custom_attributes( From dba44a77956fbddca3262cce06475e81fa858d26 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Fri, 14 Sep 2018 11:57:14 -0700 Subject: [PATCH 4/5] Updated library version --- pubsub/cloud-client/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/cloud-client/requirements.txt b/pubsub/cloud-client/requirements.txt index 936a9f0ed2f..81a62427cc0 100644 --- a/pubsub/cloud-client/requirements.txt +++ b/pubsub/cloud-client/requirements.txt @@ -1 +1 @@ -google-cloud-pubsub==0.37.2 +google-cloud-pubsub==0.38.0 From 05dd406bfa73dff2aaa3dd08a255a36e4ce21698 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Fri, 14 Sep 2018 16:00:13 -0700 Subject: [PATCH 5/5] Removed some specifications on the pull method, renamed a func, rewrote comments --- pubsub/cloud-client/subscriber.py | 45 ++++++++++---------------- pubsub/cloud-client/subscriber_test.py | 2 +- 2 files changed, 18 insertions(+), 29 deletions(-) diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index ddb9679517b..3915a82ef9c 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -218,7 +218,7 @@ def callback(message): # [END pubsub_subscriber_flow_settings] -def receive_messages_synchronously(project, subscription_name): +def synchronous_pull(project, subscription_name): """Pulling messages synchronously.""" # [START pubsub_subscriber_sync_pull] # project = "Your Google Cloud Project ID" 
@@ -228,14 +228,9 @@ def receive_messages_synchronously(project, subscription_name): project, subscription_name) NUM_MESSAGES=3 - # Builds a pull request with a specific number of messages to return. - # `return_immediately` is set to False so that the system waits until - # at lease one message is available before it times out. - response = subscriber.pull( - subscription_path, - max_messages=NUM_MESSAGES, - return_immediately=False, - timeout=900) + + # The subscriber pulls a specific number of messages. + response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) ack_ids = [] for received_message in response.received_messages: @@ -261,15 +256,9 @@ def synchronous_pull_with_lease_management(project, subscription_name): NUM_MESSAGES=2 ACK_DEADLINE=30 SLEEP_TIME=10 - - # Builds a pull request with a specific number of messages to return. - # `return_immediately` is set to False so that the system waits until - # at lease one message is available before it times out. - response = subscriber.pull( - subscription_path, - max_messages=NUM_MESSAGES, - return_immediately=False, - timeout=900) + + # The subscriber pulls a specific number of messages. + response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) multiprocessing.log_to_stderr() logger = multiprocessing.get_logger() @@ -292,23 +281,23 @@ def worker(msg): while processes: for process, (ack_id, msg_data) in processes.items(): # If the process is still running, reset the ack deadline as - # specified by ACK_DEADLINE, and once every while as specified + # specified by ACK_DEADLINE once every while as specified # by SLEEP_TIME. if process.is_alive(): - # `ack_deadline_seconds` must be between 10s to 600s. + # `ack_deadline_seconds` must be between 10 and 600. 
subscriber.modify_ack_deadline(subscription_path, [ack_id], ack_deadline_seconds=ACK_DEADLINE) logger.info('{}: Reset ack deadline for {} for {}s'.format( time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE)) - # Otherwise, acknowledges using `ack_id`. + # If the process is finished, acknowledges using `ack_id`. else: subscriber.acknowledge(subscription_path, [ack_id]) logger.info("{}: Acknowledged {}".format( time.strftime("%X", time.gmtime()), msg_data)) processes.pop(process) - # Sleeps the thread to save resources. + # If there are still processes running, sleeps the thread. if processes: time.sleep(SLEEP_TIME) @@ -392,15 +381,15 @@ def callback(message): help=receive_messages_with_flow_control.__doc__) receive_with_flow_control_parser.add_argument('subscription_name') - receive_messages_synchronously_parser = subparsers.add_parser( + synchronous_pull_parser = subparsers.add_parser( 'receive-synchronously', - help=receive_messages_synchronously.__doc__) - receive_messages_synchronously_parser.add_argument('subscription_name') + help=synchronous_pull.__doc__) + synchronous_pull_parser.add_argument('subscription_name') - receive_messages_synchronously_with_lease_parser = subparsers.add_parser( + synchronous_pull_with_lease_management_parser = subparsers.add_parser( 'receive-synchronously-with-lease', help=synchronous_pull_with_lease_management.__doc__) - receive_messages_synchronously_with_lease_parser.add_argument('subscription_name') + synchronous_pull_with_lease_management_parser.add_argument('subscription_name') listen_for_errors_parser = subparsers.add_parser( 'listen_for_errors', help=listen_for_errors.__doc__) @@ -436,7 +425,7 @@ def callback(message): receive_messages_with_flow_control( args.project, args.subscription_name) elif args.command == 'receive-synchronously': - receive_messages_synchronously( + synchronous_pull( args.project, args.subscription_name) elif args.command == 'receive-synchronously-with-lease': 
synchronous_pull_with_lease_management( diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index db103cbed71..9f554398ef4 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -196,7 +196,7 @@ def test_receive_synchronously( publisher_client, topic, subscription_sync1, capsys): _publish_messages(publisher_client, topic) - subscriber.receive_messages_synchronously(PROJECT, SUBSCRIPTION_SYNC1) + subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_SYNC1) out, _ = capsys.readouterr() assert 'Done.' in out