
feat: Adds support for Placeholders in TrainingStep to set S3 location for InputDataConfig and OutputDataConfig #142


Merged (14 commits) on Jun 19, 2021
Changes from 1 commit
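For reference, a minimal usage sketch of the behaviour this PR adds, mirroring the updated unit test below. The estimator configuration is hypothetical (image URI, role, and instance settings are placeholders, assuming the SageMaker Python SDK v2 Estimator constructor and configured AWS credentials); the output_data_config_path and data arguments come from the diff.

from sagemaker.estimator import Estimator
from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps.sagemaker import TrainingStep

# Hypothetical estimator; replace the image URI and role with real values.
estimator = Estimator(
    image_uri='<training-image-uri>',
    role='<sagemaker-execution-role-arn>',
    instance_count=1,
    instance_type='ml.m5.xlarge',
)

execution_input = ExecutionInput(schema={
    'JobName': str,
    'Data': str,
    'OutputPath': str,
})

# With this change, the S3 locations for InputDataConfig and OutputDataConfig can be
# Placeholders that are resolved from the workflow's execution input at run time.
training_step = TrainingStep(
    'TrainingStep',
    estimator=estimator,
    job_name=execution_input['JobName'],
    data=execution_input['Data'],
    output_data_config_path=execution_input['OutputPath'],
)

At execution time, 'JobName', 'Data', and 'OutputPath' are supplied in the execution input and injected into TrainingJobName, InputDataConfig, and OutputDataConfig respectively.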
29 changes: 19 additions & 10 deletions src/stepfunctions/steps/sagemaker.py
@@ -13,6 +13,8 @@
from __future__ import absolute_import

from enum import Enum
import logging
Contributor Author

To be removed in next commit


from stepfunctions.inputs import Placeholder
from stepfunctions.steps.states import Task
from stepfunctions.steps.fields import Field
@@ -24,7 +26,7 @@
from sagemaker.model_monitor import DataCaptureConfig

SAGEMAKER_SERVICE_NAME = "sagemaker"

logger = logging.getLogger('stepfunctions.sagemaker')

class SageMakerApi(Enum):
CreateTrainingJob = "createTrainingJob"
@@ -43,7 +45,7 @@ class TrainingStep(Task):
Creates a Task State to execute a `SageMaker Training Job <https://docs.aws.amazon.com/sagemaker/latest/dg/API_CreateTrainingJob.html>`_. The TrainingStep will also create a model by default, and the model shares the same name as the training job.
"""

def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=None, mini_batch_size=None, experiment_config=None, wait_for_completion=True, tags=None, output_path=None, **kwargs):
def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=None, mini_batch_size=None, experiment_config=None, wait_for_completion=True, tags=None, output_data_config_path=None, **kwargs):
"""
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
@@ -69,8 +71,9 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non
experiment_config (dict, optional): Specify the experiment config for the training. (Default: None)
wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait for the training job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the training job and proceed to the next step. (default: True)
tags (list[dict], optional): `List of tags <https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html>`_ to associate with the resource.
output_path (str or Placeholder, optional): S3 location for saving the training result (model
artifacts and output files) to propagate to estimator. If not specified, results are
output_data_config_path (str or Placeholder, optional): S3 location for saving the training result (model
artifacts and output files) to propagate to the estimator (maps to the sagemaker.estimator.EstimatorBase
output_path parameter). If not specified, results are
stored to a default bucket. If the bucket with the specific name
does not exist, the estimator creates the bucket during the
:meth:`~sagemaker.estimator.EstimatorBase.fit` method execution.
@@ -95,8 +98,8 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non

kwargs[Field.Resource.value] = get_service_integration_arn(SAGEMAKER_SERVICE_NAME,
SageMakerApi.CreateTrainingJob)
# sagemaker.workflow.airflow.training_config does not accept Placeholder as input. Transform data placeholder
# to JSONpath to generate parameters.
# Convert `data` Placeholder to a JSONPath string because sagemaker.workflow.airflow.training_config does not
# accept Placeholder in the `input` argument. We will suffix the 'S3Uri' key in `parameters` with ".$" later.
is_data_placeholder = isinstance(data, Placeholder)
if is_data_placeholder:
data = data.to_jsonpath()
@@ -115,14 +118,20 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non
if isinstance(job_name, Placeholder):
parameters['TrainingJobName'] = job_name

if output_path is not None:
parameters['OutputDataConfig']['S3OutputPath'] = output_path
if output_data_config_path is not None:
parameters['OutputDataConfig']['S3OutputPath'] = output_data_config_path

if data is not None and is_data_placeholder:
# Replace the 'S3Uri' key with one that supports JSONpath value.
# Support for uri str only: The list will only contain 1 element
temp_data_uri = parameters['InputDataConfig'][0]['DataSource']['S3DataSource'].pop('S3Uri', None)
parameters['InputDataConfig'][0]['DataSource']['S3DataSource']['S3Uri.$'] = temp_data_uri
try:
data_uri = parameters['InputDataConfig'][0]['DataSource']['S3DataSource'].pop('S3Uri', None)
parameters['InputDataConfig'][0]['DataSource']['S3DataSource']['S3Uri.$'] = data_uri
except KeyError as error:
logger.warning(f"Placeholder {data} used for data, but could not locate S3Uri property to make "
f"placeholder compatible.\n"
f"Path used:['InputDataConfig'][0]['DataSource']['S3DataSource']['S3Uri'] - "
f"{error} key was not found.")

if hyperparameters is not None:
parameters['HyperParameters'] = hyperparameters
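A minimal sketch of what the S3Uri handling in the hunk above does to parameters, assuming data was an ExecutionInput placeholder under the key 'Data' (the dict shape is illustrative):

# Illustrative shape of `parameters` after training_config() has been built with
# the placeholder already rendered to a JSONPath string by data.to_jsonpath().
parameters = {
    'InputDataConfig': [{
        'ChannelName': 'training',
        'DataSource': {
            'S3DataSource': {
                'S3DataType': 'S3Prefix',
                'S3Uri': "$$.Execution.Input['Data']",
            }
        },
    }]
}

# Rename 'S3Uri' to 'S3Uri.$' so Step Functions treats the value as a JSONPath
# to resolve at execution time instead of a literal S3 URI.
s3_data_source = parameters['InputDataConfig'][0]['DataSource']['S3DataSource']
s3_data_source['S3Uri.$'] = s3_data_source.pop('S3Uri')

assert s3_data_source == {
    'S3DataType': 'S3Prefix',
    'S3Uri.$': "$$.Execution.Input['Data']",
}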
53 changes: 1 addition & 52 deletions tests/integ/test_sagemaker_steps.py
@@ -29,7 +29,7 @@
from sagemaker.tuner import HyperparameterTuner
from sagemaker.processing import ProcessingInput, ProcessingOutput

from stepfunctions.inputs import ExecutionInput
from stepfunctions.inputs import ExecutionInput, StepInput
from stepfunctions.steps import Chain
from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointStep, EndpointConfigStep, TuningStep, ProcessingStep
from stepfunctions.workflow import Workflow
@@ -106,57 +106,6 @@ def test_training_step(pca_estimator_fixture, record_set_fixture, sfn_client, sf
# End of Cleanup


# TODO: Add integ test with StepInput
def test_training_step_with_placeholders(pca_estimator_fixture,
record_set_fixture,
sfn_client,
sfn_role_arn,
sagemaker_session):
# Build workflow definition
execution_input = ExecutionInput(schema={
'JobName': str,
'OutputPath': str,
'Data': str
})

job_name_placeholder = execution_input['JobName']
output_path_placeholder = execution_input['OutputPath']
training_step_name = 'TrainingStep'

training_step = TrainingStep(training_step_name,
estimator=pca_estimator_fixture,
job_name=job_name_placeholder,
data=record_set_fixture,
mini_batch_size=200,
output_path=output_path_placeholder)
workflow_graph = Chain([training_step])

with timeout(minutes=DEFAULT_TIMEOUT_MINUTES):
# Create workflow and check definition
workflow = create_workflow_and_check_definition(
workflow_graph=workflow_graph,
workflow_name=unique_name_from_base("integ-test-training-step-workflow"),
sfn_client=sfn_client,
sfn_role_arn=sfn_role_arn
)

# Execute workflow
job_id = generate_job_name()
execution_input = {
'OutputPath': f's3://{sagemaker_session.default_bucket()}/',
'JobName': f'TrainingJob-{job_id}'
}

# Check workflow output
execution = workflow.execute(inputs=execution_input)
execution_output = execution.get_output(wait=True)
assert execution_output.get("TrainingJobStatus") == "Completed"

# Cleanup
state_machine_delete_wait(sfn_client, workflow.state_machine_arn)
# End of Cleanup


def test_model_step(trained_estimator, sfn_client, sagemaker_session, sfn_role_arn):
# Build workflow definition
model_name = generate_job_name()
13 changes: 8 additions & 5 deletions tests/unit/test_sagemaker_steps.py
@@ -26,7 +26,7 @@
from sagemaker.processing import ProcessingInput, ProcessingOutput

from unittest.mock import MagicMock, patch
from stepfunctions.inputs import ExecutionInput
from stepfunctions.inputs import ExecutionInput, StepInput
from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointStep, EndpointConfigStep, ProcessingStep
from stepfunctions.steps.sagemaker import tuning_config

@@ -272,15 +272,18 @@ def test_training_step_creation(pca_estimator):
def test_training_step_creation_with_placeholders(pca_estimator):
execution_input = ExecutionInput(schema={
'Data': str,
'JobName': str,
'OutputPath': str,
})

step_input = StepInput(schema={
'JobName': str,
})

step = TrainingStep('Training',
estimator=pca_estimator,
job_name= execution_input['JobName'],
job_name=step_input['JobName'],
data=execution_input['Data'],
output_path=execution_input['OutputPath'],
output_data_config_path=execution_input['OutputPath'],
experiment_config={
'ExperimentName': 'pca_experiment',
'TrialName': 'pca_trial',
@@ -331,7 +334,7 @@ def test_training_step_creation_with_placeholders(pca_estimator):
'TrialName': 'pca_trial',
'TrialComponentDisplayName': 'Training'
},
'TrainingJobName.$': "$$.Execution.Input['JobName']",
'TrainingJobName.$': "$['JobName']",
'Tags': DEFAULT_TAGS_LIST
},
'Resource': 'arn:aws:states:::sagemaker:createTrainingJob.sync',
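The change in the expected 'TrainingJobName.$' value above follows from how the two placeholder types render to JSONPath; a small sketch using the classes imported in this test file (outputs as implied by the expected definitions above):

from stepfunctions.inputs import ExecutionInput, StepInput

execution_input = ExecutionInput(schema={'JobName': str})
step_input = StepInput(schema={'JobName': str})

# ExecutionInput resolves against the state machine's execution input,
# StepInput against the input passed to this particular state.
print(execution_input['JobName'].to_jsonpath())  # "$$.Execution.Input['JobName']"
print(step_input['JobName'].to_jsonpath())       # "$['JobName']"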