Skip to content

Commit a6748d2

Browse files
authored
Merge branch 'main' into ecs_callback
2 parents 2c696ae + 02ed72b commit a6748d2

File tree

5 files changed

+71
-41
lines changed

5 files changed

+71
-41
lines changed

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
2.2.0
1+
2.3.0

doc/services.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,11 @@ Amazon EMR
7070
.. autoclass:: stepfunctions.steps.service.EmrModifyInstanceGroupByNameStep
7171

7272
Amazon EventBridge
73-
-----------
73+
------------------
7474
.. autoclass:: stepfunctions.steps.service.EventBridgePutEventsStep
7575

7676
AWS Glue DataBrew
77-
--------------------
77+
-----------------
7878
.. autoclass:: stepfunctions.steps.service.GlueDataBrewStartJobRunStep
7979

8080
Amazon SNS

src/stepfunctions/steps/sagemaker.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non
7070
:class:`sagemaker.amazon.amazon_estimator.RecordSet` objects,
7171
where each instance is a different channel of training data.
7272
hyperparameters: Parameters used for training.
73+
7374
* (dict, optional) - Hyperparameters supplied will be merged with the Hyperparameters specified in the estimator.
7475
If there are duplicate entries, the value provided through this property will be used. (Default: Hyperparameters specified in the estimator.)
7576
* (Placeholder, optional) - The TrainingStep will use the hyperparameters specified by the Placeholder's value instead of the hyperparameters specified in the estimator.
@@ -79,8 +80,8 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non
7980
tags (list[dict] or Placeholder, optional): `List of tags <https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html>`_ to associate with the resource.
8081
output_data_config_path (str or Placeholder, optional): S3 location for saving the training result (model
8182
artifacts and output files). If specified, it overrides the `output_path` property of `estimator`.
82-
parameters(dict, optional): The value of this field is merged with other arguments to become the request payload for SageMaker `CreateTrainingJob<https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateTrainingJob.html>`_. (Default: None)
83-
You can use `parameters` to override the value provided by other arguments and specify any field's value dynamically using `Placeholders<https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/placeholders.html?highlight=placeholder#stepfunctions.inputs.Placeholder>`_.
83+
parameters(dict, optional): The value of this field is merged with other arguments to become the request payload for SageMaker `CreateTrainingJob <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateTrainingJob.html>`_. (Default: None)
84+
You can use `parameters` to override the value provided by other arguments and specify any field's value dynamically using `Placeholders <https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/placeholders.html?highlight=placeholder#stepfunctions.inputs.Placeholder>`_.
8485
"""
8586
self.estimator = estimator
8687
self.job_name = job_name
@@ -224,8 +225,8 @@ def __init__(self, state_id, transformer, job_name, model_name, data, data_type=
224225
input_filter (str or Placeholder): A JSONPath to select a portion of the input to pass to the algorithm container for inference. If you omit the field, it gets the value ‘$’, representing the entire input. For CSV data, each row is taken as a JSON array, so only index-based JSONPaths can be applied, e.g. $[0], $[1:]. CSV data should follow the RFC format. See Supported JSONPath Operators for a table of supported JSONPath operators. For more information, see the SageMaker API documentation for CreateTransformJob. Some examples: “$[1:]”, “$.features” (default: None).
225226
output_filter (str or Placeholder): A JSONPath to select a portion of the joined/original output to return as the output. For more information, see the SageMaker API documentation for CreateTransformJob. Some examples: “$[1:]”, “$.prediction” (default: None).
226227
join_source (str or Placeholder): The source of data to be joined to the transform output. It can be set to ‘Input’ meaning the entire input record will be joined to the inference result. You can use OutputFilter to select the useful portion before uploading to S3. (default: None). Valid values: Input, None.
227-
parameters(dict, optional): The value of this field is merged with other arguments to become the request payload for SageMaker `CreateTransformJob<https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateTransformJob.html>`_.
228-
You can use `parameters` to override the value provided by other arguments and specify any field's value dynamically using `Placeholders<https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/placeholders.html?highlight=placeholder#stepfunctions.inputs.Placeholder>`_.
228+
parameters(dict, optional): The value of this field is merged with other arguments to become the request payload for SageMaker `CreateTransformJob <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateTransformJob.html>`_.
229+
You can use `parameters` to override the value provided by other arguments and specify any field's value dynamically using `Placeholders <https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/placeholders.html?highlight=placeholder#stepfunctions.inputs.Placeholder>`_.
229230
230231
"""
231232
if wait_for_completion:
@@ -303,8 +304,8 @@ def __init__(self, state_id, model, model_name=None, instance_type=None, tags=No
303304
model_name (str or Placeholder, optional): Specify a model name, this is required for creating the model. We recommend to use :py:class:`~stepfunctions.inputs.ExecutionInput` placeholder collection to pass the value dynamically in each execution.
304305
instance_type (str, optional): The EC2 instance type to deploy this Model to. For example, 'ml.p2.xlarge'.
305306
tags (list[dict] or Placeholders, optional): `List of tags <https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html>`_ to associate with the resource.
306-
parameters(dict, optional): The value of this field is merged with other arguments to become the request payload for SageMaker `CreateModel<https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateModel.html>`_. (Default: None)
307-
You can use `parameters` to override the value provided by other arguments and specify any field's value dynamically using `Placeholders<https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/placeholders.html?highlight=placeholder#stepfunctions.inputs.Placeholder>`_.
307+
parameters(dict, optional): The value of this field is merged with other arguments to become the request payload for SageMaker `CreateModel <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateModel.html>`_. (Default: None)
308+
You can use `parameters` to override the value provided by other arguments and specify any field's value dynamically using `Placeholders <https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/placeholders.html?highlight=placeholder#stepfunctions.inputs.Placeholder>`_.
308309
"""
309310
if isinstance(model, FrameworkModel):
310311
model_parameters = model_config(model=model, instance_type=instance_type, role=model.role, image_uri=model.image_uri)
@@ -466,8 +467,8 @@ def __init__(self, state_id, tuner, job_name, data, wait_for_completion=True, ta
466467
where each instance is a different channel of training data.
467468
wait_for_completion(bool, optional): Boolean value set to `True` if the Task state should wait for the tuning job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the tuning job and proceed to the next step. (default: True)
468469
tags (list[dict] or Placeholder, optional): `List of tags <https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html>`_ to associate with the resource.
469-
parameters(dict, optional): The value of this field is merged with other arguments to become the request payload for SageMaker `CreateHyperParameterTuningJob<https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateHyperParameterTuningJob.html>`_.
470-
You can use `parameters` to override the value provided by other arguments and specify any field's value dynamically using `Placeholders<https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/placeholders.html?highlight=placeholder#stepfunctions.inputs.Placeholder>`_.
470+
parameters(dict, optional): The value of this field is merged with other arguments to become the request payload for SageMaker `CreateHyperParameterTuningJob <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateHyperParameterTuningJob.html>`_.
471+
You can use `parameters` to override the value provided by other arguments and specify any field's value dynamically using `Placeholders <https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/placeholders.html?highlight=placeholder#stepfunctions.inputs.Placeholder>`_.
471472
472473
"""
473474
if wait_for_completion:
@@ -534,8 +535,8 @@ def __init__(self, state_id, processor, job_name, inputs=None, outputs=None, exp
534535
The KmsKeyId is applied to all outputs.
535536
wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait for the processing job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the processing job and proceed to the next step. (default: True)
536537
tags (list[dict] or Placeholder, optional): `List of tags <https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html>`_ to associate with the resource.
537-
parameters(dict, optional): The value of this field is merged with other arguments to become the request payload for SageMaker `CreateProcessingJob<https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateProcessingJob.html>`_.
538-
You can use `parameters` to override the value provided by other arguments and specify any field's value dynamically using `Placeholders<https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/placeholders.html?highlight=placeholder#stepfunctions.inputs.Placeholder>`_.
538+
parameters(dict, optional): The value of this field is merged with other arguments to become the request payload for SageMaker `CreateProcessingJob <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateProcessingJob.html>`_.
539+
You can use `parameters` to override the value provided by other arguments and specify any field's value dynamically using `Placeholders <https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/placeholders.html?highlight=placeholder#stepfunctions.inputs.Placeholder>`_.
539540
540541
"""
541542
if wait_for_completion:

src/stepfunctions/steps/service.py

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -550,10 +550,14 @@ class GlueDataBrewStartJobRunStep(Task):
550550
Creates a Task state that starts a DataBrew job. See `Manage AWS Glue DataBrew Jobs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-databrew.html>`_ for more details.
551551
"""
552552

553-
def __init__(self, state_id, wait_for_completion=True, **kwargs):
553+
def __init__(self, state_id, integration_pattern=IntegrationPattern.WaitForCompletion, **kwargs):
554554
"""
555555
Args:
556556
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
557+
integration_pattern (stepfunctions.steps.integration_resources.IntegrationPattern, optional): Service integration pattern used to call the integrated service. Supported integration patterns (default: WaitForCompletion):
558+
559+
* WaitForCompletion: Wait for the Databrew job to complete before going to the next state. (See `Run A Job <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-sync>`_ for more details.)
560+
* CallAndContinue: Call StartJobRun and progress to the next state (See `Request Response <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-default>`_ for more details.)
557561
comment (str, optional): Human-readable comment or description. (default: None)
558562
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
559563
timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
@@ -563,23 +567,18 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
563567
parameters (dict, optional): The value of this field becomes the effective input for the state.
564568
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
565569
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
566-
wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True)
567570
"""
568-
if wait_for_completion:
569-
"""
570-
Example resource arn: arn:aws:states:::databrew:startJobRun.sync
571-
"""
572-
573-
kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME,
574-
GlueDataBrewApi.StartJobRun,
575-
IntegrationPattern.WaitForCompletion)
576-
else:
577-
"""
578-
Example resource arn: arn:aws:states:::databrew:startJobRun
579-
"""
571+
supported_integ_patterns = [IntegrationPattern.WaitForCompletion, IntegrationPattern.CallAndContinue]
580572

581-
kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME,
582-
GlueDataBrewApi.StartJobRun)
573+
is_integration_pattern_valid(integration_pattern, supported_integ_patterns)
574+
kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME,
575+
GlueDataBrewApi.StartJobRun,
576+
integration_pattern)
577+
"""
578+
Example resource arns:
579+
- CallAndContinue: arn: arn:aws:states:::databrew:startJobRun
580+
- WaitForCompletion: arn: arn:aws:states:::databrew:startJobRun.sync
581+
"""
583582

