Skip to content

Commit 9df0b8d

Browse files
committed
Added tests and updated placeholders documentation
1 parent 2758879 commit 9df0b8d

File tree

4 files changed

+217
-96
lines changed

4 files changed

+217
-96
lines changed

doc/placeholders.rst

+26-6
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,32 @@
11
Placeholders
22
=============
33

4+
45
Once defined, a workflow is static unless you update it explicitly. But, you can pass
56
input to workflow executions. You can have dynamic values
67
that you use in the **parameters** or **result_selector** fields of the steps in your workflow. For this,
78
the AWS Step Functions Data Science SDK provides a way to define placeholders to pass around when you
8-
create your workflow. There are 3 mechanisms for passing dynamic values in a workflow.
9+
create your workflow.
10+
11+
.. autoclass:: stepfunctions.inputs.Placeholder
12+
13+
There are 3 mechanisms for passing dynamic values in a workflow:
14+
15+
- `Execution Input <#execution-input>`__
916

17+
- `Step Input <#step-input>`__
18+
19+
- `Step Result <#step-result>`__
20+
21+
Execution Input
22+
---------------
1023
The first mechanism is a global input to the workflow execution. This input is
1124
accessible to all the steps in the workflow. The SDK provides :py:meth:`stepfunctions.inputs.ExecutionInput`
1225
to define the schema for this input, and to access the values in your workflow.
1326

27+
.. autoclass:: stepfunctions.inputs.ExecutionInput
28+
:inherited-members:
29+
1430
.. code-block:: python
1531
1632
# Create an instance of ExecutionInput class, and define a schema. Defining
@@ -50,13 +66,19 @@ to define the schema for this input, and to access the values in your workflow.
5066
5167
workflow.execute(inputs={'myDynamicInput': "WorldHello"})
5268
69+
70+
Step Input
71+
----------
5372
The second mechanism is for passing dynamic values from one step to the next
5473
step. The output of one step becomes the input of the next step.
5574
The SDK provides the :py:meth:`stepfunctions.inputs.StepInput` class for this.
5675

5776
By default, each step has an output method :py:meth:`stepfunctions.steps.states.State.output`
5877
that returns the placeholder output for that step.
5978

79+
.. autoclass:: stepfunctions.inputs.StepInput
80+
:inherited-members:
81+
6082
.. code-block:: python
6183
6284
lambda_state_first = LambdaStep(
@@ -81,6 +103,9 @@ that returns the placeholder output for that step.
81103
82104
definition = Chain([lambda_state_first, lambda_state_second])
83105
106+
107+
Step Result
108+
-----------
84109
The third mechanism is a placeholder for a step's result. The result of a step can be modified
85110
with the **result_selector** field to replace the step's result.
86111

@@ -100,13 +125,8 @@ with the **result_selector** field to replace the step's result.
100125
}
101126
)
102127
103-
.. autoclass:: stepfunctions.inputs.Placeholder
104128
105-
.. autoclass:: stepfunctions.inputs.ExecutionInput
106-
:inherited-members:
107129
108-
.. autoclass:: stepfunctions.inputs.StepInput
109-
:inherited-members:
110130
111131
.. autoclass:: stepfunctions.inputs.StepResult
112132
:inherited-members:

src/stepfunctions/steps/service.py

