Skip to content

feat: Support placeholders for input_path and output_path for all States (except Fail) and items_path for MapState #158

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion doc/placeholders.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Once defined, a workflow is static unless you update it explicitly. But, you can
input to workflow executions. You can have dynamic values
that you use in the **parameters** fields of the steps in your workflow. For this,
the AWS Step Functions Data Science SDK provides a way to define placeholders to pass around when you
create your workflow. There are 2 mechanisms for passing dynamic values in a workflow.
create your workflow. There are 4 mechanisms for passing dynamic values in a workflow.

The first mechanism is a global input to the workflow execution. This input is
accessible to all the steps in the workflow. The SDK provides :py:meth:`stepfunctions.inputs.ExecutionInput`
Expand Down Expand Up @@ -82,6 +82,62 @@ that returns the placeholder output for that step.
definition = Chain([lambda_state_first, lambda_state_second])


The third and fourth mechanisms can be used to access the context object from Map states.
The SDK provides the :py:meth:`stepfunctions.inputs.MapItemIndex` class that allows you to get the index number of the
array item that is being processed in the current iteration and the :py:meth:`stepfunctions.inputs.MapItemValue` class
which is used for accessing the value of the array item that is currently being processed.

.. code-block:: python

# Create an instance of MapItemValue class, and define a schema. Defining
# a schema is optional, but it is a good practice
map_item_value = MapItemValue(schema={
'name': str,
'points': str
})

map_state = Map(
'MapState',
parameters={
"Ranking": MapItemIndex(),
"Contestant": map_item_value['name'],
"Score": map_item_value['points']
}
)
iterator_state = Pass('TrainIterator')
map_state.attach_iterator(iterator_state)

# Workflow is created with the placeholders
workflow = Workflow(
name='MyMapWorkflow',
definition=map_state,
role=workflow_execution_role,
)

# Create the workflow on AWS Step Functions
workflow.create()

# The placeholder is assigned a value during execution. The SDK will
# verify that all placeholder values are assigned values, and that
# these values are of the expected type based on the defined schema
# before the execution starts.
workflow_input = execution_input = [{"name": "John", "points": "101"}, {"name": "Snow", "points": "99"}]
workflow.execute(inputs=workflow_input)

# The execution output will be:
[
{
"Ranking": 0,
"Contestant": "John",
"Score": "101",
},
{
"Ranking": 1,
"Contestant": "Snow",
"Score": "99"
}
]


.. autoclass:: stepfunctions.inputs.Placeholder

Expand All @@ -90,3 +146,9 @@ that returns the placeholder output for that step.

.. autoclass:: stepfunctions.inputs.StepInput
:inherited-members:

.. autoclass:: stepfunctions.inputs.MapItemValue
:inherited-members:

.. autoclass:: stepfunctions.inputs.MapItemIndex
:inherited-members:
2 changes: 1 addition & 1 deletion src/stepfunctions/inputs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# permissions and limitations under the License.
from __future__ import absolute_import

from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, StepInput
from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, MapItemIndex, MapItemValue, StepInput
43 changes: 42 additions & 1 deletion src/stepfunctions/inputs/placeholders.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class StepInput(Placeholder):
def __init__(self, schema=None, **kwargs):
super(StepInput, self).__init__(schema, **kwargs)
self.json_str_template = '${}'

def _create_variable(self, name, parent, type=None):
"""
Creates a placeholder variable for Step Input.
Expand All @@ -291,3 +291,44 @@ def _create_variable(self, name, parent, type=None):
return StepInput(name=name, parent=parent, type=type)
else:
return StepInput(name=name, parent=parent)


class MapItemValue(Placeholder):
"""
Top-level class for map item value placeholders.
"""

def __init__(self, schema=None, **kwargs):
super(MapItemValue, self).__init__(schema, **kwargs)
self.json_str_template = '$$.Map.Item.Value{}'

def _create_variable(self, name, parent, type=None):
"""
Creates a placeholder variable for Map Item Value.
A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema.
"""
if self.immutable:
raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection: "
f" {self.schema}")
if type:
return MapItemValue(name=name, parent=parent, type=type)
else:
return MapItemValue(name=name, parent=parent)


class MapItemIndex(Placeholder):
"""
Top-level class for map item index placeholders.
"""

def __init__(self, **kwargs):
if kwargs.get('schema'):
raise AttributeError("MapItemIndex does not support schema object")
super(MapItemIndex, self).__init__(**kwargs)
self.json_str_template = '$$.Map.Item.Index'

def _create_variable(self, name, parent, type=None):
raise AttributeError("MapItemIndex has no _create_variable object")

def __getitem__(self, item):
raise AttributeError("MapItemIndex has no __getitem__ object")
16 changes: 8 additions & 8 deletions src/stepfunctions/steps/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs):
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""

if wait_for_callback:
Expand Down Expand Up @@ -96,10 +96,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
if wait_for_completion:
"""
Expand Down Expand Up @@ -136,10 +136,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
if wait_for_completion:
"""
Expand Down Expand Up @@ -176,10 +176,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
if wait_for_completion:
"""
Expand Down
Loading