[ML][Pipelines] Separate jobs from schema load #27019

Merged

@@ -183,6 +183,11 @@ def load_from_rest(self, *, obj: ComponentVersionData, _type: str = None) -> Com
if distribution:
distribution = DistributionConfiguration._from_rest_object(distribution)

# Note: we need to refine the logic here if more type-specific handling is added.
jobs = rest_component_version.component_spec.pop("jobs", None)
if _type == NodeType.PIPELINE and jobs:
jobs = PipelineComponent._resolve_sub_nodes(jobs)

new_instance = create_instance_func()
init_kwargs = dict(
id=obj.id,
@@ -191,6 +196,7 @@ def load_from_rest(self, *, obj: ComponentVersionData, _type: str = None) -> Com
inputs=inputs,
outputs=outputs,
distribution=distribution,
jobs=jobs,
**(
create_schema_func({BASE_PATH_CONTEXT_KEY: "./"}).load(
rest_component_version.component_spec, unknown=INCLUDE
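For orientation, a rough sketch of the flow this hunk introduces (the spec dict is abbreviated from the REST fixture added later in this PR; the surrounding call sequence is illustrative, not the actual implementation):

component_spec = {
    "name": "test_392226085584",
    "type": "pipeline",
    "jobs": {
        "component_a_job": {"componentId": "mock_id", "type": "command"},
    },
}

# "jobs" is popped before the marshmallow schema load, so the schema no longer
# sees it; for pipeline components the popped dict is resolved into node
# entities via PipelineComponent._resolve_sub_nodes and passed to the
# constructor explicitly.
jobs = component_spec.pop("jobs", None)
assert "jobs" not in component_spec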
@@ -21,7 +21,7 @@
from azure.ai.ml.constants._component import ComponentSource, NodeType
from azure.ai.ml.constants._job.pipeline import ValidationErrorCode
from azure.ai.ml.entities._builders import BaseNode, Command
from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode
from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode, LoopNode
from azure.ai.ml.entities._component.component import Component
from azure.ai.ml.entities._inputs_outputs import GroupInput, Input, Output
from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
@@ -323,6 +323,16 @@ def _load_from_rest_pipeline_job(cls, data: Dict):
_source=ComponentSource.REMOTE_WORKSPACE_JOB,
)

@classmethod
def _resolve_sub_nodes(cls, rest_jobs):
sub_nodes = {}
for node_name, node in rest_jobs.items():
if LoopNode._is_loop_node_dict(node):
sub_nodes[node_name] = LoopNode._from_rest_object(node, reference_node_list=sub_nodes)
else:
sub_nodes[node_name] = BaseNode._from_rest_object(node)
return sub_nodes

@classmethod
def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
return PipelineComponentSchema(context=context)
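A minimal sketch of exercising the new helper on its own (the jobs payload below is a trimmed-down, hypothetical variant of the REST fixture added later in this PR, and calling the private classmethod directly is for illustration only):

from azure.ai.ml.entities import PipelineComponent

rest_jobs = {
    "component_a_job": {
        "componentId": "mock_id",
        "type": "command",
        "computeId": "${{parent.inputs.node_compute}}",
        "inputs": {
            "literal_input": {"job_input_type": "literal", "value": "11"},
        },
    },
}
# Each entry becomes a node entity; a loop-node dict would instead be routed
# through LoopNode._from_rest_object with the already-resolved siblings passed
# as reference_node_list.
nodes = PipelineComponent._resolve_sub_nodes(rest_jobs)
print({name: type(node).__name__ for name, node in nodes.items()})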
@@ -64,6 +64,7 @@
}


# TODO: Remove this as both rest type and sdk type are snake case now.
def get_output_type_mapping_from_rest():
"""Get output type mapping."""
return {
@@ -255,6 +256,7 @@ def from_rest_inputs_to_dataset_literal(
if input_value is None:
continue

# TODO: Remove this as both rest type and sdk type are snake case now.
type_transfer_dict = get_output_type_mapping_from_rest()
# deal with invalid input type submitted by feb api
# todo: backend help convert node level input/output type
@@ -117,7 +117,7 @@ def from_dict_to_rest_io(
rest_obj = rest_object_class.from_dict(val)
rest_io_objects[key] = rest_obj
else:
msg = "Got unsupported type of output: {}:" + f"{type(val)}"
msg = "Got unsupported type of input/output: {}:" + f"{type(val)}"
raise ValidationException(
message=msg.format(val),
no_personal_data_message=msg.format("[val]"),
@@ -497,13 +497,7 @@ def _load_from_rest(cls, obj: JobBase) -> "PipelineJob":
from_rest_inputs = from_rest_inputs_to_dataset_literal(properties.inputs) or {}
from_rest_outputs = from_rest_data_outputs(properties.outputs) or {}
# Unpack the component jobs
sub_nodes = {}
if properties.jobs:
for node_name, node in properties.jobs.items():
if LoopNode._is_loop_node_dict(node):
sub_nodes[node_name] = LoopNode._from_rest_object(node, reference_node_list=sub_nodes)
else:
sub_nodes[node_name] = BaseNode._from_rest_object(node)
sub_nodes = PipelineComponent._resolve_sub_nodes(properties.jobs) if properties.jobs else {}
# backend may still store Camel settings, eg: DefaultDatastore, translate them to snake when load back
settings_dict = transform_dict_keys(properties.settings, camel_to_snake) if properties.settings else None
settings_sdk = PipelineJobSettings(**settings_dict) if settings_dict else PipelineJobSettings()
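With this change the pipeline job loader and the pipeline component loader share one sub-node resolution path, so loop-node handling (including the reference_node_list lookup of already-resolved sibling nodes) lives in a single place instead of being duplicated here.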
@@ -2,6 +2,7 @@
from pathlib import Path

import pytest
import yaml

from azure.ai.ml import Input, load_component, load_job
from azure.ai.ml.entities import PipelineComponent, PipelineJob
@@ -416,3 +417,29 @@ def test_invalid_nested_pipeline_component_with_group(self) -> None:
"'group' is defined as a parameter group but got input '${{parent.inputs.top_group}}' with type '<class 'str'>'"
in str(e.value)
)

def test_simple_jobs_from_rest(self) -> None:
test_path = "./tests/test_configs/components/pipeline_component_jobs_rest_data.json"
with open(test_path, "r") as f:
json_in_file = yaml.safe_load(f)
json_in_file = json_in_file['properties']['component_spec']['jobs']
jobs = PipelineComponent._resolve_sub_nodes(json_in_file)
node_dict = {key: node._to_rest_object() for key, node in jobs.items()}['component_a_job']
assert node_dict['computeId'] == '${{parent.inputs.node_compute}}'
assert node_dict['outputs'] == {
'output_binding': {'type': 'literal', 'value': '${{parent.outputs.output}}'},
'output_binding2': {'type': 'literal', 'value': '${{parent.outputs.output}}'},
'output_data': {'job_output_type': 'uri_folder', 'mode': 'Upload'},
'output_data_legacy': {'job_output_type': 'uri_folder', 'mode': 'Upload'}}
assert node_dict['inputs'] == {
'binding_input': {'job_input_type': 'literal', 'value': '${{parent.inputs.component_in_path}}'},
'data_input': {'job_input_type': 'uri_file',
'mode': 'Download',
'uri': 'https://my-blob/path/to/data'},
'data_input_legacy': {'job_input_type': 'uri_file',
'mode': 'Download',
'uri': 'https://my-blob/path/to/data'},
'literal_input': {'job_input_type': 'literal', 'value': '11'},
'literal_input2': {'job_input_type': 'literal', 'value': '12'}}
assert node_dict['resources'] == {'instance_count': 1, 'properties': {
'target_selector': {'my_resource_only': 'false', 'allow_spot_vm': 'true'}}, 'shm_size': '2g'}
@@ -181,6 +181,7 @@ def pipeline_func(pipeline_input):
except HttpResponseError as ex:
assert "CancelPipelineRunInTerminalStatus" in str(ex)

# TODO: Enable this when the type issue is fixed on master.
@pytest.mark.skip(reason="marshmallow.exceptions.ValidationError: miss required jobs.node.component")
@pytest.mark.parametrize(
"test_case_i,test_case_name",
@@ -0,0 +1,112 @@
{
"id": "mock_id",
"name": "1",
"type": "Microsoft.MachineLearningServices/workspaces/components/versions",
"system_data": {
"created_by": "Brynn Yin",
"created_by_type": "User",
"created_at": "2022-10-25T03:39:27.465966Z",
"last_modified_by": "Brynn Yin",
"last_modified_by_type": "User",
"last_modified_at": "2022-10-25T03:39:28.104555Z"
},
"properties": {
"properties": {},
"tags": {
"tag": "tagvalue",
"owner": "sdkteam"
},
"is_anonymous": false,
"is_archived": false,
"component_spec": {
"name": "test_392226085584",
"version": "1",
"display_name": "Hello World Pipeline Component",
"is_deterministic": "False",
"type": "pipeline",
"description": "This is the basic pipeline component",
"tags": {
"tag": "tagvalue",
"owner": "sdkteam"
},
"inputs": {
"component_in_path": {
"type": "uri_folder",
"optional": "False",
"description": "A path"
},
"component_in_number": {
"type": "number",
"optional": "True",
"default": "10.99",
"description": "A number"
},
"node_compute": {
"type": "string",
"optional": "False",
"default": "cpu-cluster"
}
},
"jobs": {
"component_a_job": {
"componentId": "mock_id",
"type": "command",
"computeId": "${{parent.inputs.node_compute}}",
"resources": {
"instance_count": "1",
"shm_size": "2g",
"properties": {
"target_selector": {
"my_resource_only": "false",
"allow_spot_vm": "true"
}
}
},
"inputs": {
"binding_input": {
"job_input_type": "literal",
"value": "${{parent.inputs.component_in_path}}"
},
"literal_input": {
"job_input_type": "literal",
"value": "11"
},
"literal_input2": {
"job_input_type": "Literal",
"value": "12"
},
"data_input": {
"job_input_type": "uri_file",
"mode": "Download",
"uri": "https://my-blob/path/to/data"
},
"data_input_legacy": {
"job_input_type": "UriFile",
"mode": "Download",
"uri": "https://my-blob/path/to/data"
}
},
"outputs": {
"output_data": {
"mode": "Upload",
"job_output_type": "uri_folder"
},
"output_data_legacy": {
"mode": "Upload",
"job_output_type": "UriFolder"
},
"output_binding": {
"value": "${{parent.outputs.output}}",
"job_output_type": "literal"
},
"output_binding2": {
"value": "${{parent.outputs.output}}",
"job_output_type": "Literal"
}
}
}
},
"$schema": "https://azuremlschemas.azureedge.net/development/pipelineComponent.schema.json"
}
}
}
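Note that the fixture mixes snake_case and legacy camel-case spellings (literal/Literal, uri_file/UriFile, uri_folder/UriFolder); the unit test above expects all of them to come back in snake_case once the jobs are resolved into nodes and converted back with _to_rest_object.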