+8
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
250250
heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
251251
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
252252
parameters (dict, optional): The value of this field becomes the effective input for the state.
253+
result_selector (dict, optional): The value of this field becomes the effective result of the state.
253254
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
254255
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
255256
wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True)
@@ -289,6 +290,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
289290
heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
290291
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
291292
parameters (dict, optional): The value of this field becomes the effective input for the state.
293+
result_selector (dict, optional): The value of this field becomes the effective result of the state.
292294
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
293295
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
294296
wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True)
@@ -328,6 +330,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
328330
heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
329331
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
330332
parameters (dict, optional): The value of this field becomes the effective input for the state.
333+
result_selector (dict, optional): The value of this field becomes the effective result of the state.
331334
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
332335
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
333336
wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True)
@@ -367,6 +370,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
367370
heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
368371
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
369372
parameters (dict, optional): The value of this field becomes the effective input for the state.
373+
result_selector (dict, optional): The value of this field becomes the effective result of the state.
370374
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
371375
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
372376
wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True)
@@ -406,6 +410,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
406410
heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
407411
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
408412
parameters (dict, optional): The value of this field becomes the effective input for the state.
413+
result_selector (dict, optional): The value of this field becomes the effective result of the state.
409414
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
410415
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
411416
wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True)
@@ -445,6 +450,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
445450
heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
446451
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
447452
parameters (dict, optional): The value of this field becomes the effective input for the state.
453+
result_selector (dict, optional): The value of this field becomes the effective result of the state.
448454
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
449455
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
450456
wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True)
@@ -484,6 +490,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
484490
heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
485491
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
486492
parameters (dict, optional): The value of this field becomes the effective input for the state.
493+
result_selector (dict, optional): The value of this field becomes the effective result of the state.
487494
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
488495
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
489496
wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True)
@@ -523,6 +530,7 @@ def __init__(self, state_id, **kwargs):
523530
heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
524531
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
525532
parameters (dict, optional): The value of this field becomes the effective input for the state.
533+
result_selector (dict, optional): The value of this field becomes the effective result of the state.
526534
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
527535
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
528536
"""

tests/unit/test_compute_steps.py

+15-4
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,24 @@
1616
import boto3
1717

1818
from unittest.mock import patch
19+
20+
from stepfunctions.inputs import StepResult
1921
from stepfunctions.steps.compute import LambdaStep, GlueStartJobRunStep, BatchSubmitJobStep, EcsRunTaskStep
2022

2123

24+
STEP_RESULT = StepResult()
25+
RESULT_SELECTOR = {
26+
"OutputA": STEP_RESULT['A']
27+
}
28+
2229
@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
2330
def test_lambda_step_creation():
24-
step = LambdaStep('Echo')
31+
step = LambdaStep('Echo', result_selector=RESULT_SELECTOR)
2532

2633
assert step.to_dict() == {
2734
'Type': 'Task',
2835
'Resource': 'arn:aws:states:::lambda:invoke',
36+
'ResultSelector': {'OutputA.$': "$['A']"},
2937
'End': True
3038
}
3139

@@ -51,11 +59,12 @@ def test_lambda_step_creation():
5159

5260
@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
5361
def test_glue_start_job_run_step_creation():
54-
step = GlueStartJobRunStep('Glue Job', wait_for_completion=False)
62+
step = GlueStartJobRunStep('Glue Job', wait_for_completion=False, result_selector=RESULT_SELECTOR)
5563

5664
assert step.to_dict() == {
5765
'Type': 'Task',
5866
'Resource': 'arn:aws:states:::glue:startJobRun',
67+
'ResultSelector': {'OutputA.$': "$['A']"},
5968
'End': True
6069
}
6170

@@ -75,11 +84,12 @@ def test_glue_start_job_run_step_creation():
7584

7685
@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
7786
def test_batch_submit_job_step_creation():
78-
step = BatchSubmitJobStep('Batch Job', wait_for_completion=False)
87+
step = BatchSubmitJobStep('Batch Job', wait_for_completion=False, result_selector=RESULT_SELECTOR)
7988

8089
assert step.to_dict() == {
8190
'Type': 'Task',
8291
'Resource': 'arn:aws:states:::batch:submitJob',
92+
'ResultSelector': {'OutputA.$': "$['A']"},
8393
'End': True
8494
}
8595

@@ -101,11 +111,12 @@ def test_batch_submit_job_step_creation():
101111

102112
@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
103113
def test_ecs_run_task_step_creation():
104-
step = EcsRunTaskStep('Ecs Job', wait_for_completion=False)
114+
step = EcsRunTaskStep('Ecs Job', wait_for_completion=False, result_selector=RESULT_SELECTOR)
105115

106116
assert step.to_dict() == {
107117
'Type': 'Task',
108118
'Resource': 'arn:aws:states:::ecs:runTask',
119+
'ResultSelector': {'OutputA.$': "$['A']"},
109120
'End': True
110121
}
111122

0 commit comments

Comments
 (0)