Skip to content

Improve AWS service integration support (pattern) #74

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
sagemaker>=1.71.0
sagemaker>=1.71.0,<2.0.0
boto3>=1.9.213
pyyaml
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def read_version():

# Declare minimal set for installation
required_packages = [
"sagemaker>=1.42.8",
"sagemaker>=1.42.8,<2.0.0",
"boto3>=1.9.213",
"pyyaml"
]
Expand Down
6 changes: 5 additions & 1 deletion src/stepfunctions/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,9 @@ class MissingRequiredParameter(Exception):
pass


class ForbiddenValueParameter(Exception):
pass


class DuplicateStatesInChain(Exception):
pass
pass
57 changes: 23 additions & 34 deletions src/stepfunctions/steps/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
# permissions and limitations under the License.
from __future__ import absolute_import

from stepfunctions.steps.states import IntegrationPattern
from stepfunctions.steps.states import Task
from stepfunctions.steps.fields import Field


class LambdaStep(Task):
Expand All @@ -22,11 +22,11 @@ class LambdaStep(Task):
Creates a Task state to invoke an AWS Lambda function. See `Invoke Lambda with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-lambda.html>`_ for more details.
"""

def __init__(self, state_id, wait_for_callback=False, **kwargs):
def __init__(self, state_id, integration_pattern=IntegrationPattern.RequestResponse, **kwargs):
"""
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
wait_for_callback(bool, optional): Boolean value set to `True` if the Task state should wait for callback to resume the operation. (default: False)
integration_pattern(stepfunctions.states.IntegrationPattern, optional): Enum value set to RunAJob if the task should wait to complete before proceeding to the next step in the workflow, set to WaitForCallback if the Task state should wait for callback to resume the operation or set to RequestResponse if the Task should wait for HTTP response (default: RequestResponse)
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
comment (str, optional): Human-readable comment or description. (default: None)
Expand All @@ -35,12 +35,11 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs):
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
if wait_for_callback:
kwargs[Field.Resource.value] = 'arn:aws:states:::lambda:invoke.waitForTaskToken'
else:
kwargs[Field.Resource.value] = 'arn:aws:states:::lambda:invoke'

super(LambdaStep, self).__init__(state_id, **kwargs)
self._valid_patterns = [IntegrationPattern.RequestResponse, IntegrationPattern.WaitForCallback]
self._integration_pattern = integration_pattern
action = "lambda:invoke"
step_name = "Lambda"
super(LambdaStep, self).__init__(state_id, action, step_name, **kwargs)


class GlueStartJobRunStep(Task):
Expand All @@ -49,11 +48,11 @@ class GlueStartJobRunStep(Task):
Creates a Task state to run an AWS Glue job. See `Manage AWS Glue Jobs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-glue.html>`_ for more details.
"""

def __init__(self, state_id, wait_for_completion=True, **kwargs):
def __init__(self, state_id, integration_pattern=IntegrationPattern.RunAJob, **kwargs):
"""
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
wait_for_completion(bool, optional): Boolean value set to `True` if the Task state should wait for the glue job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the glue job and proceed to the next step. (default: True)
integration_pattern(stepfunctions.states.IntegrationPattern, optional): Enum value set to RunAJob if the task should wait to complete before proceeding to the next step in the workflow, set to WaitForCallback if the Task state should wait for callback to resume the operation or set to RequestResponse if the Task should wait for HTTP response (default: RequestResponse)
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
comment (str, optional): Human-readable comment or description. (default: None)
Expand All @@ -62,12 +61,9 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
if wait_for_completion:
kwargs[Field.Resource.value] = 'arn:aws:states:::glue:startJobRun.sync'
else:
kwargs[Field.Resource.value] = 'arn:aws:states:::glue:startJobRun'

super(GlueStartJobRunStep, self).__init__(state_id, **kwargs)
self._valid_patterns = [IntegrationPattern.RequestResponse, IntegrationPattern.RunAJob]
self._integration_pattern = integration_pattern
super(GlueStartJobRunStep, self).__init__(state_id, "glue:startJobRun", "AWS Glue", **kwargs)


class BatchSubmitJobStep(Task):
Expand All @@ -76,11 +72,11 @@ class BatchSubmitJobStep(Task):
Creates a Task State to start an AWS Batch job. See `Manage AWS Batch with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-batch.html>`_ for more details.
"""

def __init__(self, state_id, wait_for_completion=True, **kwargs):
def __init__(self, state_id, integration_pattern=IntegrationPattern.RunAJob, **kwargs):
"""
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
wait_for_completion(bool, optional): Boolean value set to `True` if the Task state should wait for the batch job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the batch job and proceed to the next step. (default: True)
integration_pattern(stepfunctions.states.IntegrationPattern, optional): Enum value set to RunAJob if the task should wait to complete before proceeding to the next step in the workflow, set to WaitForCallback if the Task state should wait for callback to resume the operation or set to RequestResponse if the Task should wait for HTTP response (default: RequestResponse)
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
comment (str, optional): Human-readable comment or description. (default: None)
Expand All @@ -89,12 +85,9 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
if wait_for_completion:
kwargs[Field.Resource.value] = 'arn:aws:states:::batch:submitJob.sync'
else:
kwargs[Field.Resource.value] = 'arn:aws:states:::batch:submitJob'

super(BatchSubmitJobStep, self).__init__(state_id, **kwargs)
self._valid_patterns = [IntegrationPattern.RequestResponse, IntegrationPattern.RunAJob]
self._integration_pattern = integration_pattern
super(BatchSubmitJobStep, self).__init__(state_id, "batch:submitJob", "AWS Batch", **kwargs)


class EcsRunTaskStep(Task):
Expand All @@ -103,22 +96,18 @@ class EcsRunTaskStep(Task):
Creates a Task State to run Amazon ECS or Fargate Tasks. See `Manage Amazon ECS or Fargate Tasks with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-ecs.html>`_ for more details.
"""

def __init__(self, state_id, wait_for_completion=True, **kwargs):
def __init__(self, state_id, integration_pattern=IntegrationPattern.RequestResponse, **kwargs):
"""
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
wait_for_completion(bool, optional): Boolean value set to `True` if the Task state should wait for the ecs job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the ecs job and proceed to the next step. (default: True)
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
integration_pattern(stepfunctions.states.IntegrationPattern, optional): Enum value set to RunAJob if the task should wait to complete before proceeding to the next step in the workflow, set to WaitForCallback if the Task state should wait for callback to resume the operation or set to RequestResponse if the Task should wait for HTTP response (default: RequestResponse) timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
if wait_for_completion:
kwargs[Field.Resource.value] = 'arn:aws:states:::ecs:runTask.sync'
else:
kwargs[Field.Resource.value] = 'arn:aws:states:::ecs:runTask'

super(EcsRunTaskStep, self).__init__(state_id, **kwargs)
self._valid_patterns = [IntegrationPattern.RequestResponse, IntegrationPattern.RunAJob, IntegrationPattern.WaitForCallback]
self._integration_pattern = integration_pattern
super(EcsRunTaskStep, self).__init__(state_id, "ecs:runTask", "Amazon ECS/AWS Fargate", **kwargs)
Loading