@@ -90,6 +90,8 @@ def create_push_subscription(project,
90
90
def delete_subscription (project , subscription_name ):
91
91
"""Deletes an existing Pub/Sub topic."""
92
92
# [START pubsub_delete_subscription]
93
+ # project = "Your Google Cloud Project ID"
94
+ # subscription_name = "Your Pubsub subscription name"
93
95
subscriber = pubsub_v1 .SubscriberClient ()
94
96
subscription_path = subscriber .subscription_path (
95
97
project , subscription_name )
@@ -138,6 +140,8 @@ def receive_messages(project, subscription_name):
138
140
"""Receives messages from a pull subscription."""
139
141
# [START pubsub_subscriber_async_pull]
140
142
# [START pubsub_quickstart_subscriber]
143
+ # project = "Your Google Cloud Project ID"
144
+ # subscription_name = "Your Pubsub subscription name"
141
145
subscriber = pubsub_v1 .SubscriberClient ()
142
146
subscription_path = subscriber .subscription_path (
143
147
project , subscription_name )
@@ -160,6 +164,8 @@ def callback(message):
160
164
def receive_messages_with_custom_attributes (project , subscription_name ):
161
165
"""Receives messages from a pull subscription."""
162
166
# [START pubsub_subscriber_sync_pull_custom_attributes]
167
+ # project = "Your Google Cloud Project ID"
168
+ # subscription_name = "Your Pubsub subscription name"
163
169
subscriber = pubsub_v1 .SubscriberClient ()
164
170
subscription_path = subscriber .subscription_path (
165
171
project , subscription_name )
@@ -186,6 +192,8 @@ def callback(message):
186
192
def receive_messages_with_flow_control (project , subscription_name ):
187
193
"""Receives messages from a pull subscription with flow control."""
188
194
# [START pubsub_subscriber_flow_settings]
195
+ # project = "Your Google Cloud Project ID"
196
+ # subscription_name = "Your Pubsub subscription name"
189
197
subscriber = pubsub_v1 .SubscriberClient ()
190
198
subscription_path = subscriber .subscription_path (
191
199
project , subscription_name )
@@ -207,9 +215,38 @@ def callback(message):
207
215
# [END pubsub_subscriber_flow_settings]
208
216
209
217
218
+ def receive_messages_synchronously (project , subscription_name ):
219
+ """Pulling messages synchronously."""
220
+ # [START pubsub_subscriber_sync_pull]
221
+ # project = "Your Google Cloud Project ID"
222
+ # subscription_name = "Your Pubsub subscription name"
223
+ subscriber = pubsub_v1 .SubscriberClient ()
224
+ subscription_path = subscriber .subscription_path (
225
+ project , subscription_name )
226
+
227
+ # Builds a pull request with a specific number of messages to return.
228
+ # `return_immediately` is set to False so that the system waits (for a
229
+ # bounded amount of time) until at lease one message is available.
230
+ response = subscriber .pull (
231
+ subscription_path ,
232
+ max_messages = 3 ,
233
+ return_immediately = False )
234
+
235
+ ack_ids = []
236
+ for received_message in response .received_messages :
237
+ print ("Received: {}" .format (received_message .message .data ))
238
+ ack_ids .append (received_message .ack_id )
239
+
240
+ # Acknowledges the received messages so they will not be sent again.
241
+ subscriber .acknowledge (subscription_path , ack_ids )
242
+ # [END pubsub_subscriber_sync_pull]
243
+
244
+
210
245
def listen_for_errors (project , subscription_name ):
211
246
"""Receives messages and catches errors from a pull subscription."""
212
247
# [START pubsub_subscriber_error_listener]
248
+ # project = "Your Google Cloud Project ID"
249
+ # subscription_name = "Your Pubsub subscription name"
213
250
subscriber = pubsub_v1 .SubscriberClient ()
214
251
subscription_path = subscriber .subscription_path (
215
252
project , subscription_name )
@@ -281,6 +318,11 @@ def callback(message):
281
318
help = receive_messages_with_flow_control .__doc__ )
282
319
receive_with_flow_control_parser .add_argument ('subscription_name' )
283
320
321
+ receive_messages_synchronously_parser = subparsers .add_parser (
322
+ 'receive-synchronously' ,
323
+ help = receive_messages_synchronously .__doc__ )
324
+ receive_messages_synchronously_parser .add_argument ('subscription_name' )
325
+
284
326
listen_for_errors_parser = subparsers .add_parser (
285
327
'listen_for_errors' , help = listen_for_errors .__doc__ )
286
328
listen_for_errors_parser .add_argument ('subscription_name' )
@@ -314,5 +356,8 @@ def callback(message):
314
356
elif args .command == 'receive-flow-control' :
315
357
receive_messages_with_flow_control (
316
358
args .project , args .subscription_name )
359
+ elif args .command == 'receive-synchronously' :
360
+ receive_messages_synchronously (
361
+ args .project , args .subscription_name )
317
362
elif args .command == 'listen_for_errors' :
318
363
listen_for_errors (args .project , args .subscription_name )
0 commit comments