diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_component/component_factory.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_component/component_factory.py
index 603c2a4b9f7e..91b8c41152fc 100644
--- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_component/component_factory.py
+++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_component/component_factory.py
@@ -183,6 +183,11 @@ def load_from_rest(self, *, obj: ComponentVersionData, _type: str = None) -> Com
         if distribution:
             distribution = DistributionConfiguration._from_rest_object(distribution)
 
+        # Note: we need to refine the logic here if more type-specific handling is needed.
+        jobs = rest_component_version.component_spec.pop("jobs", None)
+        if _type == NodeType.PIPELINE and jobs:
+            jobs = PipelineComponent._resolve_sub_nodes(jobs)
+
         new_instance = create_instance_func()
         init_kwargs = dict(
             id=obj.id,
@@ -191,6 +196,7 @@ def load_from_rest(self, *, obj: ComponentVersionData, _type: str = None) -> Com
             inputs=inputs,
             outputs=outputs,
             distribution=distribution,
+            jobs=jobs,
             **(
                 create_schema_func({BASE_PATH_CONTEXT_KEY: "./"}).load(
                     rest_component_version.component_spec, unknown=INCLUDE
diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_component/pipeline_component.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_component/pipeline_component.py
index eb53b498c20d..8f2bf4891382 100644
--- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_component/pipeline_component.py
+++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_component/pipeline_component.py
@@ -21,7 +21,7 @@
 from azure.ai.ml.constants._component import ComponentSource, NodeType
 from azure.ai.ml.constants._job.pipeline import ValidationErrorCode
 from azure.ai.ml.entities._builders import BaseNode, Command
-from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode
+from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode, LoopNode
 from azure.ai.ml.entities._component.component import Component
 from azure.ai.ml.entities._inputs_outputs import GroupInput, Input, Output
 from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
@@ -323,6 +323,16 @@ def _load_from_rest_pipeline_job(cls, data: Dict):
             _source=ComponentSource.REMOTE_WORKSPACE_JOB,
         )
 
+    @classmethod
+    def _resolve_sub_nodes(cls, rest_jobs):
+        sub_nodes = {}
+        for node_name, node in rest_jobs.items():
+            if LoopNode._is_loop_node_dict(node):
+                sub_nodes[node_name] = LoopNode._from_rest_object(node, reference_node_list=sub_nodes)
+            else:
+                sub_nodes[node_name] = BaseNode._from_rest_object(node)
+        return sub_nodes
+
     @classmethod
     def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
         return PipelineComponentSchema(context=context)
diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py
index 7ed174341e83..b6122dd0d267 100644
--- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py
+++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py
@@ -64,6 +64,7 @@
 }
 
 
+# TODO: Remove this, as both the REST and SDK types are snake_case now.
 def get_output_type_mapping_from_rest():
     """Get output type mapping."""
     return {
@@ -255,6 +256,7 @@ def from_rest_inputs_to_dataset_literal(
         if input_value is None:
             continue
 
+        # TODO: Remove this, as both the REST and SDK types are snake_case now.
         type_transfer_dict = get_output_type_mapping_from_rest()
         # deal with invalid input type submitted by feb api
         # todo: backend help convert node level input/output type
diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/_pipeline_job_helpers.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/_pipeline_job_helpers.py
index 166fa4ce1f4f..2265b3ac3673 100644
--- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/_pipeline_job_helpers.py
+++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/_pipeline_job_helpers.py
@@ -117,7 +117,7 @@ def from_dict_to_rest_io(
             rest_obj = rest_object_class.from_dict(val)
             rest_io_objects[key] = rest_obj
         else:
-            msg = "Got unsupported type of output: {}:" + f"{type(val)}"
+            msg = "Got unsupported type of input/output: {}:" + f"{type(val)}"
             raise ValidationException(
                 message=msg.format(val),
                 no_personal_data_message=msg.format("[val]"),
diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/pipeline_job.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/pipeline_job.py
index 2f94e2d268e1..aaacddff82e3 100644
--- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/pipeline_job.py
+++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/pipeline_job.py
@@ -497,13 +497,7 @@ def _load_from_rest(cls, obj: JobBase) -> "PipelineJob":
         from_rest_inputs = from_rest_inputs_to_dataset_literal(properties.inputs) or {}
         from_rest_outputs = from_rest_data_outputs(properties.outputs) or {}
         # Unpack the component jobs
-        sub_nodes = {}
-        if properties.jobs:
-            for node_name, node in properties.jobs.items():
-                if LoopNode._is_loop_node_dict(node):
-                    sub_nodes[node_name] = LoopNode._from_rest_object(node, reference_node_list=sub_nodes)
-                else:
-                    sub_nodes[node_name] = BaseNode._from_rest_object(node)
+        sub_nodes = PipelineComponent._resolve_sub_nodes(properties.jobs) if properties.jobs else {}
 # backend may still store Camel settings, eg: DefaultDatastore, translate them to snake when load back
         settings_dict = transform_dict_keys(properties.settings, camel_to_snake) if properties.settings else None
         settings_sdk = PipelineJobSettings(**settings_dict) if settings_dict else PipelineJobSettings()
diff --git a/sdk/ml/azure-ai-ml/tests/component/unittests/test_pipeline_component_entity.py b/sdk/ml/azure-ai-ml/tests/component/unittests/test_pipeline_component_entity.py
index 51eacb1f1c8d..55d6fbd39ff9 100644
--- a/sdk/ml/azure-ai-ml/tests/component/unittests/test_pipeline_component_entity.py
+++ b/sdk/ml/azure-ai-ml/tests/component/unittests/test_pipeline_component_entity.py
@@ -2,6 +2,7 @@
 from pathlib import Path
 
 import pytest
+import yaml
 
 from azure.ai.ml import Input, load_component, load_job
 from azure.ai.ml.entities import PipelineComponent, PipelineJob
@@ -416,3 +417,29 @@ def test_invalid_nested_pipeline_component_with_group(self) -> None:
             "'group' is defined as a parameter group but got input '${{parent.inputs.top_group}}' with type ''"
             in str(e.value)
         )
+
+    def test_simple_jobs_from_rest(self) -> None:
+        test_path = "./tests/test_configs/components/pipeline_component_jobs_rest_data.json"
+        with open(test_path, "r") as f:
+            json_in_file = yaml.safe_load(f)
+        json_in_file = json_in_file['properties']['component_spec']['jobs']
+        jobs = PipelineComponent._resolve_sub_nodes(json_in_file)
+        node_dict = {key: node._to_rest_object() for key, node in jobs.items()}['component_a_job']
+        assert node_dict['computeId'] == '${{parent.inputs.node_compute}}'
+        assert node_dict['outputs'] == {
+            'output_binding': {'type': 'literal', 'value': '${{parent.outputs.output}}'},
+            'output_binding2': {'type': 'literal', 'value': '${{parent.outputs.output}}'},
+            'output_data': {'job_output_type': 'uri_folder', 'mode': 'Upload'},
+            'output_data_legacy': {'job_output_type': 'uri_folder', 'mode': 'Upload'}}
+        assert node_dict['inputs'] == {
+            'binding_input': {'job_input_type': 'literal', 'value': '${{parent.inputs.component_in_path}}'},
+            'data_input': {'job_input_type': 'uri_file',
+                           'mode': 'Download',
+                           'uri': 'https://my-blob/path/to/data'},
+            'data_input_legacy': {'job_input_type': 'uri_file',
+                                  'mode': 'Download',
+                                  'uri': 'https://my-blob/path/to/data'},
+            'literal_input': {'job_input_type': 'literal', 'value': '11'},
+            'literal_input2': {'job_input_type': 'literal', 'value': '12'}}
+        assert node_dict['resources'] == {'instance_count': 1, 'properties': {
+            'target_selector': {'my_resource_only': 'false', 'allow_spot_vm': 'true'}}, 'shm_size': '2g'}
diff --git a/sdk/ml/azure-ai-ml/tests/internal/e2etests/test_pipeline_job.py b/sdk/ml/azure-ai-ml/tests/internal/e2etests/test_pipeline_job.py
index 3d33f7748404..022676d83b40 100644
--- a/sdk/ml/azure-ai-ml/tests/internal/e2etests/test_pipeline_job.py
+++ b/sdk/ml/azure-ai-ml/tests/internal/e2etests/test_pipeline_job.py
@@ -181,6 +181,7 @@ def pipeline_func(pipeline_input):
         except HttpResponseError as ex:
             assert "CancelPipelineRunInTerminalStatus" in str(ex)
 
+    # TODO: Enable this once the type issue is fixed on master.
     @pytest.mark.skip(reason="marshmallow.exceptions.ValidationError: miss required jobs.node.component")
     @pytest.mark.parametrize(
         "test_case_i,test_case_name",
diff --git a/sdk/ml/azure-ai-ml/tests/test_configs/components/pipeline_component_jobs_rest_data.json b/sdk/ml/azure-ai-ml/tests/test_configs/components/pipeline_component_jobs_rest_data.json
new file mode 100644
index 000000000000..eb2affdaa400
--- /dev/null
+++ b/sdk/ml/azure-ai-ml/tests/test_configs/components/pipeline_component_jobs_rest_data.json
@@ -0,0 +1,112 @@
+{
+  "id": "mock_id",
+  "name": "1",
+  "type": "Microsoft.MachineLearningServices/workspaces/components/versions",
+  "system_data": {
+    "created_by": "Brynn Yin",
+    "created_by_type": "User",
+    "created_at": "2022-10-25T03:39:27.465966Z",
+    "last_modified_by": "Brynn Yin",
+    "last_modified_by_type": "User",
+    "last_modified_at": "2022-10-25T03:39:28.104555Z"
+  },
+  "properties": {
+    "properties": {},
+    "tags": {
+      "tag": "tagvalue",
+      "owner": "sdkteam"
+    },
+    "is_anonymous": false,
+    "is_archived": false,
+    "component_spec": {
+      "name": "test_392226085584",
+      "version": "1",
+      "display_name": "Hello World Pipeline Component",
+      "is_deterministic": "False",
+      "type": "pipeline",
+      "description": "This is the basic pipeline component",
+      "tags": {
+        "tag": "tagvalue",
+        "owner": "sdkteam"
+      },
+      "inputs": {
+        "component_in_path": {
+          "type": "uri_folder",
+          "optional": "False",
+          "description": "A path"
+        },
+        "component_in_number": {
+          "type": "number",
+          "optional": "True",
+          "default": "10.99",
+          "description": "A number"
+        },
+        "node_compute": {
+          "type": "string",
+          "optional": "False",
+          "default": "cpu-cluster"
+        }
+      },
+      "jobs": {
+        "component_a_job": {
+          "componentId": "mock_id",
+          "type": "command",
+          "computeId": "${{parent.inputs.node_compute}}",
+          "resources": {
+            "instance_count": "1",
+            "shm_size": "2g",
+            "properties": {
+              "target_selector": {
+                "my_resource_only": "false",
+                "allow_spot_vm": "true"
+              }
+            }
+          },
+          "inputs": {
+            "binding_input": {
+              "job_input_type": "literal",
+              "value": "${{parent.inputs.component_in_path}}"
+            },
+            "literal_input": {
+              "job_input_type": "literal",
+              "value": "11"
+            },
+            "literal_input2": {
+              "job_input_type": "Literal",
+              "value": "12"
+            },
+            "data_input": {
+              "job_input_type": "uri_file",
+              "mode": "Download",
+              "uri": "https://my-blob/path/to/data"
+            },
+            "data_input_legacy": {
+              "job_input_type": "UriFile",
+              "mode": "Download",
+              "uri": "https://my-blob/path/to/data"
+            }
+          },
+          "outputs": {
+            "output_data": {
+              "mode": "Upload",
+              "job_output_type": "uri_folder"
+            },
+            "output_data_legacy": {
+              "mode": "Upload",
+              "job_output_type": "UriFolder"
+            },
+            "output_binding": {
+              "value": "${{parent.outputs.output}}",
+              "job_output_type": "literal"
+            },
+            "output_binding2": {
+              "value": "${{parent.outputs.output}}",
+              "job_output_type": "Literal"
+            }
+          }
+        }
+      },
+      "$schema": "https://azuremlschemas.azureedge.net/development/pipelineComponent.schema.json"
+    }
+  }
+}
\ No newline at end of file