Skip to content

feat: Support placeholders for input_path and output_path for all States (except Fail) and items_path for MapState #158

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/stepfunctions/inputs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# permissions and limitations under the License.
from __future__ import absolute_import

from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, StepInput
from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, MapItemIndex, MapItemValue, StepInput
49 changes: 48 additions & 1 deletion src/stepfunctions/inputs/placeholders.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class StepInput(Placeholder):
def __init__(self, schema=None, **kwargs):
super(StepInput, self).__init__(schema, **kwargs)
self.json_str_template = '${}'

def _create_variable(self, name, parent, type=None):
"""
Creates a placeholder variable for Step Input.
Expand All @@ -291,3 +291,50 @@ def _create_variable(self, name, parent, type=None):
return StepInput(name=name, parent=parent, type=type)
else:
return StepInput(name=name, parent=parent)


class MapItemValue(Placeholder):
"""
Top-level class for map item value placeholders.
"""

def __init__(self, schema=None, **kwargs):
super(MapItemValue, self).__init__(schema, **kwargs)
self.json_str_template = '$$.Map.Item.Value{}'

def _create_variable(self, name, parent, type=None):
"""
Creates a placeholder variable for Map Item Value.
A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema.
"""
if self.immutable:
raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection.")
if type:
return MapItemValue(name=name, parent=parent, type=type)
else:
return MapItemValue(name=name, parent=parent)


class MapItemIndex(Placeholder):
"""
Top-level class for map item index placeholders.
"""

def __init__(self, schema=None, **kwargs):
super(MapItemIndex, self).__init__(schema, **kwargs)
self.json_str_template = '$$.Map.Item.Index'

def _create_variable(self, name, parent, type=None):
"""
Creates a placeholder variable for Map Item Index.
A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema.
"""
if self.immutable:
raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection.")
if type:
return MapItemIndex(name=name, parent=parent, type=type)
else:
return MapItemIndex(name=name, parent=parent)

def __getitem__(self, item):
raise AttributeError("MapItemIndex has no __getitem__ object")
2 changes: 1 addition & 1 deletion src/stepfunctions/steps/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ def __init__(self, state_id, **kwargs):
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
iterator (State or Chain): State or chain to execute for each of the items in `items_path`.
items_path (str, optional): Path in the input for items to iterate over. (default: '$')
items_path (str or Placeholder, optional): Path in the input for items to iterate over. (default: '$')
max_concurrency (int, optional): Maximum number of iterations to have running at any given point in time. (default: 0)
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
Expand Down
170 changes: 168 additions & 2 deletions tests/unit/test_placeholders.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pytest
import json

from stepfunctions.inputs import ExecutionInput, StepInput
from stepfunctions.inputs import ExecutionInput, MapItemIndex, MapItemValue, StepInput

def test_placeholder_creation_with_subscript_operator():
step_input = StepInput()
Expand Down Expand Up @@ -178,4 +178,170 @@ def check_immutable(placeholder):
for k, v in placeholder.store.items():
return check_immutable(v)
else:
return False
return False


def test_map_item_value_creation_with_subscript_operator():
map_item_placeholder = MapItemValue()
map_item_placeholder = map_item_placeholder["A"]
assert map_item_placeholder.name == "A"
assert map_item_placeholder.type is None


def test_map_item_index_creation_with_subscript_operator():
map_item_placeholder = MapItemIndex()
with pytest.raises(AttributeError):
map_item_placeholder["A"]


def test_map_item_value_creation_with_type():
map_item_placeholder = MapItemValue()
map_item_variable = map_item_placeholder["A"]["b"].get("C", float)
assert map_item_variable.name == "C"
assert map_item_variable.type == float


def test_map_item_value_creation_with_int_key():
map_item_placeholder = MapItemValue()
map_item_variable = map_item_placeholder["A"][0]
assert map_item_variable.name == 0
assert map_item_variable.type is None


def test_map_item_value_creation_with_invalid_key():
map_item_placeholder = MapItemValue()
with pytest.raises(ValueError):
map_item_placeholder["A"][1.3]
with pytest.raises(ValueError):
map_item_placeholder["A"].get(1.2, str)


def test_map_item_value_creation_failure_with_type():
map_item_placeholder = MapItemValue()
map_item_placeholder["A"]["b"].get("C", float)
with pytest.raises(ValueError):
map_item_placeholder["A"]["b"].get("C", int)


def test_map_item_value_path():
map_item_placeholder = MapItemValue()
placeholder_variable = map_item_placeholder["A"]["b"]["C"]
expected_path = ["A", "b", "C"]
assert placeholder_variable._get_path() == expected_path


def test_map_item_value_contains():
map_item_placeholder = MapItemValue()
var_three = map_item_placeholder["Key01"]["Key04"]

map_item_placeholder_two = StepInput()
var_five = map_item_placeholder_two["Key07"]

assert map_item_placeholder.contains(var_three) == True
assert map_item_placeholder.contains(var_five) == False
assert map_item_placeholder_two.contains(var_three) == False
assert map_item_placeholder_two.contains(var_five) == True


def test_map_item_value_schema_as_dict():
map_item_placeholder = MapItemValue()
map_item_placeholder["A"]["b"].get("C", float)
map_item_placeholder["Message"]
map_item_placeholder["Key01"]["Key02"]
map_item_placeholder["Key03"]
map_item_placeholder["Key03"]["Key04"]

expected_schema = {
"A": {
"b": {
"C": float
}
},
"Message": str,
"Key01": {
"Key02": str
},
"Key03": {
"Key04": str
}
}

assert map_item_placeholder.get_schema_as_dict() == expected_schema


def test_map_item_value_schema_as_json():
map_item_placeholder = MapItemValue()
map_item_placeholder["Response"].get("StatusCode", int)
map_item_placeholder["Hello"]["World"]
map_item_placeholder["A"]
map_item_placeholder["Hello"]["World"].get("Test", str)

expected_schema = {
"Response": {
"StatusCode": "int"
},
"Hello": {
"World": {
"Test": "str"
}
},
"A": "str"
}

assert map_item_placeholder.get_schema_as_json() == json.dumps(expected_schema)


def test_map_item_value_is_empty():
workflow_input = MapItemValue()
placeholder_variable = workflow_input["A"]["B"]["C"]
assert placeholder_variable._is_empty() == True
workflow_input["A"]["B"]["C"]["D"]
assert placeholder_variable._is_empty() == False


def test_map_item_value_make_immutable():
workflow_input = MapItemValue()
workflow_input["A"]["b"].get("C", float)
workflow_input["Message"]
workflow_input["Key01"]["Key02"]
workflow_input["Key03"]
workflow_input["Key03"]["Key04"]

assert check_immutable(workflow_input) == False

workflow_input._make_immutable()
assert check_immutable(workflow_input) == True


def test_map_item_value_with_schema():
test_schema = {
"A": {
"B": {
"C": int
}
},
"Request": {
"Status": str
},
"Hello": float
}
workflow_input = MapItemValue(schema=test_schema)
assert workflow_input.get_schema_as_dict() == test_schema
assert workflow_input.immutable == True

with pytest.raises(ValueError):
workflow_input["A"]["B"]["D"]

with pytest.raises(ValueError):
workflow_input["A"]["B"].get("C", float)


def test_map_item_value_jsonpath():
map_item_value = MapItemValue()
map_item_value_variable = map_item_value["A"]["b"].get(0, float)
assert map_item_value_variable.to_jsonpath() == "$$.Map.Item.Value['A']['b'][0]"


def test_map_item_index_jsonpath():
map_item_index = MapItemIndex()
assert map_item_index.to_jsonpath() == "$$.Map.Item.Index"
60 changes: 59 additions & 1 deletion tests/unit/test_placeholders_with_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pytest

from stepfunctions.steps import Pass, Succeed, Fail, Wait, Choice, ChoiceRule, Parallel, Map, Task, Retry, Catch, Chain, Graph
from stepfunctions.inputs import ExecutionInput, StepInput
from stepfunctions.inputs import ExecutionInput, MapItemValue, MapItemIndex

def test_workflow_input_placeholder():

Expand Down Expand Up @@ -214,6 +214,64 @@ def test_map_state_with_placeholders():
result = Graph(workflow_definition).to_dict()
assert result == expected_repr


def test_map_state_with_placeholders():
map_item_value = MapItemValue(schema={
'name': str,
'age': str
})

map_item_index = MapItemIndex()

map_state = Map(
'MapState01',
parameters={
"MapIndex": map_item_index,
"Name": map_item_value['name'],
"Age": map_item_value['age']
}
)
iterator_state = Pass(
'TrainIterator',
parameters={
'ParamA': map_state.output()['X']["Y"]
}
)

map_state.attach_iterator(iterator_state)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: what happens with this? - does it become part of the workflow definition somewhere? I thought the Iterator and ItemsPath properties were required for Map states

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ItemsPath is optional. Similar to InputPath and OutputPath, the default is $.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The iterator is added to the Map State here and will be added to the workflow definition when the Map state itself is added to the workflow definition

The docstring will need to be updated - it seems like iterator is required in the Map State constructor when in fact, it gets creates successfully without it. However, the workflow will fail to create if no iterator is set - we could add a validation here and raise an exception if the iterator is not set. At the moment, if fails here with the following message

ValueError: Expected branch to be a State or a Chain, but got `None`

This can be done in another PR to avoid making the scope larger

workflow_definition = Chain([map_state])

expected_repr = {
"StartAt": "MapState01",
"States": {
"MapState01": {
"Type": "Map",
"End": True,
"Iterator": {
"StartAt": "TrainIterator",
"States": {
"TrainIterator": {
"Parameters": {
"ParamA.$": "$['X']['Y']"
},
"Type": "Pass",
"End": True
}
}
},
"Parameters": {
"Age.$": "$$.Map.Item.Value['age']",
"MapIndex.$": "$$.Map.Item.Index",
"Name.$": "$$.Map.Item.Value['name']"
},
}
}
}

result = Graph(workflow_definition).to_dict()
assert result == expected_repr


def test_parallel_state_with_placeholders():
workflow_input = ExecutionInput()

Expand Down