From f5800cb695bbf8670fcf040d7326ee74012a2179 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen <carolngu@amazon.com> Date: Fri, 3 Sep 2021 12:21:35 -0700 Subject: [PATCH 1/9] Support placeholders for Map state --- src/stepfunctions/steps/states.py | 38 ++++++++++--- tests/unit/test_placeholders_with_steps.py | 63 ++++++++++++++++++++++ 2 files changed, 95 insertions(+), 6 deletions(-) diff --git a/src/stepfunctions/steps/states.py b/src/stepfunctions/steps/states.py index 9669a73..23cae65 100644 --- a/src/stepfunctions/steps/states.py +++ b/src/stepfunctions/steps/states.py @@ -73,11 +73,37 @@ def to_dict(self): k = to_pascalcase(k) if k == to_pascalcase(Field.Parameters.value): result[k] = self._replace_placeholders(v) + elif self._is_placeholder_compatible(k): + if isinstance(v, Placeholder): + modified_key = f"{k}.$" + result[modified_key] = v.to_jsonpath() + else: + result[k] = v else: result[k] = v return result + @staticmethod + def _is_placeholder_compatible(field): + """ + Check if the field is placeholder compatible + + Args: + field: Field against which to verify placeholder compatibility + """ + return field in [ + # Common fields + to_pascalcase(Field.Comment.value), + to_pascalcase(Field.InputPath.value), + to_pascalcase(Field.OutputPath.value), + to_pascalcase(Field.ResultPath.value), + + # Map + to_pascalcase(Field.ItemsPath.value), + to_pascalcase(Field.MaxConcurrency.value), + ] + def to_json(self, pretty=False): """Serialize to a JSON formatted string. @@ -541,13 +567,13 @@ def __init__(self, state_id, **kwargs): Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. iterator (State or Chain): State or chain to execute for each of the items in `items_path`. - items_path (str, optional): Path in the input for items to iterate over. (default: '$') - max_concurrency (int, optional): Maximum number of iterations to have running at any given point in time. (default: 0) - comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + items_path (str or Placeholder, optional): Path in the input for items to iterate over. (default: '$') + max_concurrency (int or Placeholder, optional): Maximum number of iterations to have running at any given point in time. (default: 0) + comment (str or Placeholder, optional): Human-readable comment or description. (default: None) + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. - result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + result_path (str or Placeholder, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(Map, self).__init__(state_id, 'Map', **kwargs) diff --git a/tests/unit/test_placeholders_with_steps.py b/tests/unit/test_placeholders_with_steps.py index 54d1543..205adb1 100644 --- a/tests/unit/test_placeholders_with_steps.py +++ b/tests/unit/test_placeholders_with_steps.py @@ -214,6 +214,69 @@ def test_map_state_with_placeholders(): result = Graph(workflow_definition).to_dict() assert result == expected_repr + +def test_map_state_with_placeholders(): + workflow_input = ExecutionInput(schema={ + 'comment': str, + 'input_path': str, + 'output_path': str, + 'result_path': str, + 'items_path': str, + 'max_concurrency': int, + 'ParamB': str + }) + + map_state = Map( + 'MapState01', + comment=workflow_input['input_path'], + input_path=workflow_input['input_path'], + output_path=workflow_input['output_path'], + result_path=workflow_input['result_path'], + items_path=workflow_input['result_path'], + max_concurrency=workflow_input['max_concurrency'] + ) + iterator_state = Pass( + 'TrainIterator', + parameters={ + 'ParamA': map_state.output()['X']["Y"], + 'ParamB': workflow_input['ParamB'] + }) + + map_state.attach_iterator(iterator_state) + workflow_definition = Chain([map_state]) + + expected_repr = { + "StartAt": "MapState01", + "States": { + "MapState01": { + "Type": "Map", + "End": True, + "Comment.$": "$$.Execution.Input['input_path']", + "InputPath.$": "$$.Execution.Input['input_path']", + "ItemsPath.$": "$$.Execution.Input['result_path']", + "Iterator": { + "StartAt": "TrainIterator", + "States": { + "TrainIterator": { + "Parameters": { + "ParamA.$": "$['X']['Y']", + "ParamB.$": "$$.Execution.Input['ParamB']" + }, + "Type": "Pass", + "End": True + } + } + }, + "MaxConcurrency.$": "$$.Execution.Input['max_concurrency']", + "OutputPath.$": "$$.Execution.Input['output_path']", + "ResultPath.$": "$$.Execution.Input['result_path']", + } + } + } + + result = Graph(workflow_definition).to_dict() + assert result == expected_repr + def test_parallel_state_with_placeholders(): workflow_input = ExecutionInput() From 788c148e5772bada8942aaedeb9a38a8a2d82331 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen <carolngu@amazon.com> Date: Wed, 8 Sep 2021 10:06:53 -0700 Subject: [PATCH 2/9] Adding MapItemValue and MapItemIndex placeholders --- src/stepfunctions/inputs/__init__.py | 2 +- src/stepfunctions/inputs/placeholders.py | 49 +++++- src/stepfunctions/steps/states.py | 36 +---- tests/unit/test_placeholders.py | 170 ++++++++++++++++++++- tests/unit/test_placeholders_with_steps.py | 47 +++--- 5 files changed, 243 insertions(+), 61 deletions(-) diff --git a/src/stepfunctions/inputs/__init__.py b/src/stepfunctions/inputs/__init__.py index ffa01b0..81fe344 100644 --- a/src/stepfunctions/inputs/__init__.py +++ b/src/stepfunctions/inputs/__init__.py @@ -12,4 +12,4 @@ # permissions and limitations under the License. from __future__ import absolute_import -from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, StepInput +from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, MapItemIndex, MapItemValue, StepInput diff --git a/src/stepfunctions/inputs/placeholders.py b/src/stepfunctions/inputs/placeholders.py index 3b7f2b6..073a6cc 100644 --- a/src/stepfunctions/inputs/placeholders.py +++ b/src/stepfunctions/inputs/placeholders.py @@ -279,7 +279,7 @@ class StepInput(Placeholder): def __init__(self, schema=None, **kwargs): super(StepInput, self).__init__(schema, **kwargs) self.json_str_template = '${}' - + def _create_variable(self, name, parent, type=None): """ Creates a placeholder variable for Step Input. @@ -291,3 +291,50 @@ def _create_variable(self, name, parent, type=None): return StepInput(name=name, parent=parent, type=type) else: return StepInput(name=name, parent=parent) + + +class MapItemValue(Placeholder): + """ + Top-level class for map item value placeholders. + """ + + def __init__(self, schema=None, **kwargs): + super(MapItemValue, self).__init__(schema, **kwargs) + self.json_str_template = '$$.Map.Item.Value{}' + + def _create_variable(self, name, parent, type=None): + """ + Creates a placeholder variable for Map Item Value. + A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema. + """ + if self.immutable: + raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection.") + if type: + return MapItemValue(name=name, parent=parent, type=type) + else: + return MapItemValue(name=name, parent=parent) + + +class MapItemIndex(Placeholder): + """ + Top-level class for map item index placeholders. + """ + + def __init__(self, schema=None, **kwargs): + super(MapItemIndex, self).__init__(schema, **kwargs) + self.json_str_template = '$$.Map.Item.Index' + + def _create_variable(self, name, parent, type=None): + """ + Creates a placeholder variable for Map Item Index. + A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema. + """ + if self.immutable: + raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection.") + if type: + return MapItemIndex(name=name, parent=parent, type=type) + else: + return MapItemIndex(name=name, parent=parent) + + def __getitem__(self, item): + raise AttributeError("MapItemIndex has no __getitem__ object") diff --git a/src/stepfunctions/steps/states.py b/src/stepfunctions/steps/states.py index 23cae65..0881d00 100644 --- a/src/stepfunctions/steps/states.py +++ b/src/stepfunctions/steps/states.py @@ -73,37 +73,11 @@ def to_dict(self): k = to_pascalcase(k) if k == to_pascalcase(Field.Parameters.value): result[k] = self._replace_placeholders(v) - elif self._is_placeholder_compatible(k): - if isinstance(v, Placeholder): - modified_key = f"{k}.$" - result[modified_key] = v.to_jsonpath() - else: - result[k] = v else: result[k] = v return result - @staticmethod - def _is_placeholder_compatible(field): - """ - Check if the field is placeholder compatible - - Args: - field: Field against which to verify placeholder compatibility - """ - return field in [ - # Common fields - to_pascalcase(Field.Comment.value), - to_pascalcase(Field.InputPath.value), - to_pascalcase(Field.OutputPath.value), - to_pascalcase(Field.ResultPath.value), - - # Map - to_pascalcase(Field.ItemsPath.value), - to_pascalcase(Field.MaxConcurrency.value), - ] - def to_json(self, pretty=False): """Serialize to a JSON formatted string. @@ -568,12 +542,12 @@ def __init__(self, state_id, **kwargs): state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. iterator (State or Chain): State or chain to execute for each of the items in `items_path`. items_path (str or Placeholder, optional): Path in the input for items to iterate over. (default: '$') - max_concurrency (int or Placeholder, optional): Maximum number of iterations to have running at any given point in time. (default: 0) - comment (str or Placeholder, optional): Human-readable comment or description. (default: None) - input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + max_concurrency (int, optional): Maximum number of iterations to have running at any given point in time. (default: 0) + comment (str, optional): Human-readable comment or description. (default: None) + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. - result_path (str or Placeholder, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(Map, self).__init__(state_id, 'Map', **kwargs) diff --git a/tests/unit/test_placeholders.py b/tests/unit/test_placeholders.py index 456a7bf..2c8b752 100644 --- a/tests/unit/test_placeholders.py +++ b/tests/unit/test_placeholders.py @@ -15,7 +15,7 @@ import pytest import json -from stepfunctions.inputs import ExecutionInput, StepInput +from stepfunctions.inputs import ExecutionInput, MapItemIndex, MapItemValue, StepInput def test_placeholder_creation_with_subscript_operator(): step_input = StepInput() @@ -178,4 +178,170 @@ def check_immutable(placeholder): for k, v in placeholder.store.items(): return check_immutable(v) else: - return False \ No newline at end of file + return False + + +def test_map_item_value_creation_with_subscript_operator(): + map_item_placeholder = MapItemValue() + map_item_placeholder = map_item_placeholder["A"] + assert map_item_placeholder.name == "A" + assert map_item_placeholder.type is None + + +def test_map_item_index_creation_with_subscript_operator(): + map_item_placeholder = MapItemIndex() + with pytest.raises(AttributeError): + map_item_placeholder["A"] + + +def test_map_item_value_creation_with_type(): + map_item_placeholder = MapItemValue() + map_item_variable = map_item_placeholder["A"]["b"].get("C", float) + assert map_item_variable.name == "C" + assert map_item_variable.type == float + + +def test_map_item_value_creation_with_int_key(): + map_item_placeholder = MapItemValue() + map_item_variable = map_item_placeholder["A"][0] + assert map_item_variable.name == 0 + assert map_item_variable.type is None + + +def test_map_item_value_creation_with_invalid_key(): + map_item_placeholder = MapItemValue() + with pytest.raises(ValueError): + map_item_placeholder["A"][1.3] + with pytest.raises(ValueError): + map_item_placeholder["A"].get(1.2, str) + + +def test_map_item_value_creation_failure_with_type(): + map_item_placeholder = MapItemValue() + map_item_placeholder["A"]["b"].get("C", float) + with pytest.raises(ValueError): + map_item_placeholder["A"]["b"].get("C", int) + + +def test_map_item_value_path(): + map_item_placeholder = MapItemValue() + placeholder_variable = map_item_placeholder["A"]["b"]["C"] + expected_path = ["A", "b", "C"] + assert placeholder_variable._get_path() == expected_path + + +def test_map_item_value_contains(): + map_item_placeholder = MapItemValue() + var_three = map_item_placeholder["Key01"]["Key04"] + + map_item_placeholder_two = StepInput() + var_five = map_item_placeholder_two["Key07"] + + assert map_item_placeholder.contains(var_three) == True + assert map_item_placeholder.contains(var_five) == False + assert map_item_placeholder_two.contains(var_three) == False + assert map_item_placeholder_two.contains(var_five) == True + + +def test_map_item_value_schema_as_dict(): + map_item_placeholder = MapItemValue() + map_item_placeholder["A"]["b"].get("C", float) + map_item_placeholder["Message"] + map_item_placeholder["Key01"]["Key02"] + map_item_placeholder["Key03"] + map_item_placeholder["Key03"]["Key04"] + + expected_schema = { + "A": { + "b": { + "C": float + } + }, + "Message": str, + "Key01": { + "Key02": str + }, + "Key03": { + "Key04": str + } + } + + assert map_item_placeholder.get_schema_as_dict() == expected_schema + + +def test_map_item_value_schema_as_json(): + map_item_placeholder = MapItemValue() + map_item_placeholder["Response"].get("StatusCode", int) + map_item_placeholder["Hello"]["World"] + map_item_placeholder["A"] + map_item_placeholder["Hello"]["World"].get("Test", str) + + expected_schema = { + "Response": { + "StatusCode": "int" + }, + "Hello": { + "World": { + "Test": "str" + } + }, + "A": "str" + } + + assert map_item_placeholder.get_schema_as_json() == json.dumps(expected_schema) + + +def test_map_item_value_is_empty(): + workflow_input = MapItemValue() + placeholder_variable = workflow_input["A"]["B"]["C"] + assert placeholder_variable._is_empty() == True + workflow_input["A"]["B"]["C"]["D"] + assert placeholder_variable._is_empty() == False + + +def test_map_item_value_make_immutable(): + workflow_input = MapItemValue() + workflow_input["A"]["b"].get("C", float) + workflow_input["Message"] + workflow_input["Key01"]["Key02"] + workflow_input["Key03"] + workflow_input["Key03"]["Key04"] + + assert check_immutable(workflow_input) == False + + workflow_input._make_immutable() + assert check_immutable(workflow_input) == True + + +def test_map_item_value_with_schema(): + test_schema = { + "A": { + "B": { + "C": int + } + }, + "Request": { + "Status": str + }, + "Hello": float + } + workflow_input = MapItemValue(schema=test_schema) + assert workflow_input.get_schema_as_dict() == test_schema + assert workflow_input.immutable == True + + with pytest.raises(ValueError): + workflow_input["A"]["B"]["D"] + + with pytest.raises(ValueError): + workflow_input["A"]["B"].get("C", float) + + +def test_map_item_value_jsonpath(): + map_item_value = MapItemValue() + map_item_value_variable = map_item_value["A"]["b"].get(0, float) + assert map_item_value_variable.to_jsonpath() == "$$.Map.Item.Value['A']['b'][0]" + + +def test_map_item_index_jsonpath(): + map_item_index = MapItemIndex() + assert map_item_index.to_jsonpath() == "$$.Map.Item.Index" diff --git a/tests/unit/test_placeholders_with_steps.py b/tests/unit/test_placeholders_with_steps.py index 205adb1..6e83989 100644 --- a/tests/unit/test_placeholders_with_steps.py +++ b/tests/unit/test_placeholders_with_steps.py @@ -15,7 +15,7 @@ import pytest from stepfunctions.steps import Pass, Succeed, Fail, Wait, Choice, ChoiceRule, Parallel, Map, Task, Retry, Catch, Chain, Graph -from stepfunctions.inputs import ExecutionInput, StepInput +from stepfunctions.inputs import ExecutionInput, MapItemValue, MapItemIndex def test_workflow_input_placeholder(): @@ -216,31 +216,27 @@ def test_map_state_with_placeholders(): def test_map_state_with_placeholders(): - workflow_input = ExecutionInput(schema={ - 'comment': str, - 'input_path': str, - 'output_path': str, - 'result_path': str, - 'items_path': str, - 'max_concurrency': int, - 'ParamB': str + map_item_value = MapItemValue(schema={ + 'name': str, + 'age': str }) + map_item_index = MapItemIndex() + map_state = Map( 'MapState01', - comment=workflow_input['input_path'], - input_path=workflow_input['input_path'], - output_path=workflow_input['output_path'], - result_path=workflow_input['result_path'], - items_path=workflow_input['result_path'], - max_concurrency=workflow_input['max_concurrency'] + parameters={ + "MapIndex": map_item_index, + "Name": map_item_value['name'], + "Age": map_item_value['age'] + } ) iterator_state = Pass( 'TrainIterator', parameters={ - 'ParamA': map_state.output()['X']["Y"], - 'ParamB': workflow_input['ParamB'] - }) + 'ParamA': map_state.output()['X']["Y"] + } + ) map_state.attach_iterator(iterator_state) workflow_definition = Chain([map_state]) @@ -251,25 +247,23 @@ def test_map_state_with_placeholders(): "MapState01": { "Type": "Map", "End": True, - "Comment.$": "$$.Execution.Input['input_path']", - "InputPath.$": "$$.Execution.Input['input_path']", - "ItemsPath.$": "$$.Execution.Input['result_path']", "Iterator": { "StartAt": "TrainIterator", "States": { "TrainIterator": { "Parameters": { - "ParamA.$": "$['X']['Y']", - "ParamB.$": "$$.Execution.Input['ParamB']" + "ParamA.$": "$['X']['Y']" }, "Type": "Pass", "End": True } } }, - "MaxConcurrency.$": "$$.Execution.Input['max_concurrency']", - "OutputPath.$": "$$.Execution.Input['output_path']", - "ResultPath.$": "$$.Execution.Input['result_path']", + "Parameters": { + "Age.$": "$$.Map.Item.Value['age']", + "MapIndex.$": "$$.Map.Item.Index", + "Name.$": "$$.Map.Item.Value['name']" + }, } } } @@ -277,6 +271,7 @@ def test_map_state_with_placeholders(): result = Graph(workflow_definition).to_dict() assert result == expected_repr + def test_parallel_state_with_placeholders(): workflow_input = ExecutionInput() From d20a883436990f5d8de0a90219687231ede91ae8 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen <carolngu@amazon.com> Date: Fri, 10 Sep 2021 02:04:19 -0700 Subject: [PATCH 3/9] Applied changes per review --- src/stepfunctions/inputs/placeholders.py | 6 ++++-- src/stepfunctions/steps/states.py | 2 +- tests/unit/test_placeholders.py | 1 + tests/unit/test_placeholders_with_steps.py | 4 +--- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/stepfunctions/inputs/placeholders.py b/src/stepfunctions/inputs/placeholders.py index 073a6cc..23cc461 100644 --- a/src/stepfunctions/inputs/placeholders.py +++ b/src/stepfunctions/inputs/placeholders.py @@ -308,7 +308,8 @@ def _create_variable(self, name, parent, type=None): A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema. """ if self.immutable: - raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection.") + raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection: " + f" {self.schema}") if type: return MapItemValue(name=name, parent=parent, type=type) else: @@ -330,7 +331,8 @@ def _create_variable(self, name, parent, type=None): A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema. """ if self.immutable: - raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection.") + raise ValueError(f"Placeholder variable does not conform to schema set for the placeholder collection: " + f" {self.schema}") if type: return MapItemIndex(name=name, parent=parent, type=type) else: diff --git a/src/stepfunctions/steps/states.py b/src/stepfunctions/steps/states.py index 0881d00..9669a73 100644 --- a/src/stepfunctions/steps/states.py +++ b/src/stepfunctions/steps/states.py @@ -541,7 +541,7 @@ def __init__(self, state_id, **kwargs): Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. iterator (State or Chain): State or chain to execute for each of the items in `items_path`. - items_path (str or Placeholder, optional): Path in the input for items to iterate over. (default: '$') + items_path (str, optional): Path in the input for items to iterate over. (default: '$') max_concurrency (int, optional): Maximum number of iterations to have running at any given point in time. (default: 0) comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') diff --git a/tests/unit/test_placeholders.py b/tests/unit/test_placeholders.py index 2c8b752..69ec40c 100644 --- a/tests/unit/test_placeholders.py +++ b/tests/unit/test_placeholders.py @@ -328,6 +328,7 @@ def test_map_item_value_with_schema(): workflow_input = MapItemValue(schema=test_schema) assert workflow_input.get_schema_as_dict() == test_schema assert workflow_input.immutable == True + assert workflow_input['A']['B'].get("C", int) with pytest.raises(ValueError): workflow_input["A"]["B"]["D"] diff --git a/tests/unit/test_placeholders_with_steps.py b/tests/unit/test_placeholders_with_steps.py index 6e83989..1013e57 100644 --- a/tests/unit/test_placeholders_with_steps.py +++ b/tests/unit/test_placeholders_with_steps.py @@ -221,12 +221,10 @@ def test_map_state_with_placeholders(): 'age': str }) - map_item_index = MapItemIndex() - map_state = Map( 'MapState01', parameters={ - "MapIndex": map_item_index, + "MapIndex": MapItemIndex(), "Name": map_item_value['name'], "Age": map_item_value['age'] } From 1c397dfe6c3e4a3e90506c1aa5871221170cf0db Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen <carolngu@amazon.com> Date: Fri, 10 Sep 2021 02:52:29 -0700 Subject: [PATCH 4/9] Removed schema from MapItemIndex --- src/stepfunctions/inputs/placeholders.py | 18 +++++------------- tests/unit/test_placeholders.py | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/src/stepfunctions/inputs/placeholders.py b/src/stepfunctions/inputs/placeholders.py index 23cc461..ed81251 100644 --- a/src/stepfunctions/inputs/placeholders.py +++ b/src/stepfunctions/inputs/placeholders.py @@ -321,22 +321,14 @@ class MapItemIndex(Placeholder): Top-level class for map item index placeholders. """ - def __init__(self, schema=None, **kwargs): - super(MapItemIndex, self).__init__(schema, **kwargs) + def __init__(self, **kwargs): + if kwargs.get('schema'): + raise AttributeError("MapItemIndex does not support schema object") + super(MapItemIndex, self).__init__(**kwargs) self.json_str_template = '$$.Map.Item.Index' def _create_variable(self, name, parent, type=None): - """ - Creates a placeholder variable for Map Item Index. - A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema. - """ - if self.immutable: - raise ValueError(f"Placeholder variable does not conform to schema set for the placeholder collection: " - f" {self.schema}") - if type: - return MapItemIndex(name=name, parent=parent, type=type) - else: - return MapItemIndex(name=name, parent=parent) + raise AttributeError("MapItemIndex has no _create_variable object") def __getitem__(self, item): raise AttributeError("MapItemIndex has no __getitem__ object") diff --git a/tests/unit/test_placeholders.py b/tests/unit/test_placeholders.py index 69ec40c..4f543da 100644 --- a/tests/unit/test_placeholders.py +++ b/tests/unit/test_placeholders.py @@ -192,6 +192,8 @@ def test_map_item_index_creation_with_subscript_operator(): map_item_placeholder = MapItemIndex() with pytest.raises(AttributeError): map_item_placeholder["A"] + assert not map_item_placeholder.get_schema_as_dict() + assert not map_item_placeholder.immutable def test_map_item_value_creation_with_type(): @@ -337,6 +339,22 @@ def test_map_item_value_with_schema(): workflow_input["A"]["B"].get("C", float) +def test_map_item_index_with_schema(): + test_schema = { + "A": { + "B": { + "C": int + } + }, + "Request": { + "Status": str + }, + "Hello": float + } + with pytest.raises(AttributeError): + workflow_input = MapItemIndex(schema=test_schema) + + def test_map_item_value_jsonpath(): map_item_value = MapItemValue() map_item_value_variable = map_item_value["A"]["b"].get(0, float) From ca42a7a8224cbbbb983932f9b71c6f59d71844f0 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen <carolngu@amazon.com> Date: Fri, 10 Sep 2021 10:09:33 -0700 Subject: [PATCH 5/9] Support placeholders for inputpath, outputpath and itemspath --- src/stepfunctions/steps/states.py | 28 +++++++++- tests/unit/test_placeholders_with_steps.py | 65 ++++++---------------- 2 files changed, 41 insertions(+), 52 deletions(-) diff --git a/src/stepfunctions/steps/states.py b/src/stepfunctions/steps/states.py index 9669a73..d468e5c 100644 --- a/src/stepfunctions/steps/states.py +++ b/src/stepfunctions/steps/states.py @@ -73,11 +73,33 @@ def to_dict(self): k = to_pascalcase(k) if k == to_pascalcase(Field.Parameters.value): result[k] = self._replace_placeholders(v) + elif self._is_placeholder_compatible(k): + if isinstance(v, Placeholder): + result[k] = v.to_jsonpath() + else: + result[k] = v else: result[k] = v return result + @staticmethod + def _is_placeholder_compatible(field): + """ + Check if the field is placeholder compatible + Args: + field: Field against which to verify placeholder compatibility + """ + return field in [ + # Common fields + to_pascalcase(Field.InputPath.value), + to_pascalcase(Field.OutputPath.value), + + # Map + to_pascalcase(Field.ItemsPath.value) + ] + + def to_json(self, pretty=False): """Serialize to a JSON formatted string. @@ -541,13 +563,13 @@ def __init__(self, state_id, **kwargs): Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. iterator (State or Chain): State or chain to execute for each of the items in `items_path`. - items_path (str, optional): Path in the input for items to iterate over. (default: '$') + items_path (str or Placeholder, optional): Path in the input for items to iterate over. (default: '$') max_concurrency (int, optional): Maximum number of iterations to have running at any given point in time. (default: 0) comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(Map, self).__init__(state_id, 'Map', **kwargs) diff --git a/tests/unit/test_placeholders_with_steps.py b/tests/unit/test_placeholders_with_steps.py index 1013e57..6a7ef24 100644 --- a/tests/unit/test_placeholders_with_steps.py +++ b/tests/unit/test_placeholders_with_steps.py @@ -176,46 +176,6 @@ def test_step_input_order_validation(): def test_map_state_with_placeholders(): workflow_input = ExecutionInput() - - map_state = Map('MapState01') - iterator_state = Pass( - 'TrainIterator', - parameters={ - 'ParamA': map_state.output()['X']["Y"], - 'ParamB': workflow_input["Key01"]["Key02"]["Key03"] - }) - - map_state.attach_iterator(iterator_state) - workflow_definition = Chain([map_state]) - - expected_repr = { - "StartAt": "MapState01", - "States": { - "MapState01": { - "Type": "Map", - "End": True, - "Iterator": { - "StartAt": "TrainIterator", - "States": { - "TrainIterator": { - "Parameters": { - "ParamA.$": "$['X']['Y']", - "ParamB.$": "$$.Execution.Input['Key01']['Key02']['Key03']" - }, - "Type": "Pass", - "End": True - } - } - } - } - } - } - - result = Graph(workflow_definition).to_dict() - assert result == expected_repr - - -def test_map_state_with_placeholders(): map_item_value = MapItemValue(schema={ 'name': str, 'age': str @@ -223,6 +183,9 @@ def test_map_state_with_placeholders(): map_state = Map( 'MapState01', + input_path=workflow_input['input_path'], + items_path=workflow_input['items_path'], + output_path=workflow_input['output_path'], parameters={ "MapIndex": MapItemIndex(), "Name": map_item_value['name'], @@ -232,9 +195,9 @@ def test_map_state_with_placeholders(): iterator_state = Pass( 'TrainIterator', parameters={ - 'ParamA': map_state.output()['X']["Y"] - } - ) + 'ParamA': map_state.output()['X']["Y"], + 'ParamB': workflow_input["Key01"]["Key02"]["Key03"] + }) map_state.attach_iterator(iterator_state) workflow_definition = Chain([map_state]) @@ -245,23 +208,27 @@ def test_map_state_with_placeholders(): "MapState01": { "Type": "Map", "End": True, + "InputPath": "$$.Execution.Input['input_path']", + "ItemsPath": "$$.Execution.Input['items_path']", + "OutputPath": "$$.Execution.Input['output_path']", "Iterator": { "StartAt": "TrainIterator", "States": { "TrainIterator": { "Parameters": { - "ParamA.$": "$['X']['Y']" + "ParamA.$": "$['X']['Y']", + "ParamB.$": "$$.Execution.Input['Key01']['Key02']['Key03']" }, "Type": "Pass", "End": True } } }, - "Parameters": { - "Age.$": "$$.Map.Item.Value['age']", - "MapIndex.$": "$$.Map.Item.Index", - "Name.$": "$$.Map.Item.Value['name']" - }, + 'Parameters': { + 'Age.$': "$$.Map.Item.Value['age']", + 'MapIndex.$': '$$.Map.Item.Index', + 'Name.$': "$$.Map.Item.Value['name']" + } } } } From 0e81c293e3645fb40208ab10cb41ce0c2b210a53 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen <carolngu@amazon.com> Date: Sat, 11 Sep 2021 00:55:51 -0700 Subject: [PATCH 6/9] Added MapItemIndex and MapItemValue to Placeholder doc and updated docstring and tests for all states that support input_path and output_path --- doc/placeholders.rst | 91 +++++++++++++++++- src/stepfunctions/steps/compute.py | 16 ++-- src/stepfunctions/steps/service.py | 92 +++++++++---------- src/stepfunctions/steps/states.py | 28 +++--- tests/integ/test_state_machine_definition.py | 75 +++++++++++++++ .../unit/test_path_placeholders_with_steps.py | 91 ++++++++++++++++++ 6 files changed, 324 insertions(+), 69 deletions(-) create mode 100644 tests/unit/test_path_placeholders_with_steps.py diff --git a/doc/placeholders.rst b/doc/placeholders.rst index fa87b1d..2fee2fc 100644 --- a/doc/placeholders.rst +++ b/doc/placeholders.rst @@ -5,7 +5,7 @@ Once defined, a workflow is static unless you update it explicitly. But, you can input to workflow executions. You can have dynamic values that you use in the **parameters** fields of the steps in your workflow. For this, the AWS Step Functions Data Science SDK provides a way to define placeholders to pass around when you -create your workflow. There are 2 mechanisms for passing dynamic values in a workflow. +create your workflow. There are 4 mechanisms for passing dynamic values in a workflow. The first mechanism is a global input to the workflow execution. This input is accessible to all the steps in the workflow. The SDK provides :py:meth:`stepfunctions.inputs.ExecutionInput` @@ -82,6 +82,89 @@ that returns the placeholder output for that step. definition = Chain([lambda_state_first, lambda_state_second]) +The third and fourth mechanisms can be used to access context objects from the Map state. +The SDK provides the :py:meth:`stepfunctions.inputs.MapItemIndex` class that allows you to get the index number of the +array item that is being processed in the current iteration and the :py:meth:`stepfunctions.inputs.MapItemValue` class +which is used for accessing the value of the array item that is currently being processed. + +.. code-block:: python + + # Create an instance of MapItemValue class, and define a schema. Defining + # a schema is optional, but it is a good practice + map_item_value = MapItemValue(schema={ + 'name': str, + 'age': str + }) + + map_state = Map( + 'MapState', + parameters={ + "MapIndex": MapItemIndex(), + "Name": map_item_value['name'], + "Age": map_item_value['age'] + } + ) + iterator_state = Pass('TrainIterator') + map_state.attach_iterator(iterator_state) + + # Workflow is created with the placeholders + workflow = Workflow( + name='MyMapWorkflow', + definition=map_state, + role=workflow_execution_role, + ) + + # Create the workflow on AWS Step Functions + workflow.create() + + # This creates a workflow with the following definition: + """ + { + "StartAt": "MapState01", + "States": { + "MapState": { + "Parameters": { + "MapIndex.$": "$$.Map.Item.Index", + "Name.$": "$$.Map.Item.Value['name']", + "Age.$": "$$.Map.Item.Value['age']" + }, + "Type": "Map", + "End": true, + "Iterator": { + "StartAt": "TrainIterator", + "States": { + "TrainIterator": { + "Type": "Pass", + "End": true + } + } + } + } + } + } + """ + + # The placeholder is assigned a value during execution. The SDK will + # verify that all placeholder values are assigned values, and that + # these values are of the expected type based on the defined schema + # before the execution starts. + workflow_input = execution_input = [{"name": "John", "age": 21}, {"name": "Snow", "age": 18}] + workflow.execute(inputs=workflow_input) + + # The execution output will be: + [ + { + "MapIndex": 0, + "Age": 21, + "Name": "John" + }, + { + "MapIndex": 1, + "Age": 18, + "Name": "Snow" + } + ] + .. autoclass:: stepfunctions.inputs.Placeholder @@ -90,3 +173,9 @@ that returns the placeholder output for that step. .. autoclass:: stepfunctions.inputs.StepInput :inherited-members: + +.. autoclass:: stepfunctions.inputs.MapItemValue + :inherited-members: + +.. autoclass:: stepfunctions.inputs.MapItemIndex + :inherited-members: diff --git a/src/stepfunctions/steps/compute.py b/src/stepfunctions/steps/compute.py index 203ed47..6cf6f85 100644 --- a/src/stepfunctions/steps/compute.py +++ b/src/stepfunctions/steps/compute.py @@ -55,10 +55,10 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ if wait_for_callback: @@ -96,10 +96,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ if wait_for_completion: """ @@ -136,10 +136,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ if wait_for_completion: """ @@ -176,10 +176,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ if wait_for_completion: """ diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index a0c3950..7cfb124 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -84,10 +84,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ @@ -114,10 +114,10 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ if wait_for_callback: @@ -154,10 +154,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ @@ -184,10 +184,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ @@ -214,10 +214,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ @@ -243,10 +243,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -282,10 +282,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -321,10 +321,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -360,10 +360,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -399,10 +399,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -438,10 +438,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -477,10 +477,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -516,10 +516,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ @@ -547,10 +547,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -588,10 +588,10 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ if wait_for_callback: """ @@ -628,10 +628,10 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ if wait_for_callback: """ @@ -666,10 +666,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -705,10 +705,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -744,10 +744,10 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ if wait_for_completion: @@ -783,10 +783,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ @@ -813,10 +813,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ @@ -843,10 +843,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ @@ -873,10 +873,10 @@ def __init__(self, state_id, **kwargs): timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ """ diff --git a/src/stepfunctions/steps/states.py b/src/stepfunctions/steps/states.py index d468e5c..7524e80 100644 --- a/src/stepfunctions/steps/states.py +++ b/src/stepfunctions/steps/states.py @@ -191,10 +191,10 @@ def __init__(self, state_id, state_type, output_schema=None, **kwargs): state_type (str): Type of the state. (Allowed values: `'Pass'`, `'Succeed'`, `'Fail'`, `'Wait'`, `'Task'`, `'Choice'`, `'Parallel'`, `'Map'`). output_schema (dict): Expected output schema for the State. This is used to validate placeholder inputs used by the next state in the state machine. (default: None) comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(State, self).__init__(**kwargs) self.fields['type'] = state_type @@ -326,11 +326,11 @@ def __init__(self, state_id, **kwargs): Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') result (str, optional): If present, its value is treated as the output of a virtual task, and placed as prescribed by the `result_path` field, if any, to be passed on to the next state. If `result` is not provided, the output is the input. - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(Pass, self).__init__(state_id, 'Pass', **kwargs) @@ -357,8 +357,8 @@ def __init__(self, state_id, **kwargs): Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') - output_path (str, optional): Path applied to the state’s output, producing the effective output which serves as the raw input for the next state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(Succeed, self).__init__(state_id, 'Succeed', **kwargs) @@ -411,8 +411,8 @@ def __init__(self, state_id, **kwargs): timestamp (str): Absolute expiry time, specified as an ISO-8601 extended offset date-time format string. timestamp_path (str): Path applied to the state's input to select the timestamp to be used for wait duration. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') - output_path (str, optional): Path applied to the state’s output, producing the effective output which serves as the raw input for the next state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(Wait, self).__init__(state_id, 'Wait', **kwargs) if len([v for v in (self.seconds, self.timestamp, self.timestamp_path, self.seconds_path) if v is not None]) != 1: @@ -441,8 +441,8 @@ def __init__(self, state_id, **kwargs): Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') - output_path (str, optional): Path applied to the state’s output, producing the effective output which serves as the raw input for the next state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(Choice, self).__init__(state_id, 'Choice', **kwargs) self.choices = [] @@ -514,10 +514,10 @@ def __init__(self, state_id, **kwargs): Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(Parallel, self).__init__(state_id, 'Parallel', **kwargs) self.branches = [] @@ -618,10 +618,10 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. comment (str, optional): Human-readable comment or description. (default: None) - input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + input_path (str or Placeholder, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') - output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + output_path (str or Placeholder, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ super(Task, self).__init__(state_id, 'Task', **kwargs) if self.timeout_seconds is not None and self.timeout_seconds_path is not None: diff --git a/tests/integ/test_state_machine_definition.py b/tests/integ/test_state_machine_definition.py index d21e59b..ecb6bb1 100644 --- a/tests/integ/test_state_machine_definition.py +++ b/tests/integ/test_state_machine_definition.py @@ -18,6 +18,7 @@ from sagemaker.utils import unique_name_from_base from sagemaker.image_uris import retrieve from stepfunctions import steps +from stepfunctions.inputs import ExecutionInput, MapItemIndex, MapItemValue from stepfunctions.workflow import Workflow from stepfunctions.steps.utils import get_aws_partition from tests.integ.utils import state_machine_delete_wait @@ -288,6 +289,80 @@ def test_map_state_machine_creation(sfn_client, sfn_role_arn): workflow_test_suite(sfn_client, workflow, asl_state_machine_definition, map_state_result, state_machine_input) +def test_map_state_machine_creation_with_placeholders(sfn_client, sfn_role_arn): + map_item_value = MapItemValue(schema={ + 'name': str, + 'age': str + }) + + execution_input = ExecutionInput() + + map_state_name = "Map State" + iterated_state_name = "Pass State" + final_state_name = "Final State" + max_concurrency = 0 + map_state_result = "Map Result" + state_machine_input = { + "items_path": [{"name": "John", "age": 21}, {"name": "Snow", "age": 18}] + } + + asl_state_machine_definition = { + "StartAt": map_state_name, + "States": { + map_state_name: { + "ItemsPath": "$$.Execution.Input['items_path']", + "Iterator": { + "StartAt": iterated_state_name, + "States": { + iterated_state_name: { + "Type": "Pass", + "End": True + } + } + }, + "MaxConcurrency": max_concurrency, + "Type": "Map", + "Next": final_state_name, + "Parameters": { + "Age.$": "$$.Map.Item.Value['age']", + "MapIndex.$": "$$.Map.Item.Index", + "Name.$": "$$.Map.Item.Value['name']" + }, + }, + final_state_name: { + "Type": "Pass", + "Result": map_state_result, + "End": True + } + } + } + + map_state = steps.Map( + map_state_name, + items_path=execution_input['items_path'], + iterator=steps.Pass(iterated_state_name), + max_concurrency=max_concurrency, + parameters={ + "MapIndex": MapItemIndex(), + "Name": map_item_value['name'], + "Age": map_item_value['age'] + } + ) + + definition = steps.Chain([ + map_state, + steps.Pass(final_state_name, result=map_state_result) + ]) + + workflow = Workflow( + unique_name_from_base('Test_Map_Workflow_With_Placeholders'), + definition=definition, + role=sfn_role_arn + ) + + workflow_test_suite(sfn_client, workflow, asl_state_machine_definition, map_state_result, state_machine_input) + + def test_choice_state_machine_creation(sfn_client, sfn_role_arn): choice_state_name = "ChoiceState" first_match_name = "FirstMatchState" diff --git a/tests/unit/test_path_placeholders_with_steps.py b/tests/unit/test_path_placeholders_with_steps.py new file mode 100644 index 0000000..4e18655 --- /dev/null +++ b/tests/unit/test_path_placeholders_with_steps.py @@ -0,0 +1,91 @@ +# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing +# permissions and limitations under the License. +from __future__ import absolute_import + +import boto3 +import pytest +from unittest.mock import patch + +from stepfunctions.inputs import ExecutionInput, StepInput +from stepfunctions.steps import Pass, Succeed, Wait, Choice, Parallel, Task, Map +from stepfunctions.steps.compute import LambdaStep, GlueStartJobRunStep, BatchSubmitJobStep, EcsRunTaskStep +from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep,\ + DynamoDBDeleteItemStep +from stepfunctions.steps.service import ( + EksCallStep, + EksCreateClusterStep, + EksCreateFargateProfileStep, + EksCreateNodeGroupStep, + EksDeleteClusterStep, + EksDeleteFargateProfileStep, + EksDeleteNodegroupStep, + EksRunJobStep, +) +from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep,\ + EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep,\ + EmrModifyInstanceGroupByNameStep +from stepfunctions.steps.service import EventBridgePutEventsStep +from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep +from stepfunctions.steps.service import GlueDataBrewStartJobRunStep +from stepfunctions.steps.states import State + +@pytest.mark.parametrize("state, state_id, extra_args", [ + # States + (State, "State", {'state_type': 'Void'}), + (Pass, "PassState", {}), + (Choice, "ChoiceState", {}), + (Succeed, "SucceedState", {}), + (Parallel, "ParallelState", {}), + (Task, "TaskState", {}), + (Wait, "WaitState", {'seconds': 10}), + (Map, "MapState", {'iterator': Pass('PassState')}), + + # Compute steps + (LambdaStep, "LambdaStep", {}), + (EcsRunTaskStep, "EcsRunTaskStep", {}), + (BatchSubmitJobStep, "BatchSubmitJobStep", {}), + (GlueStartJobRunStep, "GlueStartJobRunStep", {}), + + # Service steps + (DynamoDBGetItemStep, "DynamoDBGetItemStep", {}), + (DynamoDBPutItemStep, "DynamoDBPutItemStep", {}), + (DynamoDBUpdateItemStep, "DynamoDBUpdateItemStep", {}), + (DynamoDBDeleteItemStep, "DynamoDBDeleteItemStep", {}), + (EksCallStep, "EksCallStep", {}), + (EksCreateClusterStep, "EksCreateClusterStep", {}), + (EksCreateFargateProfileStep, "EksCreateFargateProfileStep", {}), + (EksCreateNodeGroupStep, "EksCreateNodeGroupStep", {}), + (EksDeleteClusterStep, "EksDeleteClusterStep", {}), + (EksDeleteFargateProfileStep, "EksDeleteFargateProfileStep", {}), + (EksDeleteNodegroupStep, "EksDeleteNodegroupStep", {}), + (EksRunJobStep, "EksRunJobStep", {}), + (EmrCreateClusterStep, "EmrCreateClusterStep", {}), + (EmrTerminateClusterStep, "EmrTerminateClusterStep", {}), + (EmrAddStepStep, "EmrAddStepStep", {}), + (EmrCancelStepStep, "EmrCancelStepStep", {}), + (EmrSetClusterTerminationProtectionStep, "EmrSetClusterTerminationProtectionStep", {}), + (EmrModifyInstanceFleetByNameStep, "EmrModifyInstanceFleetByNameStep", {}), + (EmrModifyInstanceGroupByNameStep, "EmrModifyInstanceGroupByNameStep", {}), + (EventBridgePutEventsStep, "EventBridgePutEventsStep", {}), + (SnsPublishStep, "SnsPublishStep", {}), + (SqsSendMessageStep, "SqsSendMessageStep", {}), + (GlueDataBrewStartJobRunStep, "GlueDataBrewStartJobRunStep", {}) +]) +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_service_step_creation_with_placeholders(state, state_id, extra_args): + execution_input = ExecutionInput(schema={'input_path': str}) + step_input = StepInput(schema={'output_path': str}) + step = state(state_id, input_path=execution_input['input_path'], output_path=step_input['output_path'], **extra_args) + + assert step.to_dict()['InputPath'] == "$$.Execution.Input['input_path']" + assert step.to_dict()['OutputPath'] == "$['output_path']" From a0d70c7dba9ef46e6928cdd7362e19618a919e77 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen <83104894+ca-nguyen@users.noreply.github.com> Date: Sat, 11 Sep 2021 00:57:30 -0700 Subject: [PATCH 7/9] Update src/stepfunctions/steps/states.py Co-authored-by: Adam Wong <55506708+wong-a@users.noreply.github.com> --- src/stepfunctions/steps/states.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/stepfunctions/steps/states.py b/src/stepfunctions/steps/states.py index 7524e80..368a508 100644 --- a/src/stepfunctions/steps/states.py +++ b/src/stepfunctions/steps/states.py @@ -73,11 +73,9 @@ def to_dict(self): k = to_pascalcase(k) if k == to_pascalcase(Field.Parameters.value): result[k] = self._replace_placeholders(v) - elif self._is_placeholder_compatible(k): - if isinstance(v, Placeholder): - result[k] = v.to_jsonpath() - else: - result[k] = v + elif self._is_placeholder_compatible(k) and isinstance(v, Placeholder): + result[k] = v.to_jsonpath() + else: result[k] = v From 6dcfbaf307790e68dcc5dbae43635e09048a1d34 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen <83104894+ca-nguyen@users.noreply.github.com> Date: Fri, 22 Oct 2021 19:46:57 -0700 Subject: [PATCH 8/9] Update doc/placeholders.rst Co-authored-by: Adam Wong <55506708+wong-a@users.noreply.github.com> --- doc/placeholders.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/placeholders.rst b/doc/placeholders.rst index 2fee2fc..e3ad5e9 100644 --- a/doc/placeholders.rst +++ b/doc/placeholders.rst @@ -82,7 +82,7 @@ that returns the placeholder output for that step. definition = Chain([lambda_state_first, lambda_state_second]) -The third and fourth mechanisms can be used to access context objects from the Map state. +The third and fourth mechanisms can be used to access the context object from Map states. The SDK provides the :py:meth:`stepfunctions.inputs.MapItemIndex` class that allows you to get the index number of the array item that is being processed in the current iteration and the :py:meth:`stepfunctions.inputs.MapItemValue` class which is used for accessing the value of the array item that is currently being processed. From 9942f959f670d8c603b527f1afe06c2439f6acbe Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen <carolngu@amazon.com> Date: Fri, 22 Oct 2021 20:26:51 -0700 Subject: [PATCH 9/9] Updated unit tests and doc --- doc/placeholders.rst | 49 ++++------------- .../unit/test_path_placeholders_with_steps.py | 52 +------------------ 2 files changed, 12 insertions(+), 89 deletions(-) diff --git a/doc/placeholders.rst b/doc/placeholders.rst index e3ad5e9..0e818e9 100644 --- a/doc/placeholders.rst +++ b/doc/placeholders.rst @@ -93,15 +93,15 @@ which is used for accessing the value of the array item that is currently being # a schema is optional, but it is a good practice map_item_value = MapItemValue(schema={ 'name': str, - 'age': str + 'points': str }) map_state = Map( 'MapState', parameters={ - "MapIndex": MapItemIndex(), - "Name": map_item_value['name'], - "Age": map_item_value['age'] + "Ranking": MapItemIndex(), + "Contestant": map_item_value['name'], + "Score": map_item_value['points'] } ) iterator_state = Pass('TrainIterator') @@ -117,51 +117,24 @@ which is used for accessing the value of the array item that is currently being # Create the workflow on AWS Step Functions workflow.create() - # This creates a workflow with the following definition: - """ - { - "StartAt": "MapState01", - "States": { - "MapState": { - "Parameters": { - "MapIndex.$": "$$.Map.Item.Index", - "Name.$": "$$.Map.Item.Value['name']", - "Age.$": "$$.Map.Item.Value['age']" - }, - "Type": "Map", - "End": true, - "Iterator": { - "StartAt": "TrainIterator", - "States": { - "TrainIterator": { - "Type": "Pass", - "End": true - } - } - } - } - } - } - """ - # The placeholder is assigned a value during execution. The SDK will # verify that all placeholder values are assigned values, and that # these values are of the expected type based on the defined schema # before the execution starts. - workflow_input = execution_input = [{"name": "John", "age": 21}, {"name": "Snow", "age": 18}] + workflow_input = execution_input = [{"name": "John", "points": "101"}, {"name": "Snow", "points": "99"}] workflow.execute(inputs=workflow_input) # The execution output will be: [ { - "MapIndex": 0, - "Age": 21, - "Name": "John" + "Ranking": 0, + "Contestant": "John", + "Score": "101", }, { - "MapIndex": 1, - "Age": 18, - "Name": "Snow" + "Ranking": 1, + "Contestant": "Snow", + "Score": "99" } ] diff --git a/tests/unit/test_path_placeholders_with_steps.py b/tests/unit/test_path_placeholders_with_steps.py index 4e18655..6bbed03 100644 --- a/tests/unit/test_path_placeholders_with_steps.py +++ b/tests/unit/test_path_placeholders_with_steps.py @@ -18,25 +18,6 @@ from stepfunctions.inputs import ExecutionInput, StepInput from stepfunctions.steps import Pass, Succeed, Wait, Choice, Parallel, Task, Map -from stepfunctions.steps.compute import LambdaStep, GlueStartJobRunStep, BatchSubmitJobStep, EcsRunTaskStep -from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep,\ - DynamoDBDeleteItemStep -from stepfunctions.steps.service import ( - EksCallStep, - EksCreateClusterStep, - EksCreateFargateProfileStep, - EksCreateNodeGroupStep, - EksDeleteClusterStep, - EksDeleteFargateProfileStep, - EksDeleteNodegroupStep, - EksRunJobStep, -) -from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep,\ - EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep,\ - EmrModifyInstanceGroupByNameStep -from stepfunctions.steps.service import EventBridgePutEventsStep -from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep -from stepfunctions.steps.service import GlueDataBrewStartJobRunStep from stepfunctions.steps.states import State @pytest.mark.parametrize("state, state_id, extra_args", [ @@ -48,38 +29,7 @@ (Parallel, "ParallelState", {}), (Task, "TaskState", {}), (Wait, "WaitState", {'seconds': 10}), - (Map, "MapState", {'iterator': Pass('PassState')}), - - # Compute steps - (LambdaStep, "LambdaStep", {}), - (EcsRunTaskStep, "EcsRunTaskStep", {}), - (BatchSubmitJobStep, "BatchSubmitJobStep", {}), - (GlueStartJobRunStep, "GlueStartJobRunStep", {}), - - # Service steps - (DynamoDBGetItemStep, "DynamoDBGetItemStep", {}), - (DynamoDBPutItemStep, "DynamoDBPutItemStep", {}), - (DynamoDBUpdateItemStep, "DynamoDBUpdateItemStep", {}), - (DynamoDBDeleteItemStep, "DynamoDBDeleteItemStep", {}), - (EksCallStep, "EksCallStep", {}), - (EksCreateClusterStep, "EksCreateClusterStep", {}), - (EksCreateFargateProfileStep, "EksCreateFargateProfileStep", {}), - (EksCreateNodeGroupStep, "EksCreateNodeGroupStep", {}), - (EksDeleteClusterStep, "EksDeleteClusterStep", {}), - (EksDeleteFargateProfileStep, "EksDeleteFargateProfileStep", {}), - (EksDeleteNodegroupStep, "EksDeleteNodegroupStep", {}), - (EksRunJobStep, "EksRunJobStep", {}), - (EmrCreateClusterStep, "EmrCreateClusterStep", {}), - (EmrTerminateClusterStep, "EmrTerminateClusterStep", {}), - (EmrAddStepStep, "EmrAddStepStep", {}), - (EmrCancelStepStep, "EmrCancelStepStep", {}), - (EmrSetClusterTerminationProtectionStep, "EmrSetClusterTerminationProtectionStep", {}), - (EmrModifyInstanceFleetByNameStep, "EmrModifyInstanceFleetByNameStep", {}), - (EmrModifyInstanceGroupByNameStep, "EmrModifyInstanceGroupByNameStep", {}), - (EventBridgePutEventsStep, "EventBridgePutEventsStep", {}), - (SnsPublishStep, "SnsPublishStep", {}), - (SqsSendMessageStep, "SqsSendMessageStep", {}), - (GlueDataBrewStartJobRunStep, "GlueDataBrewStartJobRunStep", {}) + (Map, "MapState", {'iterator': Pass('PassState')}) ]) @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_service_step_creation_with_placeholders(state, state_id, extra_args):