add placeholders support for all properties  #117

Closed
@joschaharpeng1337

Description

First: Thanks for this API. It's a really handy way to create workflows.

Problem

We tried to use an execution input for the image_uri of a Processor. The workflow ended up using the execution_input variable as a literal string for image_uri instead of replacing it with the actual value.

In general, we want to set many properties through configuration, without changing the workflow.

Here is what we did

...
execution_input = ExecutionInput()
groot_processor = Processor(
 ...
    image_uri=execution_input['image_uri'],
 ...

)
groot_step = ProcessingStep('groot', ...)
chain = Chain([groot_step])
workflow = Workflow(..., definition=chain,..)
state_machine_arn = workflow.create()

config = {
   "image_uri": "1234567890.dkr.ecr.eu-central-1.amazonaws.com/solrizer:latest"
}

import json
import boto3
SFN_CLIENT = boto3.client('stepfunctions', region_name=boto3.Session().region_name)
SFN_CLIENT.start_execution(
        stateMachineArn=state_machine_arn,
        name='test-pipeline',
        input=json.dumps(config)
    )
...

Sadly the workflow did not replace execution_input['image_uri'] with the actual value.
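One way to confirm this is to scan the generated state machine definition for parameters that hold a path but whose key lacks the `.$` suffix, since Step Functions only evaluates a parameter as a path when its key ends in `.$`. The sketch below is not part of the SDK. It assumes the placeholder is rendered as a string starting with `$`, and the helper name `find_literal_paths` and the sample definition are made up for illustration. It only looks at top-level states.

```python
import json


def find_literal_paths(definition_json):
    """List Parameters fields whose value looks like a path but whose key has no '.$' suffix."""
    definition = json.loads(definition_json)
    hits = []

    def walk(node, trail):
        if isinstance(node, dict):
            for key, value in node.items():
                here = trail + [key]
                # Assumption: any string starting with "$" was meant to be a path.
                if isinstance(value, str) and value.startswith("$") and not key.endswith(".$"):
                    hits.append(".".join(here))
                else:
                    walk(value, here)
        elif isinstance(node, list):
            for index, item in enumerate(node):
                walk(item, trail + [str(index)])

    for name, state in definition.get("States", {}).items():
        walk(state.get("Parameters", {}), [name, "Parameters"])
    return hits


# Hypothetical definition, shaped like the processing step in this issue.
example_definition = json.dumps({
    "StartAt": "groot",
    "States": {
        "groot": {
            "Type": "Task",
            "Parameters": {
                "AppSpecification": {"ImageUri": "$$.Execution.Input['image_uri']"}
            },
            "End": True,
        }
    },
})

literal_paths = find_literal_paths(example_definition)
print(literal_paths)
```

In practice the input would be the `definition` string returned by `describe_state_machine`.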

This is our workaround

sf_client = boto_session.client('stepfunctions')
execution_input = ExecutionInput()
groot_processor = Processor(
 ...
    image_uri="$$.Execution.Input['image_uri']",
...
)
groot_step = ProcessingStep('groot', ...)
chain = Chain([groot_step])
workflow = Workflow(..., definition=chain,..)
workflow_arn = workflow.create()

response = sf_client.describe_state_machine(
    stateMachineArn=workflow_arn
)

original_workflow = json.loads(response['definition'])
for step_name in original_workflow["States"]:
    try:
        app_spec = original_workflow["States"][step_name]["Parameters"]["AppSpecification"]
        # Rename "ImageUri" to "ImageUri.$" so Step Functions treats the value as a path.
        app_spec["ImageUri.$"] = app_spec.pop("ImageUri")
    except KeyError:
        continue

new_workflow = json.dumps(original_workflow, indent=2)

sf_client.update_state_machine(
    stateMachineArn=workflow_arn,
    definition=new_workflow,
    roleArn=workflow_role
)
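Since we apply the same rename to several values, the patching step could be generalized along these lines. This is only a sketch of our workaround, not an SDK feature. It assumes every string parameter starting with `$` is meant to be a path, which would be wrong for a literal value that happens to start with `$`, and it does not descend into states nested inside Parallel or Map states. The helper name `mark_path_parameters`, the sample definition, and the `ProcessingJobName` value are hypothetical.

```python
import copy


def mark_path_parameters(definition):
    """Return a copy of the definition with 'Key' renamed to 'Key.$' for path-like values."""

    def convert(node):
        if isinstance(node, dict):
            converted = {}
            for key, value in node.items():
                # Assumption: a string starting with "$" is a path, not a literal.
                if isinstance(value, str) and value.startswith("$") and not key.endswith(".$"):
                    converted[key + ".$"] = value
                else:
                    converted[key] = convert(value)
            return converted
        if isinstance(node, list):
            return [convert(item) for item in node]
        return node

    result = copy.deepcopy(definition)
    for state in result.get("States", {}).values():
        if "Parameters" in state:
            state["Parameters"] = convert(state["Parameters"])
    return result


# Hypothetical definition, shaped like the processing step in this issue.
sample_workflow = {
    "StartAt": "groot",
    "States": {
        "groot": {
            "Type": "Task",
            "Parameters": {
                "AppSpecification": {"ImageUri": "$$.Execution.Input['image_uri']"},
                "ProcessingJobName": "groot-job",
            },
            "End": True,
        }
    },
}

patched_workflow = mark_path_parameters(sample_workflow)
```

The patched dictionary can then be serialized with `json.dumps` and passed to `update_state_machine` as in the snippet above.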

Dependencies used:

sagemaker==2.19.0
stepfunctions==2.0.0rc1

Expectation

  • We would like to be able to use execution inputs for all processors and steps. The workaround is not that complicated and we do it for some values right now, but it clutters up the code.
  • We did not find documentation on which processor properties can handle placeholders. We would really appreciate documentation for that. Right now the stepfunctions documentation only refers to the regular sagemaker documentation.
