@@ -120,7 +120,8 @@ def publish_messages_with_custom_attributes(project_id, topic_name):
120
120
data = data .encode ('utf-8' )
121
121
# Add two attributes, origin and username, to the message
122
122
future = publisher .publish (
123
- topic_path , data , origin = 'python-sample' , username = 'gcp' )
123
+ topic_path , data , origin = 'python-sample' , username = 'gcp'
124
+ )
124
125
print (future .result ())
125
126
126
127
print ('Published messages with custom attributes.' )
@@ -147,7 +148,7 @@ def publish_messages_with_futures(project_id, topic_name):
147
148
future = publisher .publish (topic_path , data = data )
148
149
print (future .result ())
149
150
150
- print (" Published messages with futures." )
151
+ print (' Published messages with futures.' )
151
152
# [END pubsub_publisher_concurrency_control]
152
153
153
154
@@ -171,17 +172,17 @@ def callback(f):
171
172
try :
172
173
print (f .result ())
173
174
futures .pop (data )
174
- except : # noqa
175
- print ("Please handle {} for {}." .format (f .exception (), data ))
175
+ except : # noqa
176
+ print ('Please handle {} for {}.' .format (f .exception (), data ))
177
+
176
178
return callback
177
179
178
180
for i in range (10 ):
179
181
data = str (i )
180
182
futures .update ({data : None })
181
183
# When you publish a message, the client returns a future.
182
184
future = publisher .publish (
183
- topic_path ,
184
- data = data .encode ("utf-8" ), # data must be a bytestring.
185
+ topic_path , data = data .encode ('utf-8' ) # data must be a bytestring.
185
186
)
186
187
futures [data ] = future
187
188
# Publish failures shall be handled in the callback function.
@@ -191,7 +192,7 @@ def callback(f):
191
192
while futures :
192
193
time .sleep (5 )
193
194
194
- print (" Published message with error handler." )
195
+ print (' Published message with error handler.' )
195
196
# [END pubsub_publish_messages_error_handler]
196
197
197
198
@@ -207,7 +208,7 @@ def publish_messages_with_batch_settings(project_id, topic_name):
207
208
# of data or one second has passed.
208
209
batch_settings = pubsub_v1 .types .BatchSettings (
209
210
max_bytes = 1024 , # One kilobyte
210
- max_latency = 1 , # One second
211
+ max_latency = 1 , # One second
211
212
)
212
213
publisher = pubsub_v1 .PublisherClient (batch_settings )
213
214
topic_path = publisher .topic_path (project_id , topic_name )
@@ -223,7 +224,65 @@ def publish_messages_with_batch_settings(project_id, topic_name):
223
224
# [END pubsub_publisher_batch_settings]
224
225
225
226
226
- if __name__ == '__main__' :
227
+ def publish_messages_with_retry_settings (project_id , topic_name ):
228
+ """Publishes messages with custom retry settings."""
229
+ # [START pubsub_publisher_retry_settings]
230
+ from google .cloud import pubsub_v1
231
+
232
+ # TODO project_id = "Your Google Cloud Project ID"
233
+ # TODO topic_name = "Your Pub/Sub topic name"
234
+
235
+ # Configure the retry settings. Defaults will be overwritten.
236
+ retry_settings = {
237
+ 'interfaces' : {
238
+ 'google.pubsub.v1.Publisher' : {
239
+ 'retry_codes' : {
240
+ 'publish' : [
241
+ 'ABORTED' ,
242
+ 'CANCELLED' ,
243
+ 'DEADLINE_EXCEEDED' ,
244
+ 'INTERNAL' ,
245
+ 'RESOURCE_EXHAUSTED' ,
246
+ 'UNAVAILABLE' ,
247
+ 'UNKNOWN' ,
248
+ ]
249
+ },
250
+ 'retry_params' : {
251
+ 'messaging' : {
252
+ 'initial_retry_delay_millis' : 150 , # default: 100
253
+ 'retry_delay_multiplier' : 1.5 , # default: 1.3
254
+ 'max_retry_delay_millis' : 65000 , # default: 60000
255
+ 'initial_rpc_timeout_millis' : 25000 , # default: 25000
256
+ 'rpc_timeout_multiplier' : 1.0 , # default: 1.0
257
+ 'max_rpc_timeout_millis' : 35000 , # default: 30000
258
+ 'total_timeout_millis' : 650000 , # default: 600000
259
+ }
260
+ },
261
+ 'methods' : {
262
+ 'Publish' : {
263
+ 'retry_codes_name' : 'publish' ,
264
+ 'retry_params_name' : 'messaging' ,
265
+ }
266
+ },
267
+ }
268
+ }
269
+ }
270
+
271
+ publisher = pubsub_v1 .PublisherClient (client_config = retry_settings )
272
+ topic_path = publisher .topic_path (project_id , topic_name )
273
+
274
+ for n in range (1 , 10 ):
275
+ data = u'Message number {}' .format (n )
276
+ # Data must be a bytestring
277
+ data = data .encode ('utf-8' )
278
+ future = publisher .publish (topic_path , data = data )
279
+ print (future .result ())
280
+
281
+ print ('Published messages with retry settings.' )
282
+ # [END pubsub_publisher_retry_settings]
283
+
284
+
285
+ if __name__ == "__main__" :
227
286
parser = argparse .ArgumentParser (
228
287
description = __doc__ ,
229
288
formatter_class = argparse .RawDescriptionHelpFormatter
@@ -233,36 +292,47 @@ def publish_messages_with_batch_settings(project_id, topic_name):
233
292
subparsers = parser .add_subparsers (dest = 'command' )
234
293
subparsers .add_parser ('list' , help = list_topics .__doc__ )
235
294
236
- create_parser = subparsers .add_parser ('create' , help = create_topic .__doc__ )
295
+ create_parser = subparsers .add_parser ('create' ,
296
+ help = create_topic .__doc__ )
237
297
create_parser .add_argument ('topic_name' )
238
298
239
- delete_parser = subparsers .add_parser ('delete' , help = delete_topic .__doc__ )
299
+ delete_parser = subparsers .add_parser ('delete' ,
300
+ help = delete_topic .__doc__ )
240
301
delete_parser .add_argument ('topic_name' )
241
302
242
- publish_parser = subparsers .add_parser (
243
- 'publish' , help = publish_messages .__doc__ )
303
+ publish_parser = subparsers .add_parser ('publish' ,
304
+ help = publish_messages .__doc__ )
244
305
publish_parser .add_argument ('topic_name' )
245
306
246
307
publish_with_custom_attributes_parser = subparsers .add_parser (
247
308
'publish-with-custom-attributes' ,
248
- help = publish_messages_with_custom_attributes .__doc__ )
309
+ help = publish_messages_with_custom_attributes .__doc__ ,
310
+ )
249
311
publish_with_custom_attributes_parser .add_argument ('topic_name' )
250
312
251
313
publish_with_futures_parser = subparsers .add_parser (
252
- 'publish-with-futures' ,
253
- help = publish_messages_with_futures . __doc__ )
314
+ 'publish-with-futures' , help = publish_messages_with_futures . __doc__
315
+ )
254
316
publish_with_futures_parser .add_argument ('topic_name' )
255
317
256
318
publish_with_error_handler_parser = subparsers .add_parser (
257
319
'publish-with-error-handler' ,
258
- help = publish_messages_with_error_handler .__doc__ )
320
+ help = publish_messages_with_error_handler .__doc__
321
+ )
259
322
publish_with_error_handler_parser .add_argument ('topic_name' )
260
323
261
324
publish_with_batch_settings_parser = subparsers .add_parser (
262
325
'publish-with-batch-settings' ,
263
- help = publish_messages_with_batch_settings .__doc__ )
326
+ help = publish_messages_with_batch_settings .__doc__
327
+ )
264
328
publish_with_batch_settings_parser .add_argument ('topic_name' )
265
329
330
+ publish_with_retry_settings_parser = subparsers .add_parser (
331
+ 'publish-with-retry-settings' ,
332
+ help = publish_messages_with_retry_settings .__doc__
333
+ )
334
+ publish_with_retry_settings_parser .add_argument ('topic_name' )
335
+
266
336
args = parser .parse_args ()
267
337
268
338
if args .command == 'list' :
@@ -274,11 +344,13 @@ def publish_messages_with_batch_settings(project_id, topic_name):
274
344
elif args .command == 'publish' :
275
345
publish_messages (args .project_id , args .topic_name )
276
346
elif args .command == 'publish-with-custom-attributes' :
277
- publish_messages_with_custom_attributes (
278
- args . project_id , args .topic_name )
347
+ publish_messages_with_custom_attributes (args . project_id ,
348
+ args .topic_name )
279
349
elif args .command == 'publish-with-futures' :
280
350
publish_messages_with_futures (args .project_id , args .topic_name )
281
351
elif args .command == 'publish-with-error-handler' :
282
352
publish_messages_with_error_handler (args .project_id , args .topic_name )
283
353
elif args .command == 'publish-with-batch-settings' :
284
354
publish_messages_with_batch_settings (args .project_id , args .topic_name )
355
+ elif args .command == 'publish-with-retry-settings' :
356
+ publish_messages_with_retry_settings (args .project_id , args .topic_name )
0 commit comments