Skip to content

[ML][Pipeline] Fix primitive pipeline component output #28808

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
"String",
"float",
"Float",
"double",
"Double"
]


Expand Down
30 changes: 30 additions & 0 deletions sdk/ml/azure-ai-ml/azure/ai/ml/_internal/_utils/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

# hack: map internal component output type to valid v2 output type
from azure.ai.ml._internal._schema.input_output import SUPPORTED_INTERNAL_PARAM_TYPES
from azure.ai.ml._utils.utils import get_all_enum_values_iter
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.constants._common import InputTypes


def _map_internal_output_type(_meta):
"""Map component output type to valid pipeline output type."""
def _map_primitive_type(_type):
"""Convert double and float to number type."""
_type = _type.lower()
if _type in ["double", "float"]:
return InputTypes.NUMBER
return _type

if type(_meta).__name__ != "InternalOutput":
return _meta.type
if _meta.type in list(get_all_enum_values_iter(AssetTypes)):
return _meta.type
if _meta.type in SUPPORTED_INTERNAL_PARAM_TYPES:
return _map_primitive_type(_meta.type)
if _meta.type in ["AnyFile"]:
return AssetTypes.URI_FILE
# Handle AnyDirectory and the other types.
return AssetTypes.URI_FOLDER
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@
from inspect import Parameter, signature
from typing import Callable, Union

from azure.ai.ml._internal._utils._utils import _map_internal_output_type
from azure.ai.ml._utils._func_utils import get_outputs_and_locals
from azure.ai.ml._utils.utils import (
get_all_enum_values_iter,
is_valid_node_name,
parse_args_description_from_docstring,
)
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.constants._component import ComponentSource, IOConstants
from azure.ai.ml.constants._job.pipeline import COMPONENT_IO_KEYWORDS
from azure.ai.ml.dsl._utils import _sanitize_python_variable_name
Expand Down Expand Up @@ -259,19 +258,10 @@ def _build_pipeline_outputs(self, outputs: typing.Dict[str, NodeOutput]):
is_control=value.is_control,
)

# hack: map component output type to valid pipeline output type
def _map_type(_meta):
if type(_meta).__name__ != "InternalOutput":
return _meta.type
if _meta.type in list(get_all_enum_values_iter(AssetTypes)):
return _meta.type
if _meta.type in ["AnyFile"]:
return AssetTypes.URI_FILE
return AssetTypes.URI_FOLDER

# Note: Here we set PipelineOutput as Pipeline's output definition as we need output binding.
output_meta = Output(
type=_map_type(meta), description=meta.description, mode=meta.mode, is_control=meta.is_control
type=_map_internal_output_type(meta), description=meta.description,
mode=meta.mode, is_control=meta.is_control
)
pipeline_output = PipelineOutput(
port_name=key,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import pytest
from azure.ai.ml.dsl._group_decorator import group
from devtools_testutils import AzureRecordedTestCase, is_live
from test_utilities.utils import _PYTEST_TIMEOUT_METHOD, assert_job_cancel, omit_with_wildcard

from azure.ai.ml import Input, MLClient, load_component
from azure.ai.ml import Input, MLClient, load_component, Output
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.dsl._condition import condition
from azure.ai.ml.dsl._do_while import do_while
Expand Down Expand Up @@ -171,12 +172,36 @@ def test_registered_component_is_control(self, client: MLClient):
registered_component = client.components.create_or_update(primitive_component_with_normal_input_output_v2)
rest_dict = registered_component._to_dict()
# Assert is_control with correct bool type
assert rest_dict["outputs"] == {
expected_dict = {
"output_data": {"type": "uri_folder"},
"bool_param_output": {"type": "boolean", "is_control": True, "early_available": True},
"int_param_output": {"type": "integer", "is_control": True},
"float_param_output": {"type": "number", "is_control": True},
"str_param_output": {"type": "string", "is_control": True}}
assert rest_dict["outputs"] == expected_dict

# Assert on pipeline component
@group
class ControlOutputGroup:
output_data: Output(type="uri_folder")
float_param_output: Output(type="number", is_control=True)
int_param_output: Output(type="integer", is_control=True)
bool_param_output: Output(type="boolean", is_control=True)
str_param_output: Output(type="string", is_control=True)

@pipeline()
def test_pipeline_component_control_output() -> ControlOutputGroup:
node = primitive_component_with_normal_input_output_v2(
input_data=test_input, parambool=True,
paramint=2, paramfloat=2.2, paramstr="test"
)
return node.outputs

registered_pipeline_component = client.components.create_or_update(test_pipeline_component_control_output)
rest_dict = registered_pipeline_component._to_dict()
# Update expected dict, early_available will be removed for subgraph output.
expected_dict["bool_param_output"] = {"type": "boolean", "is_control": True}
assert rest_dict["outputs"] == expected_dict

def test_do_while_combined_if_else(self, client: MLClient):
do_while_body_component = load_component(
Expand Down
Loading