@@ -33,6 +33,7 @@ class _TaskWorkflowManager(object):
33
33
def __init__ (
34
34
self ,
35
35
client ,
36
+ original_add_collection ,
36
37
job_id ,
37
38
tasks_to_add ,
38
39
task_add_collection_options = None ,
@@ -55,8 +56,8 @@ def __init__(
55
56
self ._pending_queue_lock = threading .Lock ()
56
57
57
58
# Variables to be used for task add_collection requests
58
- self ._client = TaskOperations (
59
- client . _client , client . config , client . _serialize , client . _deserialize )
59
+ self ._client = client
60
+ self . _original_add_collection = original_add_collection
60
61
self ._job_id = job_id
61
62
self ._task_add_collection_options = task_add_collection_options
62
63
self ._custom_headers = custom_headers
@@ -76,7 +77,8 @@ def _bulk_add_tasks(self, results_queue, chunk_tasks_to_add):
76
77
"""
77
78
78
79
try :
79
- add_collection_response = self ._client .add_collection (
80
+ add_collection_response = self ._original_add_collection (
81
+ self ._client ,
80
82
self ._job_id ,
81
83
chunk_tasks_to_add ,
82
84
self ._task_add_collection_options ,
@@ -193,104 +195,107 @@ def _handle_output(results_queue):
193
195
results .append (queue_item )
194
196
return results
195
197
196
- def patch_client (client ):
198
+
199
+ def build_new_add_collection (original_add_collection ):
200
+ def bulk_add_collection (
201
+ self ,
202
+ job_id ,
203
+ value ,
204
+ task_add_collection_options = None ,
205
+ custom_headers = None ,
206
+ raw = False ,
207
+ threads = 0 ,
208
+ ** operation_config ):
209
+ """Adds a collection of tasks to the specified job.
210
+
211
+ Note that each task must have a unique ID. The Batch service may not
212
+ return the results for each task in the same order the tasks were
213
+ submitted in this request. If the server times out or the connection is
214
+ closed during the request, the request may have been partially or fully
215
+ processed, or not at all. In such cases, the user should re-issue the
216
+ request. Note that it is up to the user to correctly handle failures
217
+ when re-issuing a request. For example, you should use the same task
218
+ IDs during a retry so that if the prior operation succeeded, the retry
219
+ will not create extra tasks unexpectedly. If the response contains any
220
+ tasks which failed to add, a client can retry the request. In a retry,
221
+ it is most efficient to resubmit only tasks that failed to add, and to
222
+ omit tasks that were successfully added on the first attempt.
223
+
224
+ :param job_id: The ID of the job to which the task collection is to be
225
+ added.
226
+ :type job_id: str
227
+ :param value: The collection of tasks to add. The total serialized
228
+ size of this collection must be less than 4MB. If it is greater than
229
+ 4MB (for example if each task has 100's of resource files or
230
+ environment variables), the request will fail with code
231
+ 'RequestBodyTooLarge' and should be retried again with fewer tasks.
232
+ :type value: list of :class:`TaskAddParameter
233
+ <azure.batch.models.TaskAddParameter>`
234
+ :param task_add_collection_options: Additional parameters for the
235
+ operation
236
+ :type task_add_collection_options: :class:`TaskAddCollectionOptions
237
+ <azure.batch.models.TaskAddCollectionOptions>`
238
+ :param dict custom_headers: headers that will be added to the request
239
+ :param bool raw: returns the direct response alongside the
240
+ deserialized response
241
+ :param int threads: number of threads to use in parallel when adding tasks. If specified
242
+ and greater than 0, will start additional threads to submit requests and wait for them to finish.
243
+ Otherwise will submit add_collection requests sequentially on main thread
244
+ :return: :class:`TaskAddCollectionResult
245
+ <azure.batch.models.TaskAddCollectionResult>` or
246
+ :class:`ClientRawResponse<msrest.pipeline.ClientRawResponse>` if
247
+ raw=true
248
+ :rtype: :class:`TaskAddCollectionResult
249
+ <azure.batch.models.TaskAddCollectionResult>` or
250
+ :class:`ClientRawResponse<msrest.pipeline.ClientRawResponse>`
251
+ :raises:
252
+ :class:`CreateTasksErrorException<azure.batch.custom.CreateTasksErrorException>`
253
+ """
254
+
255
+ results_queue = collections .deque () # deque operations(append/pop) are thread-safe
256
+ task_workflow_manager = _TaskWorkflowManager (
257
+ self ,
258
+ original_add_collection ,
259
+ job_id ,
260
+ value ,
261
+ task_add_collection_options ,
262
+ custom_headers ,
263
+ raw ,
264
+ ** operation_config )
265
+
266
+ # multi-threaded behavior
267
+ if threads :
268
+ if threads < 0 :
269
+ raise ValueError ("Threads must be positive or 0" )
270
+
271
+ active_threads = []
272
+ for i in range (threads ):
273
+ active_threads .append (threading .Thread (
274
+ target = task_workflow_manager .task_collection_thread_handler ,
275
+ args = (results_queue ,)))
276
+ active_threads [- 1 ].start ()
277
+ for thread in active_threads :
278
+ thread .join ()
279
+ # single-threaded behavior
280
+ else :
281
+ task_workflow_manager .task_collection_thread_handler (results_queue )
282
+
283
+ if task_workflow_manager .error :
284
+ raise task_workflow_manager .error # pylint: disable=raising-bad-type
285
+ else :
286
+ submitted_tasks = _handle_output (results_queue )
287
+ return TaskAddCollectionResult (value = submitted_tasks )
288
+ bulk_add_collection .metadata = {'url' : '/jobs/{jobId}/addtaskcollection' }
289
+ return bulk_add_collection
290
+
291
+
292
+ def patch_client ():
197
293
try :
198
294
models = sys .modules ['azure.batch.models' ]
199
295
except KeyError :
200
296
models = importlib .import_module ('azure.batch.models' )
201
297
setattr (models , 'CreateTasksErrorException' , CreateTasksErrorException )
202
298
sys .modules ['azure.batch.models' ] = models
203
- client .task .add_collection = types .MethodType (bulk_add_collection , client .task )
204
-
205
- def bulk_add_collection (
206
- client ,
207
- job_id ,
208
- value ,
209
- task_add_collection_options = None ,
210
- custom_headers = None ,
211
- raw = False ,
212
- threads = 0 ,
213
- ** operation_config ):
214
- """Adds a collection of tasks to the specified job.
215
-
216
- Note that each task must have a unique ID. The Batch service may not
217
- return the results for each task in the same order the tasks were
218
- submitted in this request. If the server times out or the connection is
219
- closed during the request, the request may have been partially or fully
220
- processed, or not at all. In such cases, the user should re-issue the
221
- request. Note that it is up to the user to correctly handle failures
222
- when re-issuing a request. For example, you should use the same task
223
- IDs during a retry so that if the prior operation succeeded, the retry
224
- will not create extra tasks unexpectedly. If the response contains any
225
- tasks which failed to add, a client can retry the request. In a retry,
226
- it is most efficient to resubmit only tasks that failed to add, and to
227
- omit tasks that were successfully added on the first attempt. The
228
- maximum lifetime of a task from addition to completion is 7 days. If a
229
- task has not completed within 7 days of being added it will be
230
- terminated by the Batch service and left in whatever state it was in at
231
- that time.
232
-
233
- :param job_id: The ID of the job to which the task collection is to be
234
- added.
235
- :type job_id: str
236
- :param value: The collection of tasks to add. The total serialized
237
- size of this collection must be less than 4MB. If it is greater than
238
- 4MB (for example if each task has 100's of resource files or
239
- environment variables), the request will fail with code
240
- 'RequestBodyTooLarge' and should be retried again with fewer tasks.
241
- :type value: list of :class:`TaskAddParameter
242
- <azure.batch.models.TaskAddParameter>`
243
- :param task_add_collection_options: Additional parameters for the
244
- operation
245
- :type task_add_collection_options: :class:`TaskAddCollectionOptions
246
- <azure.batch.models.TaskAddCollectionOptions>`
247
- :param dict custom_headers: headers that will be added to the request
248
- :param bool raw: returns the direct response alongside the
249
- deserialized response
250
- :param int threads: number of threads to use in parallel when adding tasks. If specified
251
- and greater than 0, will start additional threads to submit requests and wait for them to finish.
252
- Otherwise will submit add_collection requests sequentially on main thread
253
- :return: :class:`TaskAddCollectionResult
254
- <azure.batch.models.TaskAddCollectionResult>` or
255
- :class:`ClientRawResponse<msrest.pipeline.ClientRawResponse>` if
256
- raw=true
257
- :rtype: :class:`TaskAddCollectionResult
258
- <azure.batch.models.TaskAddCollectionResult>` or
259
- :class:`ClientRawResponse<msrest.pipeline.ClientRawResponse>`
260
- :raises:
261
- :class:`BatchErrorException<azure.batch.models.BatchErrorException>`
262
- """
263
299
264
- results_queue = collections .deque () # deque operations(append/pop) are thread-safe
265
- task_workflow_manager = _TaskWorkflowManager (
266
- client ,
267
- job_id ,
268
- value ,
269
- task_add_collection_options ,
270
- custom_headers ,
271
- raw ,
272
- ** operation_config )
273
-
274
- # multi-threaded behavior
275
- if threads :
276
- if threads < 0 :
277
- raise ValueError ("Threads must be positive or 0" )
278
-
279
- active_threads = []
280
- for i in range (threads ):
281
- active_threads .append (threading .Thread (
282
- target = task_workflow_manager .task_collection_thread_handler ,
283
- args = (results_queue ,)))
284
- active_threads [- 1 ].start ()
285
- for thread in active_threads :
286
- thread .join ()
287
- # single-threaded behavior
288
- else :
289
- task_workflow_manager .task_collection_thread_handler (results_queue )
290
-
291
- if task_workflow_manager .error :
292
- raise task_workflow_manager .error # pylint: disable=raising-bad-type
293
- else :
294
- submitted_tasks = _handle_output (results_queue )
295
- return TaskAddCollectionResult (value = submitted_tasks )
296
- bulk_add_collection .metadata = {'url' : '/jobs/{jobId}/addtaskcollection' }
300
+ operations_modules = importlib .import_module ('azure.batch.operations' )
301
+ operations_modules .TaskOperations .add_collection = build_new_add_collection (operations_modules .TaskOperations .add_collection )
0 commit comments