Skip to content

Commit dd148ae

Browse files
wmlbaWill Badr
and
Will Badr
authored
adding support for filters to TransformStep (#53)
* adding support for filters to TransformStep * adding support for filters to TransformStep with unit tests * adding support for filters to TransformStep * Fixing a type in the unit test * reordering the new arguments for backward compatibility Co-authored-by: Will Badr <[email protected]>
1 parent 97147ab commit dd148ae

File tree

2 files changed

+20
-3
lines changed

2 files changed

+20
-3
lines changed

Diff for: src/stepfunctions/steps/sagemaker.py

+12-3
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class TransformStep(Task):
115115
Creates a Task State to execute a `SageMaker Transform Job <https://docs.aws.amazon.com/sagemaker/latest/dg/API_CreateTransformJob.html>`_.
116116
"""
117117

118-
def __init__(self, state_id, transformer, job_name, model_name, data, data_type='S3Prefix', content_type=None, compression_type=None, split_type=None, experiment_config=None, wait_for_completion=True, tags=None, **kwargs):
118+
def __init__(self, state_id, transformer, job_name, model_name, data, data_type='S3Prefix', content_type=None, compression_type=None, split_type=None, experiment_config=None, wait_for_completion=True, tags=None, input_filter=None, output_filter=None, join_source=None, **kwargs):
119119
"""
120120
Args:
121121
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
@@ -136,6 +136,9 @@ def __init__(self, state_id, transformer, job_name, model_name, data, data_type=
136136
experiment_config (dict, optional): Specify the experiment config for the transform. (Default: None)
137137
wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait for the transform job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the transform job and proceed to the next step. (default: True)
138138
tags (list[dict], optional): `List of tags <https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html>`_ to associate with the resource.
139+
input_filter (str, optional): A JSONPath to select a portion of the input to pass to the algorithm container for inference. If you omit the field, it gets the value '$', representing the entire input. For CSV data, each row is taken as a JSON array, so only index-based JSONPaths can be applied, e.g. $[0], $[1:]. CSV data should follow the RFC format. See the "Supported JSONPath Operators" table in the SageMaker documentation for the supported JSONPath operators. For more information, see the SageMaker API documentation for CreateTransformJob. Some examples: "$[1:]", "$.features" (default: None).
140+
output_filter (str, optional): A JSONPath to select a portion of the joined/original output to return as the output. For more information, see the SageMaker API documentation for CreateTransformJob. Some examples: "$[1:]", "$.prediction" (default: None).
141+
join_source (str, optional): The source of data to be joined to the transform output. It can be set to 'Input', meaning the entire input record will be joined to the inference result. You can use OutputFilter to select the useful portion before uploading to S3. Valid values: 'Input', None. (default: None)
139142
"""
140143
if wait_for_completion:
141144
kwargs[Field.Resource.value] = 'arn:aws:states:::sagemaker:createTransformJob.sync'
@@ -150,7 +153,10 @@ def __init__(self, state_id, transformer, job_name, model_name, data, data_type=
150153
content_type=content_type,
151154
compression_type=compression_type,
152155
split_type=split_type,
153-
job_name=job_name
156+
job_name=job_name,
157+
input_filter=input_filter,
158+
output_filter=output_filter,
159+
join_source=join_source
154160
)
155161
else:
156162
parameters = transform_config(
@@ -159,7 +165,10 @@ def __init__(self, state_id, transformer, job_name, model_name, data, data_type=
159165
data_type=data_type,
160166
content_type=content_type,
161167
compression_type=compression_type,
162-
split_type=split_type
168+
split_type=split_type,
169+
input_filter=input_filter,
170+
output_filter=output_filter,
171+
join_source=join_source
163172
)
164173

165174
if isinstance(job_name, (ExecutionInput, StepInput)):

Diff for: tests/unit/test_sagemaker_steps.py

+8
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,9 @@ def test_transform_step_creation(pca_transformer):
390390
'TrialComponentDisplayName': 'Transform'
391391
},
392392
tags=DEFAULT_TAGS,
393+
join_source='Input',
394+
output_filter='$[2:]',
395+
input_filter='$[1:]'
393396
)
394397
assert step.to_dict() == {
395398
'Type': 'Task',
@@ -416,6 +419,11 @@ def test_transform_step_creation(pca_transformer):
416419
'TrialName': 'pca_trial',
417420
'TrialComponentDisplayName': 'Transform'
418421
},
422+
'DataProcessing': {
423+
'InputFilter': '$[1:]',
424+
'OutputFilter': '$[2:]',
425+
'JoinSource': 'Input',
426+
},
419427
'Tags': DEFAULT_TAGS_LIST
420428
},
421429
'Resource': 'arn:aws:states:::sagemaker:createTransformJob.sync',

0 commit comments

Comments
 (0)