Skip to content

Commit a03b9c3

Browse files
committed
improve service integration support
1 parent 2a104d3 commit a03b9c3

File tree

8 files changed

+375
-282
lines changed

8 files changed

+375
-282
lines changed

src/stepfunctions/exceptions.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,9 @@ class MissingRequiredParameter(Exception):
2121
pass
2222

2323

24+
class ForbiddenValueParameter(Exception):
25+
pass
26+
27+
2428
class DuplicateStatesInChain(Exception):
25-
pass
29+
pass

src/stepfunctions/steps/compute.py

Lines changed: 23 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
# permissions and limitations under the License.
1313
from __future__ import absolute_import
1414

15+
from stepfunctions.steps.states import IntegrationPattern
1516
from stepfunctions.steps.states import Task
16-
from stepfunctions.steps.fields import Field
1717

1818

1919
class LambdaStep(Task):
@@ -22,11 +22,11 @@ class LambdaStep(Task):
2222
Creates a Task state to invoke an AWS Lambda function. See `Invoke Lambda with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-lambda.html>`_ for more details.
2323
"""
2424

25-
def __init__(self, state_id, wait_for_callback=False, **kwargs):
25+
def __init__(self, state_id, integration_pattern=IntegrationPattern.RequestResponse, **kwargs):
2626
"""
2727
Args:
2828
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
29-
wait_for_callback(bool, optional): Boolean value set to `True` if the Task state should wait for callback to resume the operation. (default: False)
29+
integration_pattern(stepfunctions.states.IntegrationPattern, optional): Enum value set to RunAJob if the task should wait to complete before proceeding to the next step in the workflow, set to WaitForCallback if the Task state should wait for callback to resume the operation or set to RequestResponse if the Task should wait for HTTP response (default: RequestResponse)
3030
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
3131
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
3232
comment (str, optional): Human-readable comment or description. (default: None)
@@ -35,12 +35,11 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs):
3535
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
3636
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
3737
"""
38-
if wait_for_callback:
39-
kwargs[Field.Resource.value] = 'arn:aws:states:::lambda:invoke.waitForTaskToken'
40-
else:
41-
kwargs[Field.Resource.value] = 'arn:aws:states:::lambda:invoke'
42-
43-
super(LambdaStep, self).__init__(state_id, **kwargs)
38+
self._valid_patterns = [IntegrationPattern.RequestResponse, IntegrationPattern.WaitForCallback]
39+
self._integration_pattern = integration_pattern
40+
action = "lambda:invoke"
41+
step_name = "Lambda"
42+
super(LambdaStep, self).__init__(state_id, action, step_name, **kwargs)
4443

4544

4645
class GlueStartJobRunStep(Task):
@@ -49,11 +48,11 @@ class GlueStartJobRunStep(Task):
4948
Creates a Task state to run an AWS Glue job. See `Manage AWS Glue Jobs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-glue.html>`_ for more details.
5049
"""
5150

52-
def __init__(self, state_id, wait_for_completion=True, **kwargs):
51+
def __init__(self, state_id, integration_pattern=IntegrationPattern.RunAJob, **kwargs):
5352
"""
5453
Args:
5554
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
56-
wait_for_completion(bool, optional): Boolean value set to `True` if the Task state should wait for the glue job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the glue job and proceed to the next step. (default: True)
55+
integration_pattern(stepfunctions.states.IntegrationPattern, optional): Enum value set to RunAJob if the task should wait to complete before proceeding to the next step in the workflow, set to WaitForCallback if the Task state should wait for callback to resume the operation or set to RequestResponse if the Task should wait for HTTP response (default: RequestResponse)
5756
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
5857
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
5958
comment (str, optional): Human-readable comment or description. (default: None)
@@ -62,12 +61,9 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
6261
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
6362
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
6463
"""
65-
if wait_for_completion:
66-
kwargs[Field.Resource.value] = 'arn:aws:states:::glue:startJobRun.sync'
67-
else:
68-
kwargs[Field.Resource.value] = 'arn:aws:states:::glue:startJobRun'
69-
70-
super(GlueStartJobRunStep, self).__init__(state_id, **kwargs)
64+
self._valid_patterns = [IntegrationPattern.RequestResponse, IntegrationPattern.RunAJob]
65+
self._integration_pattern = integration_pattern
66+
super(GlueStartJobRunStep, self).__init__(state_id, "glue:startJobRun", "AWS Glue", **kwargs)
7167

