From eb0a1ec7f5e5cf109aeb0e8d9de409a96161f24c Mon Sep 17 00:00:00 2001 From: noerog <32459203+noerog@users.noreply.github.com> Date: Mon, 8 Jan 2018 13:51:56 -0500 Subject: [PATCH 1/3] Add listen for errors sample. --- pubsub/cloud-client/subscriber.py | 34 +++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index 9dded25b111..0b671800827 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -110,6 +110,34 @@ def callback(message): time.sleep(60) +def listen_for_errors(project, subscription_name): + """Receives messages and catches errors from a pull subscription.""" + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path( + project, subscription_name) + + def callback(message): + try: + print('Received message: {}'.format(message)) + message.ack() + future.result() + except Exception as e: + print( + 'Listening for messages on {} threw an Exception: {}.'.format( + subscription_name, e)) + subscription.close() + raise + + subscription = subscriber.subscribe(subscription_path, callback=callback) + future = subscription.open(callback) + + # The subscriber is non-blocking, so we must keep the main thread from + # exiting to allow it to process messages in the background. + print('Listening for messages and errors on {}'.format(subscription_path)) + while True: + time.sleep(60) + + if __name__ == '__main__': parser = argparse.ArgumentParser( description=__doc__, @@ -143,6 +171,10 @@ def callback(message): help=receive_messages_with_flow_control.__doc__) receive_with_flow_control_parser.add_argument('subscription_name') + listen_for_errors_parser = subparsers.add_parser( + 'listen_for_errors', help=listen_for_errors.__doc__) + listen_for_errors_parser.add_argument('subscription_name') + args = parser.parse_args() if args.command == 'list_in_topic': @@ -160,3 +192,5 @@ def callback(message): elif args.command == 'receive-flow-control': receive_messages_with_flow_control( args.project, args.subscription_name) + elif args.command == 'listen_for_errors': + listen_for_errors(args.project, args.subscription_name) From b1d5e88082d5bf17f80fa109ad40505e84c7626c Mon Sep 17 00:00:00 2001 From: noerog <32459203+noerog@users.noreply.github.com> Date: Mon, 8 Jan 2018 14:11:06 -0500 Subject: [PATCH 2/3] Update subscriber.py --- pubsub/cloud-client/subscriber.py | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index 0b671800827..d57a4a436a2 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -117,25 +117,22 @@ def listen_for_errors(project, subscription_name): project, subscription_name) def callback(message): - try: - print('Received message: {}'.format(message)) - message.ack() - future.result() - except Exception as e: - print( - 'Listening for messages on {} threw an Exception: {}.'.format( - subscription_name, e)) - subscription.close() - raise + print('Received message: {}'.format(message)) + message.ack() subscription = subscriber.subscribe(subscription_path, callback=callback) - future = subscription.open(callback) - # The subscriber is non-blocking, so we must keep the main thread from - # exiting to allow it to process messages in the background. - print('Listening for messages and errors on {}'.format(subscription_path)) - while True: - time.sleep(60) + # Blocks the thread while messages are coming in through the stream. Any + # exceptions that crop up on the thread will be set on the future. + future = subscription.open(callback) + try: + future.result() + except Exception as e: + print( + 'Listening for messages on {} threw an Exception: {}.'.format( + subscription_name, e)) + subscription.close() + raise if __name__ == '__main__': From 9877e486988cefe9f25d6eec5f9adce59da48f6b Mon Sep 17 00:00:00 2001 From: noerog <32459203+noerog@users.noreply.github.com> Date: Mon, 8 Jan 2018 16:11:16 -0500 Subject: [PATCH 3/3] Update subscriber.py --- pubsub/cloud-client/subscriber.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index d57a4a436a2..577e77cb9e6 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -131,8 +131,7 @@ def callback(message): print( 'Listening for messages on {} threw an Exception: {}.'.format( subscription_name, e)) - subscription.close() - raise + raise if __name__ == '__main__':