Skip to content

feat: Support placeholders for input_path and output_path for all States (except Fail) and items_path for MapState #158

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions src/stepfunctions/steps/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,37 @@ def to_dict(self):
k = to_pascalcase(k)
if k == to_pascalcase(Field.Parameters.value):
result[k] = self._replace_placeholders(v)
elif self._is_placeholder_compatible(k):
if isinstance(v, Placeholder):
modified_key = f"{k}.$"
result[modified_key] = v.to_jsonpath()
else:
result[k] = v
else:
result[k] = v

return result

@staticmethod
def _is_placeholder_compatible(field):
"""
Check if the field is placeholder compatible

Args:
field: Field against which to verify placeholder compatibility
"""
return field in [
# Common fields
to_pascalcase(Field.Comment.value),
to_pascalcase(Field.InputPath.value),
to_pascalcase(Field.OutputPath.value),
to_pascalcase(Field.ResultPath.value),

# Map
to_pascalcase(Field.ItemsPath.value),
to_pascalcase(Field.MaxConcurrency.value),
]

def to_json(self, pretty=False):
"""Serialize to a JSON formatted string.

Expand Down Expand Up @@ -541,13 +567,13 @@ def __init__(self, state_id, **kwargs):
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
iterator (State or Chain): State or chain to execute for each of the items in `items_path`.
items_path (str, optional): Path in the input for items to iterate over. (default: '$')
max_concurrency (int, optional): Maximum number of iterations to have running at any given point in time. (default: 0)
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
items_path (str or Placeholder, optional): Path in the input for items to iterate over. (default: '$')
max_concurrency (int or Placeholder, optional): Maximum number of iterations to have running at any given point in time. (default: 0)
comment (str or Placeholder, optional): Human-readable comment or description. (default: None)
input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
result_path (str or Placeholder, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
super(Map, self).__init__(state_id, 'Map', **kwargs)

Expand Down
63 changes: 63 additions & 0 deletions tests/unit/test_placeholders_with_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,69 @@ def test_map_state_with_placeholders():
result = Graph(workflow_definition).to_dict()
assert result == expected_repr


def test_map_state_with_placeholders():
workflow_input = ExecutionInput(schema={
'comment': str,
'input_path': str,
'output_path': str,
'result_path': str,
'items_path': str,
'max_concurrency': int,
'ParamB': str
})

map_state = Map(
'MapState01',
comment=workflow_input['input_path'],
input_path=workflow_input['input_path'],
output_path=workflow_input['output_path'],
result_path=workflow_input['result_path'],
items_path=workflow_input['result_path'],
max_concurrency=workflow_input['max_concurrency']
)
iterator_state = Pass(
'TrainIterator',
parameters={
'ParamA': map_state.output()['X']["Y"],
'ParamB': workflow_input['ParamB']
})

map_state.attach_iterator(iterator_state)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: what happens with this? - does it become part of the workflow definition somewhere? I thought the Iterator and ItemsPath properties were required for Map states

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ItemsPath is optional. Similar to InputPath and OutputPath, the default is $.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The iterator is added to the Map State here and will be added to the workflow definition when the Map state itself is added to the workflow definition

The docstring will need to be updated - it seems like iterator is required in the Map State constructor when in fact, it gets creates successfully without it. However, the workflow will fail to create if no iterator is set - we could add a validation here and raise an exception if the iterator is not set. At the moment, if fails here with the following message

ValueError: Expected branch to be a State or a Chain, but got `None`

This can be done in another PR to avoid making the scope larger

workflow_definition = Chain([map_state])

expected_repr = {
"StartAt": "MapState01",
"States": {
"MapState01": {
"Type": "Map",
"End": True,
"Comment.$": "$$.Execution.Input['input_path']",
"InputPath.$": "$$.Execution.Input['input_path']",
"ItemsPath.$": "$$.Execution.Input['result_path']",
"Iterator": {
"StartAt": "TrainIterator",
"States": {
"TrainIterator": {
"Parameters": {
"ParamA.$": "$['X']['Y']",
"ParamB.$": "$$.Execution.Input['ParamB']"
},
"Type": "Pass",
"End": True
}
}
},
"MaxConcurrency.$": "$$.Execution.Input['max_concurrency']",
"OutputPath.$": "$$.Execution.Input['output_path']",
"ResultPath.$": "$$.Execution.Input['result_path']",
}
}
}

result = Graph(workflow_definition).to_dict()
assert result == expected_repr

def test_parallel_state_with_placeholders():
workflow_input = ExecutionInput()

Expand Down