From 27f46615596ed75f7f0f4d6245c9cb4270f2e559 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Fri, 11 Jun 2021 09:51:53 -0700 Subject: [PATCH 01/11] Feature: Adding placeholder output_path parameter to TrainingStep to propagate to Estimator --- src/stepfunctions/steps/sagemaker.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/stepfunctions/steps/sagemaker.py b/src/stepfunctions/steps/sagemaker.py index deb5176..3a659b9 100644 --- a/src/stepfunctions/steps/sagemaker.py +++ b/src/stepfunctions/steps/sagemaker.py @@ -43,7 +43,7 @@ class TrainingStep(Task): Creates a Task State to execute a `SageMaker Training Job `_. The TrainingStep will also create a model by default, and the model shares the same name as the training job. """ - def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=None, mini_batch_size=None, experiment_config=None, wait_for_completion=True, tags=None, **kwargs): + def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=None, mini_batch_size=None, experiment_config=None, wait_for_completion=True, tags=None, output_path=None, **kwargs): """ Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. @@ -69,6 +69,13 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non experiment_config (dict, optional): Specify the experiment config for the training. (Default: None) wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait for the training job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the training job and proceed to the next step. (default: True) tags (list[dict], optional): `List to tags `_ to associate with the resource. 
+ output_path (Placeholder, optional): S3 location for saving the training result (model + artifacts and output files) to propagate to estimator. If not specified, results are + stored to a default bucket. If the bucket with the specific name + does not exist, the estimator creates the bucket during the + :meth:`~sagemaker.estimator.EstimatorBase.fit` method execution. + file:// urls are used for local mode. For example: 'file://model/' + will save to the model folder in the current directory. """ self.estimator = estimator self.job_name = job_name @@ -103,6 +110,10 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non if isinstance(job_name, (ExecutionInput, StepInput)): parameters['TrainingJobName'] = job_name + if output_path is not None: + if isinstance(output_path, (ExecutionInput, StepInput)): + parameters['OutputDataConfig']['S3OutputPath'] = output_path + if hyperparameters is not None: parameters['HyperParameters'] = hyperparameters From 8048bc4c70f90025cdc415e2a55c35c98a040cc5 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Fri, 11 Jun 2021 19:07:45 -0700 Subject: [PATCH 02/11] Added integ test to validate training step with output_config placeholder --- tests/integ/test_sagemaker_steps.py | 54 +++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/tests/integ/test_sagemaker_steps.py b/tests/integ/test_sagemaker_steps.py index 73d1a5b..9957afa 100644 --- a/tests/integ/test_sagemaker_steps.py +++ b/tests/integ/test_sagemaker_steps.py @@ -29,6 +29,7 @@ from sagemaker.tuner import HyperparameterTuner from sagemaker.processing import ProcessingInput, ProcessingOutput +from stepfunctions.inputs import ExecutionInput from stepfunctions.steps import Chain from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointStep, EndpointConfigStep, TuningStep, ProcessingStep from stepfunctions.workflow import Workflow @@ -104,6 +105,59 @@ def test_training_step(pca_estimator_fixture, 
record_set_fixture, sfn_client, sf state_machine_delete_wait(sfn_client, workflow.state_machine_arn) # End of Cleanup + +def test_training_step_with_placeholders(pca_estimator_fixture, + record_set_fixture, + sfn_client, + sfn_role_arn, + sagemaker_session): + # Build workflow definition + execution_input = ExecutionInput(schema={ + 'JobName': str, + 'OutputPath': str + }) + + job_name_placeholder = execution_input['JobName'] + output_path_placeholder = execution_input['OutputPath'] + training_step_name = 'TrainingStep' + + training_step = TrainingStep(training_step_name, + estimator=pca_estimator_fixture, + job_name=job_name_placeholder, + data=record_set_fixture, + mini_batch_size=200, + output_path=output_path_placeholder) + workflow_graph = Chain([training_step]) + + with timeout(minutes=DEFAULT_TIMEOUT_MINUTES): + # Create workflow and check definition + workflow = create_workflow_and_check_definition( + workflow_graph=workflow_graph, + workflow_name=unique_name_from_base("integ-test-training-step-workflow"), + sfn_client=sfn_client, + sfn_role_arn=sfn_role_arn + ) + + # Execute workflow + job_id = generate_job_name() + execution_input = { + 'OutputPath': f's3://{sagemaker_session.default_bucket()}/', + 'JobName': f'TrainingJob-{job_id}' + } + + execution = workflow.execute(inputs=execution_input) + execution_output = execution.get_output(wait=True) + expected_output_data_config = {"S3OutputPath.$": "$$.Execution.Input['OutputPath']"} + assert workflow.definition.states[training_step_name]['Parameters']['OutputDataConfig'] == \ + expected_output_data_config + # Check workflow output + assert execution_output.get("TrainingJobStatus") == "Completed" + + # Cleanup + state_machine_delete_wait(sfn_client, workflow.state_machine_arn) + # End of Cleanup + + def test_model_step(trained_estimator, sfn_client, sagemaker_session, sfn_role_arn): # Build workflow definition model_name = generate_job_name() From 97aff531850d24671c783367c0016cf5b515e85a Mon Sep 17 00:00:00 
2001 From: Carolyn Nguyen Date: Mon, 14 Jun 2021 17:50:49 -0700 Subject: [PATCH 03/11] Make data param compatible with placeholder and add unit test for training step creation with placeholders --- src/stepfunctions/steps/sagemaker.py | 14 +++++- tests/integ/test_sagemaker_steps.py | 16 ++++-- tests/unit/test_sagemaker_steps.py | 73 ++++++++++++++++++++++++++++ 3 files changed, 97 insertions(+), 6 deletions(-) diff --git a/src/stepfunctions/steps/sagemaker.py b/src/stepfunctions/steps/sagemaker.py index 3a659b9..b29be8b 100644 --- a/src/stepfunctions/steps/sagemaker.py +++ b/src/stepfunctions/steps/sagemaker.py @@ -51,7 +51,7 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non job_name (str or Placeholder): Specify a training job name, this is required for the training job to run. We recommend to use :py:class:`~stepfunctions.inputs.ExecutionInput` placeholder collection to pass the value dynamically in each execution. data: Information about the training data. Please refer to the ``fit()`` method of the associated estimator, as this can take any of the following forms: - * (str) - The S3 location where training data is saved. + * (str or Placeholder) - The S3 location where training data is saved. * (dict[str, str] or dict[str, sagemaker.inputs.TrainingInput]) - If using multiple channels for training data, you can specify a dict mapping channel names to strings or :func:`~sagemaker.inputs.TrainingInput` objects. @@ -95,6 +95,12 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non kwargs[Field.Resource.value] = get_service_integration_arn(SAGEMAKER_SERVICE_NAME, SageMakerApi.CreateTrainingJob) + # sagemaker.workflow.airflow.training_config does not accept ExecutionInput as input, only str, dict, + # TrainingInput or FileSystemInput. Transform placeholder into JSONpath to be processed and included in + # generated Sagemaker parameters. 
Placeholder for uri str only: + data_placeholder = isinstance(data, (ExecutionInput, StepInput)) + if data_placeholder: + data = data.to_jsonpath() if isinstance(job_name, str): parameters = training_config(estimator=estimator, inputs=data, job_name=job_name, mini_batch_size=mini_batch_size) @@ -114,6 +120,12 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non if isinstance(output_path, (ExecutionInput, StepInput)): parameters['OutputDataConfig']['S3OutputPath'] = output_path + if data is not None and data_placeholder: + # Replace the 'S3Uri' key with one that supports JSONpath value. + # Support for uri str only: The list will only contain 1 element + temp_data_uri = parameters['InputDataConfig'][0]['DataSource']['S3DataSource'].pop('S3Uri', None) + parameters['InputDataConfig'][0]['DataSource']['S3DataSource']['S3Uri.$'] = temp_data_uri + if hyperparameters is not None: parameters['HyperParameters'] = hyperparameters diff --git a/tests/integ/test_sagemaker_steps.py b/tests/integ/test_sagemaker_steps.py index 9957afa..00ad7f8 100644 --- a/tests/integ/test_sagemaker_steps.py +++ b/tests/integ/test_sagemaker_steps.py @@ -114,7 +114,8 @@ def test_training_step_with_placeholders(pca_estimator_fixture, # Build workflow definition execution_input = ExecutionInput(schema={ 'JobName': str, - 'OutputPath': str + 'OutputPath': str, + 'Data': str }) job_name_placeholder = execution_input['JobName'] @@ -145,12 +146,17 @@ def test_training_step_with_placeholders(pca_estimator_fixture, 'JobName': f'TrainingJob-{job_id}' } + # Check placeholders in workflow definition: execution = workflow.execute(inputs=execution_input) - execution_output = execution.get_output(wait=True) - expected_output_data_config = {"S3OutputPath.$": "$$.Execution.Input['OutputPath']"} - assert workflow.definition.states[training_step_name]['Parameters']['OutputDataConfig'] == \ - expected_output_data_config + expected_output_data_key_value_pair = ("S3OutputPath.$", 
"$$.Execution.Input['OutputPath']") + assert expected_output_data_key_value_pair in \ + workflow.definition.states[training_step_name]['Parameters']['OutputDataConfig'].items() + + expected_job_name_key_value_pair = ("TrainingJobName.$", "$$.Execution.Input['JobName']") + assert expected_job_name_key_value_pair in \ + workflow.definition.states[training_step_name]['Parameters'].items() # Check workflow output + execution_output = execution.get_output(wait=True) assert execution_output.get("TrainingJobStatus") == "Completed" # Cleanup diff --git a/tests/unit/test_sagemaker_steps.py b/tests/unit/test_sagemaker_steps.py index 2ce9d3b..a19aa31 100644 --- a/tests/unit/test_sagemaker_steps.py +++ b/tests/unit/test_sagemaker_steps.py @@ -26,6 +26,7 @@ from sagemaker.processing import ProcessingInput, ProcessingOutput from unittest.mock import MagicMock, patch +from stepfunctions.inputs import ExecutionInput from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointStep, EndpointConfigStep, ProcessingStep from stepfunctions.steps.sagemaker import tuning_config @@ -265,6 +266,78 @@ def test_training_step_creation(pca_estimator): } +@patch('botocore.client.BaseClient._make_api_call', new=mock_boto_api_call) +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_training_step_creation_with_placeholders(pca_estimator): + execution_input = ExecutionInput(schema={ + 'Data': str, + 'JobName': str, + 'OutputPath': str, + }) + + step = TrainingStep('Training', + estimator=pca_estimator, + job_name= execution_input['JobName'], + data=execution_input['Data'], + output_path=execution_input['OutputPath'], + experiment_config={ + 'ExperimentName': 'pca_experiment', + 'TrialName': 'pca_trial', + 'TrialComponentDisplayName': 'Training' + }, + tags=DEFAULT_TAGS, + ) + assert step.to_dict() == { + 'Type': 'Task', + 'Parameters': { + 'AlgorithmSpecification': { + 'TrainingImage': PCA_IMAGE, + 'TrainingInputMode': 'File' + }, + 
'OutputDataConfig': { + 'S3OutputPath.$': "$$.Execution.Input['OutputPath']" + }, + 'StoppingCondition': { + 'MaxRuntimeInSeconds': 86400 + }, + 'ResourceConfig': { + 'InstanceCount': 1, + 'InstanceType': 'ml.c4.xlarge', + 'VolumeSizeInGB': 30 + }, + 'RoleArn': EXECUTION_ROLE, + 'HyperParameters': { + 'feature_dim': '50000', + 'num_components': '10', + 'subtract_mean': 'True', + 'algorithm_mode': 'randomized', + 'mini_batch_size': '200' + }, + 'InputDataConfig': [ + { + 'ChannelName': 'training', + 'DataSource': { + 'S3DataSource': { + 'S3DataDistributionType': 'FullyReplicated', + 'S3DataType': 'S3Prefix', + 'S3Uri.$': "$$.Execution.Input['Data']" + } + } + } + ], + 'ExperimentConfig': { + 'ExperimentName': 'pca_experiment', + 'TrialName': 'pca_trial', + 'TrialComponentDisplayName': 'Training' + }, + 'TrainingJobName.$': "$$.Execution.Input['JobName']", + 'Tags': DEFAULT_TAGS_LIST + }, + 'Resource': 'arn:aws:states:::sagemaker:createTrainingJob.sync', + 'End': True + } + + @patch('botocore.client.BaseClient._make_api_call', new=mock_boto_api_call) @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_training_step_creation_with_debug_hook(pca_estimator_with_debug_hook): From b2b92503baae51a1c74d04928a332d31fa712166 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Mon, 14 Jun 2021 18:51:51 -0700 Subject: [PATCH 04/11] Allow output_path to be passed a str --- src/stepfunctions/steps/sagemaker.py | 4 ++-- tests/unit/test_sagemaker_steps.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/stepfunctions/steps/sagemaker.py b/src/stepfunctions/steps/sagemaker.py index b29be8b..91f7772 100644 --- a/src/stepfunctions/steps/sagemaker.py +++ b/src/stepfunctions/steps/sagemaker.py @@ -69,7 +69,7 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non experiment_config (dict, optional): Specify the experiment config for the training. 
(Default: None) wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait for the training job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the training job and proceed to the next step. (default: True) tags (list[dict], optional): `List to tags `_ to associate with the resource. - output_path (Placeholder, optional): S3 location for saving the training result (model + output_path (str or Placeholder, optional): S3 location for saving the training result (model artifacts and output files) to propagate to estimator. If not specified, results are stored to a default bucket. If the bucket with the specific name does not exist, the estimator creates the bucket during the @@ -117,7 +117,7 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non parameters['TrainingJobName'] = job_name if output_path is not None: - if isinstance(output_path, (ExecutionInput, StepInput)): + if isinstance(output_path, (ExecutionInput, StepInput)) or isinstance(output_path, str): parameters['OutputDataConfig']['S3OutputPath'] = output_path if data is not None and data_placeholder: diff --git a/tests/unit/test_sagemaker_steps.py b/tests/unit/test_sagemaker_steps.py index a19aa31..60a19db 100644 --- a/tests/unit/test_sagemaker_steps.py +++ b/tests/unit/test_sagemaker_steps.py @@ -225,6 +225,7 @@ def test_training_step_creation(pca_estimator): 'TrialName': 'pca_trial', 'TrialComponentDisplayName': 'Training' }, + output_path='s3://sagemaker-us-east-1-111111111111', tags=DEFAULT_TAGS, ) assert step.to_dict() == { @@ -235,7 +236,7 @@ def test_training_step_creation(pca_estimator): 'TrainingInputMode': 'File' }, 'OutputDataConfig': { - 'S3OutputPath': 's3://sagemaker/models' + 'S3OutputPath': 's3://sagemaker-us-east-1-111111111111' }, 'StoppingCondition': { 'MaxRuntimeInSeconds': 86400 From f38499a9c43db09d02b05294a6a3792554125b40 Mon Sep 17 00:00:00 2001 From: 
Carolyn Nguyen Date: Wed, 16 Jun 2021 19:57:04 -0700 Subject: [PATCH 05/11] Use Placeholder type to validate placeholder param and renamed boolean --- src/stepfunctions/steps/sagemaker.py | 22 ++++++++++------------ src/stepfunctions/steps/states.py | 4 ++-- tests/integ/test_sagemaker_steps.py | 11 ++--------- 3 files changed, 14 insertions(+), 23 deletions(-) diff --git a/src/stepfunctions/steps/sagemaker.py b/src/stepfunctions/steps/sagemaker.py index 91f7772..082ec17 100644 --- a/src/stepfunctions/steps/sagemaker.py +++ b/src/stepfunctions/steps/sagemaker.py @@ -13,7 +13,7 @@ from __future__ import absolute_import from enum import Enum -from stepfunctions.inputs import ExecutionInput, StepInput +from stepfunctions.inputs import Placeholder from stepfunctions.steps.states import Task from stepfunctions.steps.fields import Field from stepfunctions.steps.utils import tags_dict_to_kv_list @@ -95,11 +95,10 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non kwargs[Field.Resource.value] = get_service_integration_arn(SAGEMAKER_SERVICE_NAME, SageMakerApi.CreateTrainingJob) - # sagemaker.workflow.airflow.training_config does not accept ExecutionInput as input, only str, dict, - # TrainingInput or FileSystemInput. Transform placeholder into JSONpath to be processed and included in - # generated Sagemaker parameters. Placeholder for uri str only: - data_placeholder = isinstance(data, (ExecutionInput, StepInput)) - if data_placeholder: + # sagemaker.workflow.airflow.training_config does not accept Placeholder as input. Transform data placeholder + # to JSONpath to generate parameters. 
+ is_data_placeholder = isinstance(data, Placeholder) + if is_data_placeholder: data = data.to_jsonpath() if isinstance(job_name, str): @@ -113,14 +112,13 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non if estimator.rules != None: parameters['DebugRuleConfigurations'] = [rule.to_debugger_rule_config_dict() for rule in estimator.rules] - if isinstance(job_name, (ExecutionInput, StepInput)): + if isinstance(job_name, Placeholder): parameters['TrainingJobName'] = job_name if output_path is not None: - if isinstance(output_path, (ExecutionInput, StepInput)) or isinstance(output_path, str): - parameters['OutputDataConfig']['S3OutputPath'] = output_path + parameters['OutputDataConfig']['S3OutputPath'] = output_path - if data is not None and data_placeholder: + if data is not None and is_data_placeholder: # Replace the 'S3Uri' key with one that supports JSONpath value. # Support for uri str only: The list will only contain 1 element temp_data_uri = parameters['InputDataConfig'][0]['DataSource']['S3DataSource'].pop('S3Uri', None) @@ -232,7 +230,7 @@ def __init__(self, state_id, transformer, job_name, model_name, data, data_type= join_source=join_source ) - if isinstance(job_name, (ExecutionInput, StepInput)): + if isinstance(job_name, Placeholder): parameters['TransformJobName'] = job_name parameters['ModelName'] = model_name @@ -501,7 +499,7 @@ def __init__(self, state_id, processor, job_name, inputs=None, outputs=None, exp else: parameters = processing_config(processor=processor, inputs=inputs, outputs=outputs, container_arguments=container_arguments, container_entrypoint=container_entrypoint, kms_key_id=kms_key_id) - if isinstance(job_name, (ExecutionInput, StepInput)): + if isinstance(job_name, Placeholder): parameters['ProcessingJobName'] = job_name if experiment_config is not None: diff --git a/src/stepfunctions/steps/states.py b/src/stepfunctions/steps/states.py index 9c86457..9669a73 100644 --- a/src/stepfunctions/steps/states.py 
+++ b/src/stepfunctions/steps/states.py @@ -17,7 +17,7 @@ from stepfunctions.exceptions import DuplicateStatesInChain from stepfunctions.steps.fields import Field -from stepfunctions.inputs import ExecutionInput, StepInput +from stepfunctions.inputs import Placeholder, StepInput logger = logging.getLogger('stepfunctions.states') @@ -53,7 +53,7 @@ def _replace_placeholders(self, params): return params modified_parameters = {} for k, v in params.items(): - if isinstance(v, (ExecutionInput, StepInput)): + if isinstance(v, Placeholder): modified_key = "{key}.$".format(key=k) modified_parameters[modified_key] = v.to_jsonpath() elif isinstance(v, dict): diff --git a/tests/integ/test_sagemaker_steps.py b/tests/integ/test_sagemaker_steps.py index 00ad7f8..78a66e7 100644 --- a/tests/integ/test_sagemaker_steps.py +++ b/tests/integ/test_sagemaker_steps.py @@ -106,6 +106,7 @@ def test_training_step(pca_estimator_fixture, record_set_fixture, sfn_client, sf # End of Cleanup +# TODO: Add integ test with StepInput def test_training_step_with_placeholders(pca_estimator_fixture, record_set_fixture, sfn_client, @@ -146,16 +147,8 @@ def test_training_step_with_placeholders(pca_estimator_fixture, 'JobName': f'TrainingJob-{job_id}' } - # Check placeholders in workflow definition: - execution = workflow.execute(inputs=execution_input) - expected_output_data_key_value_pair = ("S3OutputPath.$", "$$.Execution.Input['OutputPath']") - assert expected_output_data_key_value_pair in \ - workflow.definition.states[training_step_name]['Parameters']['OutputDataConfig'].items() - - expected_job_name_key_value_pair = ("TrainingJobName.$", "$$.Execution.Input['JobName']") - assert expected_job_name_key_value_pair in \ - workflow.definition.states[training_step_name]['Parameters'].items() # Check workflow output + execution = workflow.execute(inputs=execution_input) execution_output = execution.get_output(wait=True) assert execution_output.get("TrainingJobStatus") == "Completed" From 
6cfde0c39c52a0dd9485ff7dfc17c02dfa4be4a5 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Thu, 17 Jun 2021 13:58:01 -0700 Subject: [PATCH 06/11] Updated variable names and comments; included StepInput in unit test --- src/stepfunctions/steps/sagemaker.py | 29 +++++++++------ tests/integ/test_sagemaker_steps.py | 53 +--------------------------- tests/unit/test_sagemaker_steps.py | 13 ++++--- 3 files changed, 28 insertions(+), 67 deletions(-) diff --git a/src/stepfunctions/steps/sagemaker.py b/src/stepfunctions/steps/sagemaker.py index 082ec17..1d6bcdb 100644 --- a/src/stepfunctions/steps/sagemaker.py +++ b/src/stepfunctions/steps/sagemaker.py @@ -13,6 +13,8 @@ from __future__ import absolute_import from enum import Enum +import logging + from stepfunctions.inputs import Placeholder from stepfunctions.steps.states import Task from stepfunctions.steps.fields import Field @@ -24,7 +26,7 @@ from sagemaker.model_monitor import DataCaptureConfig SAGEMAKER_SERVICE_NAME = "sagemaker" - +logger = logging.getLogger('stepfunctions.sagemaker') class SageMakerApi(Enum): CreateTrainingJob = "createTrainingJob" @@ -43,7 +45,7 @@ class TrainingStep(Task): Creates a Task State to execute a `SageMaker Training Job `_. The TrainingStep will also create a model by default, and the model shares the same name as the training job. """ - def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=None, mini_batch_size=None, experiment_config=None, wait_for_completion=True, tags=None, output_path=None, **kwargs): + def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=None, mini_batch_size=None, experiment_config=None, wait_for_completion=True, tags=None, output_data_config_path=None, **kwargs): """ Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
@@ -69,8 +71,9 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non experiment_config (dict, optional): Specify the experiment config for the training. (Default: None) wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait for the training job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the training job and proceed to the next step. (default: True) tags (list[dict], optional): `List to tags `_ to associate with the resource. - output_path (str or Placeholder, optional): S3 location for saving the training result (model - artifacts and output files) to propagate to estimator. If not specified, results are + output_data_config_path (str or Placeholder, optional): S3 location for saving the training result (model + artifacts and output files) to propagate to estimator (maps to sagemaker.estimator.EstimatorBase + output_path parameter.) If not specified, results are stored to a default bucket. If the bucket with the specific name does not exist, the estimator creates the bucket during the :meth:`~sagemaker.estimator.EstimatorBase.fit` method execution. @@ -95,8 +98,8 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non kwargs[Field.Resource.value] = get_service_integration_arn(SAGEMAKER_SERVICE_NAME, SageMakerApi.CreateTrainingJob) - # sagemaker.workflow.airflow.training_config does not accept Placeholder as input. Transform data placeholder - # to JSONpath to generate parameters. + # Convert `data` Placeholder to a JSONPath string because sagemaker.workflow.airflow.training_config does not + # accept Placeholder in the `input` argument. We will suffix the 'S3Uri' key in `parameters` with ".$" later. 
is_data_placeholder = isinstance(data, Placeholder) if is_data_placeholder: data = data.to_jsonpath() @@ -115,14 +118,20 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non if isinstance(job_name, Placeholder): parameters['TrainingJobName'] = job_name - if output_path is not None: - parameters['OutputDataConfig']['S3OutputPath'] = output_path + if output_data_config_path is not None: + parameters['OutputDataConfig']['S3OutputPath'] = output_data_config_path if data is not None and is_data_placeholder: # Replace the 'S3Uri' key with one that supports JSONpath value. # Support for uri str only: The list will only contain 1 element - temp_data_uri = parameters['InputDataConfig'][0]['DataSource']['S3DataSource'].pop('S3Uri', None) - parameters['InputDataConfig'][0]['DataSource']['S3DataSource']['S3Uri.$'] = temp_data_uri + try: + data_uri = parameters['InputDataConfig'][0]['DataSource']['S3DataSource'].pop('S3Uri', None) + parameters['InputDataConfig'][0]['DataSource']['S3DataSource']['S3Uri.$'] = data_uri + except KeyError as error: + logger.warning(f"Placeholder {data} used for data, but could not locate S3Uri property to make " + f"placeholder compatible.\n" + f"Path used:['InputDataConfig'][0]['DataSource']['S3DataSource']['S3Uri'] - " + f"{error} key was not found.") if hyperparameters is not None: parameters['HyperParameters'] = hyperparameters diff --git a/tests/integ/test_sagemaker_steps.py b/tests/integ/test_sagemaker_steps.py index 78a66e7..e926f08 100644 --- a/tests/integ/test_sagemaker_steps.py +++ b/tests/integ/test_sagemaker_steps.py @@ -29,7 +29,7 @@ from sagemaker.tuner import HyperparameterTuner from sagemaker.processing import ProcessingInput, ProcessingOutput -from stepfunctions.inputs import ExecutionInput +from stepfunctions.inputs import ExecutionInput, StepInput from stepfunctions.steps import Chain from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointStep, EndpointConfigStep, 
TuningStep, ProcessingStep from stepfunctions.workflow import Workflow @@ -106,57 +106,6 @@ def test_training_step(pca_estimator_fixture, record_set_fixture, sfn_client, sf # End of Cleanup -# TODO: Add integ test with StepInput -def test_training_step_with_placeholders(pca_estimator_fixture, - record_set_fixture, - sfn_client, - sfn_role_arn, - sagemaker_session): - # Build workflow definition - execution_input = ExecutionInput(schema={ - 'JobName': str, - 'OutputPath': str, - 'Data': str - }) - - job_name_placeholder = execution_input['JobName'] - output_path_placeholder = execution_input['OutputPath'] - training_step_name = 'TrainingStep' - - training_step = TrainingStep(training_step_name, - estimator=pca_estimator_fixture, - job_name=job_name_placeholder, - data=record_set_fixture, - mini_batch_size=200, - output_path=output_path_placeholder) - workflow_graph = Chain([training_step]) - - with timeout(minutes=DEFAULT_TIMEOUT_MINUTES): - # Create workflow and check definition - workflow = create_workflow_and_check_definition( - workflow_graph=workflow_graph, - workflow_name=unique_name_from_base("integ-test-training-step-workflow"), - sfn_client=sfn_client, - sfn_role_arn=sfn_role_arn - ) - - # Execute workflow - job_id = generate_job_name() - execution_input = { - 'OutputPath': f's3://{sagemaker_session.default_bucket()}/', - 'JobName': f'TrainingJob-{job_id}' - } - - # Check workflow output - execution = workflow.execute(inputs=execution_input) - execution_output = execution.get_output(wait=True) - assert execution_output.get("TrainingJobStatus") == "Completed" - - # Cleanup - state_machine_delete_wait(sfn_client, workflow.state_machine_arn) - # End of Cleanup - - def test_model_step(trained_estimator, sfn_client, sagemaker_session, sfn_role_arn): # Build workflow definition model_name = generate_job_name() diff --git a/tests/unit/test_sagemaker_steps.py b/tests/unit/test_sagemaker_steps.py index 60a19db..e553d5f 100644 --- a/tests/unit/test_sagemaker_steps.py 
+++ b/tests/unit/test_sagemaker_steps.py @@ -26,7 +26,7 @@ from sagemaker.processing import ProcessingInput, ProcessingOutput from unittest.mock import MagicMock, patch -from stepfunctions.inputs import ExecutionInput +from stepfunctions.inputs import ExecutionInput, StepInput from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointStep, EndpointConfigStep, ProcessingStep from stepfunctions.steps.sagemaker import tuning_config @@ -272,15 +272,18 @@ def test_training_step_creation(pca_estimator): def test_training_step_creation_with_placeholders(pca_estimator): execution_input = ExecutionInput(schema={ 'Data': str, - 'JobName': str, 'OutputPath': str, }) + step_input = StepInput(schema={ + 'JobName': str, + }) + step = TrainingStep('Training', estimator=pca_estimator, - job_name= execution_input['JobName'], + job_name=step_input['JobName'], data=execution_input['Data'], - output_path=execution_input['OutputPath'], + output_data_config_path=execution_input['OutputPath'], experiment_config={ 'ExperimentName': 'pca_experiment', 'TrialName': 'pca_trial', @@ -331,7 +334,7 @@ def test_training_step_creation_with_placeholders(pca_estimator): 'TrialName': 'pca_trial', 'TrialComponentDisplayName': 'Training' }, - 'TrainingJobName.$': "$$.Execution.Input['JobName']", + 'TrainingJobName.$': "$['JobName']", 'Tags': DEFAULT_TAGS_LIST }, 'Resource': 'arn:aws:states:::sagemaker:createTrainingJob.sync', From 9fa066037c65931ae1754e5a282545bd191b9633 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen <83104894+ca-nguyen@users.noreply.github.com> Date: Thu, 17 Jun 2021 16:09:07 -0700 Subject: [PATCH 07/11] Update output_data_config_path description Co-authored-by: Adam Wong <55506708+wong-a@users.noreply.github.com> --- src/stepfunctions/steps/sagemaker.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/stepfunctions/steps/sagemaker.py b/src/stepfunctions/steps/sagemaker.py index 1d6bcdb..994f8c7 100644 --- 
a/src/stepfunctions/steps/sagemaker.py +++ b/src/stepfunctions/steps/sagemaker.py @@ -72,13 +72,7 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait for the training job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the training job and proceed to the next step. (default: True) tags (list[dict], optional): `List to tags `_ to associate with the resource. output_data_config_path (str or Placeholder, optional): S3 location for saving the training result (model - artifacts and output files) to propagate to estimator (maps to sagemaker.estimator.EstimatorBase - output_path parameter.) If not specified, results are - stored to a default bucket. If the bucket with the specific name - does not exist, the estimator creates the bucket during the - :meth:`~sagemaker.estimator.EstimatorBase.fit` method execution. - file:// urls are used for local mode. For example: 'file://model/' - will save to the model folder in the current directory. + artifacts and output files). If specified, it overrides the `output_path` property of `estimator`. 
""" self.estimator = estimator self.job_name = job_name From 347885c6e14e555e2579e1c2d9131afb7ac066b6 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Thu, 17 Jun 2021 16:16:18 -0700 Subject: [PATCH 08/11] Fix unit test --- tests/unit/test_sagemaker_steps.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_sagemaker_steps.py b/tests/unit/test_sagemaker_steps.py index e553d5f..8de23c0 100644 --- a/tests/unit/test_sagemaker_steps.py +++ b/tests/unit/test_sagemaker_steps.py @@ -225,7 +225,7 @@ def test_training_step_creation(pca_estimator): 'TrialName': 'pca_trial', 'TrialComponentDisplayName': 'Training' }, - output_path='s3://sagemaker-us-east-1-111111111111', + output_data_config_path='s3://sagemaker-us-east-1-111111111111', tags=DEFAULT_TAGS, ) assert step.to_dict() == { From 63a7d550330704758ab655781d72f02ebff4132a Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Thu, 17 Jun 2021 17:56:51 -0700 Subject: [PATCH 09/11] Remove try catch so error reflected if s3uri property not found and removed unused imports --- src/stepfunctions/steps/sagemaker.py | 10 ++-------- tests/integ/test_sagemaker_steps.py | 1 - 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/src/stepfunctions/steps/sagemaker.py b/src/stepfunctions/steps/sagemaker.py index 994f8c7..21a2957 100644 --- a/src/stepfunctions/steps/sagemaker.py +++ b/src/stepfunctions/steps/sagemaker.py @@ -118,14 +118,8 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non if data is not None and is_data_placeholder: # Replace the 'S3Uri' key with one that supports JSONpath value. 
# Support for uri str only: The list will only contain 1 element - try: - data_uri = parameters['InputDataConfig'][0]['DataSource']['S3DataSource'].pop('S3Uri', None) - parameters['InputDataConfig'][0]['DataSource']['S3DataSource']['S3Uri.$'] = data_uri - except KeyError as error: - logger.warning(f"Placeholder {data} used for data, but could not locate S3Uri property to make " - f"placeholder compatible.\n" - f"Path used:['InputDataConfig'][0]['DataSource']['S3DataSource']['S3Uri'] - " - f"{error} key was not found.") + data_uri = parameters['InputDataConfig'][0]['DataSource']['S3DataSource'].pop('S3Uri', None) + parameters['InputDataConfig'][0]['DataSource']['S3DataSource']['S3Uri.$'] = data_uri if hyperparameters is not None: parameters['HyperParameters'] = hyperparameters diff --git a/tests/integ/test_sagemaker_steps.py b/tests/integ/test_sagemaker_steps.py index e926f08..63c060a 100644 --- a/tests/integ/test_sagemaker_steps.py +++ b/tests/integ/test_sagemaker_steps.py @@ -29,7 +29,6 @@ from sagemaker.tuner import HyperparameterTuner from sagemaker.processing import ProcessingInput, ProcessingOutput -from stepfunctions.inputs import ExecutionInput, StepInput from stepfunctions.steps import Chain from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointStep, EndpointConfigStep, TuningStep, ProcessingStep from stepfunctions.workflow import Workflow From 69ec517f8c4c55529f15aeaa091353f34e4ed23e Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Fri, 18 Jun 2021 13:13:55 -0700 Subject: [PATCH 10/11] Removed double logger --- src/stepfunctions/steps/sagemaker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/stepfunctions/steps/sagemaker.py b/src/stepfunctions/steps/sagemaker.py index 6de380c..e9c414a 100644 --- a/src/stepfunctions/steps/sagemaker.py +++ b/src/stepfunctions/steps/sagemaker.py @@ -30,7 +30,6 @@ logger = logging.getLogger('stepfunctions.sagemaker') SAGEMAKER_SERVICE_NAME = "sagemaker" -logger = 
logging.getLogger('stepfunctions.sagemaker') class SageMakerApi(Enum): CreateTrainingJob = "createTrainingJob" From 969a795005f5f5ddc93ab167ec267b3990954c5e Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Fri, 18 Jun 2021 13:54:15 -0700 Subject: [PATCH 11/11] Removed unused import --- src/stepfunctions/steps/sagemaker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/stepfunctions/steps/sagemaker.py b/src/stepfunctions/steps/sagemaker.py index e9c414a..30e3d7c 100644 --- a/src/stepfunctions/steps/sagemaker.py +++ b/src/stepfunctions/steps/sagemaker.py @@ -15,7 +15,6 @@ import logging from enum import Enum -import logging from stepfunctions.inputs import Placeholder from stepfunctions.steps.states import Task