@@ -110,6 +110,30 @@ def callback(message):
110
110
time .sleep (60 )
111
111
112
112
113
+ def listen_for_errors (project , subscription_name ):
114
+ """Receives messages and catches errors from a pull subscription."""
115
+ subscriber = pubsub_v1 .SubscriberClient ()
116
+ subscription_path = subscriber .subscription_path (
117
+ project , subscription_name )
118
+
119
+ def callback (message ):
120
+ print ('Received message: {}' .format (message ))
121
+ message .ack ()
122
+
123
+ subscription = subscriber .subscribe (subscription_path , callback = callback )
124
+
125
+ # Blocks the thread while messages are coming in through the stream. Any
126
+ # exceptions that crop up on the thread will be set on the future.
127
+ future = subscription .open (callback )
128
+ try :
129
+ future .result ()
130
+ except Exception as e :
131
+ print (
132
+ 'Listening for messages on {} threw an Exception: {}.' .format (
133
+ subscription_name , e ))
134
+ raise
135
+
136
+
113
137
if __name__ == '__main__' :
114
138
parser = argparse .ArgumentParser (
115
139
description = __doc__ ,
@@ -143,6 +167,10 @@ def callback(message):
143
167
help = receive_messages_with_flow_control .__doc__ )
144
168
receive_with_flow_control_parser .add_argument ('subscription_name' )
145
169
170
+ listen_for_errors_parser = subparsers .add_parser (
171
+ 'listen_for_errors' , help = listen_for_errors .__doc__ )
172
+ listen_for_errors_parser .add_argument ('subscription_name' )
173
+
146
174
args = parser .parse_args ()
147
175
148
176
if args .command == 'list_in_topic' :
@@ -160,3 +188,5 @@ def callback(message):
160
188
elif args .command == 'receive-flow-control' :
161
189
receive_messages_with_flow_control (
162
190
args .project , args .subscription_name )
191
+ elif args .command == 'listen_for_errors' :
192
+ listen_for_errors (args .project , args .subscription_name )
0 commit comments