584583
super(GlueDataBrewStartJobRunStep, self).__init__(state_id, **kwargs)
585584

@@ -904,18 +903,18 @@ def __init__(self, state_id, **kwargs):
904903
class StepFunctionsStartExecutionStep(Task):
905904

906905
"""
907-
Creates a Task state that starts an execution of a state machine. See `Manage AWS Step Functions Executions as an Integrated Service <https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html`_ for more details.
906+
Creates a Task state that starts an execution of a state machine. See `Manage AWS Step Functions Executions as an Integrated Service <https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html>`_ for more details.
908907
"""
909908

910909
def __init__(self, state_id, integration_pattern=IntegrationPattern.WaitForCompletion, **kwargs):
911910
"""
912911
Args:
913912
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
914-
integration_pattern (IntegrationPattern, optional): Service integration pattern used to call the integrated service. (default: WaitForCompletion)
915-
Supported integration patterns:
916-
WaitForCompletion: Wait for the state machine execution to complete before going to the next state. (See `Run A Job <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-sync`_ for more details.)
917-
WaitForTaskToken: Wait for the state machine execution to return a task token before progressing to the next state (See `Wait for a Callback with the Task Token <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-wait-token`_ for more details.)
918-
CallAndContinue: Call StartExecution and progress to the next state (See `Request Response <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-default`_ for more details.)
913+
integration_pattern (stepfunctions.steps.integration_resources.IntegrationPattern, optional): Service integration pattern used to call the integrated service. Supported integration patterns (default: WaitForCompletion):
914+
915+
* WaitForCompletion: Wait for the state machine execution to complete before going to the next state. (See `Run A Job <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-sync>`_ for more details.)
916+
* WaitForTaskToken: Wait for the state machine execution to return a task token before progressing to the next state (See `Wait for a Callback with the Task Token <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-wait-token>`_ for more details.)
917+
* CallAndContinue: Call StartExecution and progress to the next state (See `Request Response <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-default>`_ for more details.)
919918
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
920919
timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
921920
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.

0 commit comments

Comments
 (0)