Skip to content

Commit bf3478b

Browse files
bgkleinlmazuel
authored andcommitted
Update Batch patch structure and documentation - no functionality change (#3421)
1 parent 9195624 commit bf3478b

File tree

2 files changed

+105
-99
lines changed

2 files changed

+105
-99
lines changed

azure-batch/azure/batch/batch_service_client.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -113,5 +113,6 @@ def __init__(
113113
self._client, self.config, self._serialize, self._deserialize)
114114
self.compute_node = ComputeNodeOperations(
115115
self._client, self.config, self._serialize, self._deserialize)
116-
117-
patch_client(self)
116+
117+
118+
patch_client()

azure-batch/azure/batch/custom/patch.py

+102-97
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class _TaskWorkflowManager(object):
3333
def __init__(
3434
self,
3535
client,
36+
original_add_collection,
3637
job_id,
3738
tasks_to_add,
3839
task_add_collection_options=None,
@@ -55,8 +56,8 @@ def __init__(
5556
self._pending_queue_lock = threading.Lock()
5657

5758
# Variables to be used for task add_collection requests
58-
self._client = TaskOperations(
59-
client._client, client.config, client._serialize, client._deserialize)
59+
self._client = client
60+
self._original_add_collection = original_add_collection
6061
self._job_id = job_id
6162
self._task_add_collection_options = task_add_collection_options
6263
self._custom_headers = custom_headers
@@ -76,7 +77,8 @@ def _bulk_add_tasks(self, results_queue, chunk_tasks_to_add):
7677
"""
7778

7879
try:
79-
add_collection_response = self._client.add_collection(
80+
add_collection_response = self._original_add_collection(
81+
self._client,
8082
self._job_id,
8183
chunk_tasks_to_add,
8284
self._task_add_collection_options,
@@ -193,104 +195,107 @@ def _handle_output(results_queue):
193195
results.append(queue_item)
194196
return results
195197

196-
def patch_client(client):
198+
199+
def build_new_add_collection(original_add_collection):
200+
def bulk_add_collection(
201+
self,
202+
job_id,
203+
value,
204+
task_add_collection_options=None,
205+
custom_headers=None,
206+
raw=False,
207+
threads=0,
208+
**operation_config):
209+
"""Adds a collection of tasks to the specified job.
210+
211+
Note that each task must have a unique ID. The Batch service may not
212+
return the results for each task in the same order the tasks were
213+
submitted in this request. If the server times out or the connection is
214+
closed during the request, the request may have been partially or fully
215+
processed, or not at all. In such cases, the user should re-issue the
216+
request. Note that it is up to the user to correctly handle failures
217+
when re-issuing a request. For example, you should use the same task
218+
IDs during a retry so that if the prior operation succeeded, the retry
219+
will not create extra tasks unexpectedly. If the response contains any
220+
tasks which failed to add, a client can retry the request. In a retry,
221+
it is most efficient to resubmit only tasks that failed to add, and to
222+
omit tasks that were successfully added on the first attempt.
223+
224+
:param job_id: The ID of the job to which the task collection is to be
225+
added.
226+
:type job_id: str
227+
:param value: The collection of tasks to add. The total serialized
228+
size of this collection must be less than 4MB. If it is greater than
229+
4MB (for example if each task has 100's of resource files or
230+
environment variables), the request will fail with code
231+
'RequestBodyTooLarge' and should be retried again with fewer tasks.
232+
:type value: list of :class:`TaskAddParameter
233+
<azure.batch.models.TaskAddParameter>`
234+
:param task_add_collection_options: Additional parameters for the
235+
operation
236+
:type task_add_collection_options: :class:`TaskAddCollectionOptions
237+
<azure.batch.models.TaskAddCollectionOptions>`
238+
:param dict custom_headers: headers that will be added to the request
239+
:param bool raw: returns the direct response alongside the
240+
deserialized response
241+
:param int threads: number of threads to use in parallel when adding tasks. If specified
242+
and greater than 0, will start additional threads to submit requests and wait for them to finish.
243+
Otherwise will submit add_collection requests sequentially on main thread
244+
:return: :class:`TaskAddCollectionResult
245+
<azure.batch.models.TaskAddCollectionResult>` or
246+
:class:`ClientRawResponse<msrest.pipeline.ClientRawResponse>` if
247+
raw=true
248+
:rtype: :class:`TaskAddCollectionResult
249+
<azure.batch.models.TaskAddCollectionResult>` or
250+
:class:`ClientRawResponse<msrest.pipeline.ClientRawResponse>`
251+
:raises:
252+
:class:`CreateTasksErrorException<azure.batch.custom.CreateTasksErrorException>`
253+
"""
254+
255+
results_queue = collections.deque() # deque operations(append/pop) are thread-safe
256+
task_workflow_manager = _TaskWorkflowManager(
257+
self,
258+
original_add_collection,
259+
job_id,
260+
value,
261+
task_add_collection_options,
262+
custom_headers,
263+
raw,
264+
**operation_config)
265+
266+
# multi-threaded behavior
267+
if threads:
268+
if threads < 0:
269+
raise ValueError("Threads must be positive or 0")
270+
271+
active_threads = []
272+
for i in range(threads):
273+
active_threads.append(threading.Thread(
274+
target=task_workflow_manager.task_collection_thread_handler,
275+
args=(results_queue,)))
276+
active_threads[-1].start()
277+
for thread in active_threads:
278+
thread.join()
279+
# single-threaded behavior
280+
else:
281+
task_workflow_manager.task_collection_thread_handler(results_queue)
282+
283+
if task_workflow_manager.error:
284+
raise task_workflow_manager.error # pylint: disable=raising-bad-type
285+
else:
286+
submitted_tasks = _handle_output(results_queue)
287+
return TaskAddCollectionResult(value=submitted_tasks)
288+
bulk_add_collection.metadata = {'url': '/jobs/{jobId}/addtaskcollection'}
289+
return bulk_add_collection
290+
291+
292+
def patch_client():
197293
try:
198294
models = sys.modules['azure.batch.models']
199295
except KeyError:
200296
models = importlib.import_module('azure.batch.models')
201297
setattr(models, 'CreateTasksErrorException', CreateTasksErrorException)
202298
sys.modules['azure.batch.models'] = models
203-
client.task.add_collection = types.MethodType(bulk_add_collection, client.task)
204-
205-
def bulk_add_collection(
206-
client,
207-
job_id,
208-
value,
209-
task_add_collection_options=None,
210-
custom_headers=None,
211-
raw=False,
212-
threads=0,
213-
**operation_config):
214-
"""Adds a collection of tasks to the specified job.
215-
216-
Note that each task must have a unique ID. The Batch service may not
217-
return the results for each task in the same order the tasks were
218-
submitted in this request. If the server times out or the connection is
219-
closed during the request, the request may have been partially or fully
220-
processed, or not at all. In such cases, the user should re-issue the
221-
request. Note that it is up to the user to correctly handle failures
222-
when re-issuing a request. For example, you should use the same task
223-
IDs during a retry so that if the prior operation succeeded, the retry
224-
will not create extra tasks unexpectedly. If the response contains any
225-
tasks which failed to add, a client can retry the request. In a retry,
226-
it is most efficient to resubmit only tasks that failed to add, and to
227-
omit tasks that were successfully added on the first attempt. The
228-
maximum lifetime of a task from addition to completion is 7 days. If a
229-
task has not completed within 7 days of being added it will be
230-
terminated by the Batch service and left in whatever state it was in at
231-
that time.
232-
233-
:param job_id: The ID of the job to which the task collection is to be
234-
added.
235-
:type job_id: str
236-
:param value: The collection of tasks to add. The total serialized
237-
size of this collection must be less than 4MB. If it is greater than
238-
4MB (for example if each task has 100's of resource files or
239-
environment variables), the request will fail with code
240-
'RequestBodyTooLarge' and should be retried again with fewer tasks.
241-
:type value: list of :class:`TaskAddParameter
242-
<azure.batch.models.TaskAddParameter>`
243-
:param task_add_collection_options: Additional parameters for the
244-
operation
245-
:type task_add_collection_options: :class:`TaskAddCollectionOptions
246-
<azure.batch.models.TaskAddCollectionOptions>`
247-
:param dict custom_headers: headers that will be added to the request
248-
:param bool raw: returns the direct response alongside the
249-
deserialized response
250-
:param int threads: number of threads to use in parallel when adding tasks. If specified
251-
and greater than 0, will start additional threads to submit requests and wait for them to finish.
252-
Otherwise will submit add_collection requests sequentially on main thread
253-
:return: :class:`TaskAddCollectionResult
254-
<azure.batch.models.TaskAddCollectionResult>` or
255-
:class:`ClientRawResponse<msrest.pipeline.ClientRawResponse>` if
256-
raw=true
257-
:rtype: :class:`TaskAddCollectionResult
258-
<azure.batch.models.TaskAddCollectionResult>` or
259-
:class:`ClientRawResponse<msrest.pipeline.ClientRawResponse>`
260-
:raises:
261-
:class:`BatchErrorException<azure.batch.models.BatchErrorException>`
262-
"""
263299

264-
results_queue = collections.deque() # deque operations(append/pop) are thread-safe
265-
task_workflow_manager = _TaskWorkflowManager(
266-
client,
267-
job_id,
268-
value,
269-
task_add_collection_options,
270-
custom_headers,
271-
raw,
272-
**operation_config)
273-
274-
# multi-threaded behavior
275-
if threads:
276-
if threads < 0:
277-
raise ValueError("Threads must be positive or 0")
278-
279-
active_threads = []
280-
for i in range(threads):
281-
active_threads.append(threading.Thread(
282-
target=task_workflow_manager.task_collection_thread_handler,
283-
args=(results_queue,)))
284-
active_threads[-1].start()
285-
for thread in active_threads:
286-
thread.join()
287-
# single-threaded behavior
288-
else:
289-
task_workflow_manager.task_collection_thread_handler(results_queue)
290-
291-
if task_workflow_manager.error:
292-
raise task_workflow_manager.error # pylint: disable=raising-bad-type
293-
else:
294-
submitted_tasks = _handle_output(results_queue)
295-
return TaskAddCollectionResult(value=submitted_tasks)
296-
bulk_add_collection.metadata = {'url': '/jobs/{jobId}/addtaskcollection'}
300+
operations_modules = importlib.import_module('azure.batch.operations')
301+
operations_modules.TaskOperations.add_collection = build_new_add_collection(operations_modules.TaskOperations.add_collection)

0 commit comments

Comments
 (0)