23
23
24
24
import argparse
25
25
import time
26
+ import logging
27
+ import random
28
+ import multiprocessing
26
29
27
30
from google .cloud import pubsub_v1
28
31
@@ -215,7 +218,7 @@ def callback(message):
215
218
# [END pubsub_subscriber_flow_settings]
216
219
217
220
218
- def receive_messages_synchronously (project , subscription_name ):
221
+ def synchronous_pull (project , subscription_name ):
219
222
"""Pulling messages synchronously."""
220
223
# [START pubsub_subscriber_sync_pull]
221
224
# project = "Your Google Cloud Project ID"
@@ -224,13 +227,10 @@ def receive_messages_synchronously(project, subscription_name):
224
227
subscription_path = subscriber .subscription_path (
225
228
project , subscription_name )
226
229
227
- # Builds a pull request with a specific number of messages to return.
228
- # `return_immediately` is set to False so that the system waits (for a
229
- # bounded amount of time) until at lease one message is available.
230
- response = subscriber .pull (
231
- subscription_path ,
232
- max_messages = 3 ,
233
- return_immediately = False )
230
+ NUM_MESSAGES = 3
231
+
232
+ # The subscriber pulls a specific number of messages.
233
+ response = subscriber .pull (subscription_path , max_messages = NUM_MESSAGES )
234
234
235
235
ack_ids = []
236
236
for received_message in response .received_messages :
@@ -239,9 +239,72 @@ def receive_messages_synchronously(project, subscription_name):
239
239
240
240
# Acknowledges the received messages so they will not be sent again.
241
241
subscriber .acknowledge (subscription_path , ack_ids )
242
+
243
+ print ("Received and acknowledged {} messages. Done." .format (NUM_MESSAGES ))
242
244
# [END pubsub_subscriber_sync_pull]
243
245
244
246
247
+ def synchronous_pull_with_lease_management (project , subscription_name ):
248
+ """Pulling messages synchronously with lease management"""
249
+ # [START pubsub_subscriber_sync_pull_with_lease]
250
+ # project = "Your Google Cloud Project ID"
251
+ # subscription_name = "Your Pubsub subscription name"
252
+ subscriber = pubsub_v1 .SubscriberClient ()
253
+ subscription_path = subscriber .subscription_path (
254
+ project , subscription_name )
255
+
256
+ NUM_MESSAGES = 2
257
+ ACK_DEADLINE = 30
258
+ SLEEP_TIME = 10
259
+
260
+ # The subscriber pulls a specific number of messages.
261
+ response = subscriber .pull (subscription_path , max_messages = NUM_MESSAGES )
262
+
263
+ multiprocessing .log_to_stderr ()
264
+ logger = multiprocessing .get_logger ()
265
+ logger .setLevel (logging .INFO )
266
+
267
+ def worker (msg ):
268
+ """Simulates a long-running process."""
269
+ RUN_TIME = random .randint (1 ,60 )
270
+ logger .info ('{}: Running {} for {}s' .format (
271
+ time .strftime ("%X" , time .gmtime ()), msg .message .data , RUN_TIME ))
272
+ time .sleep (RUN_TIME )
273
+
274
+ # `processes` stores process as key and ack id and message as values.
275
+ processes = dict ()
276
+ for message in response .received_messages :
277
+ process = multiprocessing .Process (target = worker , args = (message ,))
278
+ processes [process ] = (message .ack_id , message .message .data )
279
+ process .start ()
280
+
281
+ while processes :
282
+ for process , (ack_id , msg_data ) in processes .items ():
283
+ # If the process is still running, reset the ack deadline as
284
+ # specified by ACK_DEADLINE once every while as specified
285
+ # by SLEEP_TIME.
286
+ if process .is_alive ():
287
+ # `ack_deadline_seconds` must be between 10 to 600.
288
+ subscriber .modify_ack_deadline (subscription_path ,
289
+ [ack_id ], ack_deadline_seconds = ACK_DEADLINE )
290
+ logger .info ('{}: Reset ack deadline for {} for {}s' .format (
291
+ time .strftime ("%X" , time .gmtime ()), msg_data , ACK_DEADLINE ))
292
+
293
+ # If the processs is finished, acknowledges using `ack_id`.
294
+ else :
295
+ subscriber .acknowledge (subscription_path , [ack_id ])
296
+ logger .info ("{}: Acknowledged {}" .format (
297
+ time .strftime ("%X" , time .gmtime ()), msg_data ))
298
+ processes .pop (process )
299
+
300
+ # If there are still processes running, sleeps the thread.
301
+ if processes :
302
+ time .sleep (SLEEP_TIME )
303
+
304
+ print ("Received and acknowledged {} messages. Done." .format (NUM_MESSAGES ))
305
+ # [END pubsub_subscriber_sync_pull_with_lease]
306
+
307
+
245
308
def listen_for_errors (project , subscription_name ):
246
309
"""Receives messages and catches errors from a pull subscription."""
247
310
# [START pubsub_subscriber_error_listener]
@@ -318,10 +381,15 @@ def callback(message):
318
381
help = receive_messages_with_flow_control .__doc__ )
319
382
receive_with_flow_control_parser .add_argument ('subscription_name' )
320
383
321
- receive_messages_synchronously_parser = subparsers .add_parser (
384
+ synchronous_pull_parser = subparsers .add_parser (
322
385
'receive-synchronously' ,
323
- help = receive_messages_synchronously .__doc__ )
324
- receive_messages_synchronously_parser .add_argument ('subscription_name' )
386
+ help = synchronous_pull .__doc__ )
387
+ synchronous_pull_parser .add_argument ('subscription_name' )
388
+
389
+ synchronous_pull_with_lease_management_parser = subparsers .add_parser (
390
+ 'receive-synchronously-with-lease' ,
391
+ help = synchronous_pull_with_lease_management .__doc__ )
392
+ synchronous_pull_with_lease_management_parser .add_argument ('subscription_name' )
325
393
326
394
listen_for_errors_parser = subparsers .add_parser (
327
395
'listen_for_errors' , help = listen_for_errors .__doc__ )
@@ -357,7 +425,10 @@ def callback(message):
357
425
receive_messages_with_flow_control (
358
426
args .project , args .subscription_name )
359
427
elif args .command == 'receive-synchronously' :
360
- receive_messages_synchronously (
428
+ synchronous_pull (
429
+ args .project , args .subscription_name )
430
+ elif args .command == 'receive-synchronously-with-lease' :
431
+ synchronous_pull_with_lease_management (
361
432
args .project , args .subscription_name )
362
433
elif args .command == 'listen_for_errors' :
363
434
listen_for_errors (args .project , args .subscription_name )
0 commit comments