7268

7369
class BatchSubmitJobStep(Task):
@@ -76,11 +72,11 @@ class BatchSubmitJobStep(Task):
7672
Creates a Task State to start an AWS Batch job. See `Manage AWS Batch with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-batch.html>`_ for more details.
7773
"""
7874

79-
def __init__(self, state_id, wait_for_completion=True, **kwargs):
75+
def __init__(self, state_id, integration_pattern=IntegrationPattern.RunAJob, **kwargs):
8076
"""
8177
Args:
8278
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
83-
wait_for_completion(bool, optional): Boolean value set to `True` if the Task state should wait for the batch job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the batch job and proceed to the next step. (default: True)
79+
integration_pattern(stepfunctions.states.IntegrationPattern, optional): Enum value set to RunAJob if the task should wait to complete before proceeding to the next step in the workflow, set to WaitForCallback if the Task state should wait for callback to resume the operation or set to RequestResponse if the Task should wait for HTTP response (default: RequestResponse)
8480
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
8581
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
8682
comment (str, optional): Human-readable comment or description. (default: None)
@@ -89,12 +85,9 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
8985
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
9086
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
9187
"""
92-
if wait_for_completion:
93-
kwargs[Field.Resource.value] = 'arn:aws:states:::batch:submitJob.sync'
94-
else:
95-
kwargs[Field.Resource.value] = 'arn:aws:states:::batch:submitJob'
96-
97-
super(BatchSubmitJobStep, self).__init__(state_id, **kwargs)
88+
self._valid_patterns = [IntegrationPattern.RequestResponse, IntegrationPattern.RunAJob]
89+
self._integration_pattern = integration_pattern
90+
super(BatchSubmitJobStep, self).__init__(state_id, "batch:submitJob", "AWS Batch", **kwargs)
9891

9992

10093
class EcsRunTaskStep(Task):
@@ -103,22 +96,18 @@ class EcsRunTaskStep(Task):
10396
Creates a Task State to run Amazon ECS or Fargate Tasks. See `Manage Amazon ECS or Fargate Tasks with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-ecs.html>`_ for more details.
10497
"""
10598

106-
def __init__(self, state_id, wait_for_completion=True, **kwargs):
99+
def __init__(self, state_id, integration_pattern=IntegrationPattern.RequestResponse, **kwargs):
107100
"""
108101
Args:
109102
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
110-
wait_for_completion(bool, optional): Boolean value set to `True` if the Task state should wait for the ecs job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the ecs job and proceed to the next step. (default: True)
111-
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
103+
integration_pattern(stepfunctions.states.IntegrationPattern, optional): Enum value set to RunAJob if the task should wait to complete before proceeding to the next step in the workflow, set to WaitForCallback if the Task state should wait for callback to resume the operation or set to RequestResponse if the Task should wait for HTTP response (default: RequestResponse) timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
112104
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
113105
comment (str, optional): Human-readable comment or description. (default: None)
114106
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
115107
parameters (dict, optional): The value of this field becomes the effective input for the state.
116108
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
117109
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
118110
"""
119-
if wait_for_completion:
120-
kwargs[Field.Resource.value] = 'arn:aws:states:::ecs:runTask.sync'
121-
else:
122-
kwargs[Field.Resource.value] = 'arn:aws:states:::ecs:runTask'
123-
124-
super(EcsRunTaskStep, self).__init__(state_id, **kwargs)
111+
self._valid_patterns = [IntegrationPattern.RequestResponse, IntegrationPattern.RunAJob, IntegrationPattern.WaitForCallback]
112+
self._integration_pattern = integration_pattern
113+
super(EcsRunTaskStep, self).__init__(state_id, "ecs:runTask", "Amazon ECS/AWS Fargate", **kwargs)

0 commit comments

Comments
 (0)