
feat: support setting path for node output directly #26712

Merged · 2 commits · Oct 11, 2022
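
This change lets a datastore path be assigned to a node output that was never configured: the `path` setter now calls `_build_default_data()` first, and for node outputs that builds a default `Output(type=None)`, leaving type inference to the backend. A minimal usage sketch mirroring the tests below (the component YAML path is hypothetical; `output_dir` is the output name used in the internal test):

```python
from azure.ai.ml import load_component

# Hypothetical component spec; any command component that declares an
# `output_dir` output behaves the same way.
copy_func = load_component("./command-linux/copy/component.yaml")
copy_file = copy_func(input_dir=None, file_names=None)

# New in this PR: assign the datastore path on the node output directly,
# even though the output was never configured beforehand.
copy_file.outputs.output_dir.path = (
    "azureml://datastores/workspaceblobstore/paths/ps_copy_component/outputs/output_dir"
)
assert copy_file.outputs.output_dir.path.startswith("azureml://datastores/")
```
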
19 changes: 14 additions & 5 deletions sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/_io.py
@@ -155,6 +155,8 @@ def path(self) -> str:

@path.setter
def path(self, path):
# For an un-configured input/output, build a default data entry for it.
self._build_default_data()
if hasattr(self._data, "path"):
self._data.path = path
else:
@@ -361,7 +363,9 @@ def is_control(self) -> str:
def _build_default_data(self):
"""Build default data when output not configured."""
if self._data is None:
self._data = Output()
# _meta will be None when node._component is not a Component object,
# so we leave type inference to the backend
self._data = Output(type=None)

def _build_data(self, data, key=None):
"""Build output data according to assigned input, eg: node.outputs.key = data"""
@@ -593,15 +597,13 @@ def _validate_inputs(cls, inputs):

def __getattr__(self, name: K) -> V:
if name not in self:
# pylint: disable=unnecessary-comprehension
raise UnexpectedAttributeError(keyword=name, keywords=[key for key in self])
raise UnexpectedAttributeError(keyword=name, keywords=list(self))
return super().__getitem__(name)

def __getitem__(self, item: K) -> V:
# We raise this exception instead of KeyError
if item not in self:
# pylint: disable=unnecessary-comprehension
raise UnexpectedKeywordError(func_name="ParameterGroup", keyword=item, keywords=[key for key in self])
raise UnexpectedKeywordError(func_name="ParameterGroup", keyword=item, keywords=list(self))
return super().__getitem__(item)

# For Jupyter Notebook auto-completion
@@ -645,6 +647,13 @@ def __init__(self, outputs: dict, **kwargs):
def __getattr__(self, item) -> NodeOutput:
return self.__getitem__(item)

def __getitem__(self, item) -> NodeOutput:
if item not in self:
# We raise this exception instead of KeyError, as OutputsAttrDict doesn't support adding new items
# after __init__.
raise UnexpectedAttributeError(keyword=item, keywords=list(self))
return super().__getitem__(item)

def __setattr__(self, key: str, value: Union[Data, Output]):
if isinstance(value, Output):
mode = value.mode
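
Alongside the setter change, `OutputsAttrDict.__getitem__` now rejects unknown output names with `UnexpectedAttributeError` (listing the valid attributes) instead of a bare `KeyError`, so item access and attribute access fail consistently. Continuing the sketch above with the same `copy_file` node (`not_an_output` is a made-up name):

```python
from azure.ai.ml.exceptions import UnexpectedAttributeError

try:
    # Lookup of an undeclared output fails before the path is ever assigned.
    copy_file.outputs["not_an_output"].path = "any/path"
except UnexpectedAttributeError as exc:
    # e.g. "Got an unexpected attribute 'not_an_output', valid attributes: 'output_dir'."
    print(exc)
```
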
21 changes: 17 additions & 4 deletions sdk/ml/azure-ai-ml/tests/dsl/unittests/test_component_func.py
@@ -1,18 +1,16 @@
from pathlib import Path
from typing import Callable, Union

import marshmallow
import pytest
from marshmallow import ValidationError

from azure.ai.ml import PyTorchDistribution, load_component
from azure.ai.ml.entities import Component as ComponentEntity
from azure.ai.ml.entities import Data, JobResourceConfiguration
from azure.ai.ml.entities._builders import Command
from azure.ai.ml.entities._inputs_outputs import Input, Output
from azure.ai.ml.entities._job.pipeline._io import PipelineInput, PipelineOutput
from azure.ai.ml.entities._job.pipeline._load_component import _generate_component_function
from azure.ai.ml.exceptions import UnexpectedKeywordError, ValidationException
from azure.ai.ml.exceptions import UnexpectedKeywordError, ValidationException, UnexpectedAttributeError

from .._util import _DSL_TIMEOUT_SECOND

@@ -156,7 +154,22 @@ def test_component_outputs(self):

# configure mode and default Output is built
component.outputs.component_out_path.mode = "upload"
assert component._build_outputs() == {"component_out_path": Output(mode="upload")}
assert component._build_outputs() == {"component_out_path": Output(type=None, mode="upload")}

test_output_path = "azureml://datastores/workspaceblobstore/paths/azureml/ps_copy_component/outputs/output_dir"
component: Command = component_func()

# configure path and default Output is built
component.outputs.component_out_path.path = test_output_path
assert component._build_outputs() == {"component_out_path": Output(type=None, path=test_output_path)}

# non-existent output
with pytest.raises(
UnexpectedAttributeError,
match="Got an unexpected attribute 'component_out_path_non', "
"valid attributes: 'component_out_path'."
):
component.outputs["component_out_path_non"].path = test_output_path

# configure data
component: Command = component_func()
5 changes: 3 additions & 2 deletions sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py
@@ -1,4 +1,3 @@
import logging
import os
from functools import partial
from io import StringIO
@@ -585,7 +584,7 @@ def pipeline(
}

job_yaml = "./tests/test_configs/pipeline_jobs/helloworld_pipeline_job_data_options.yml"
pipeline_job = load_job(source=job_yaml)
pipeline_job: PipelineJob = load_job(source=job_yaml)

pipeline = pipeline(**{key: val for key, val in pipeline_job._build_inputs().items()})
pipeline.inputs.job_in_data_by_store_path_and_mount.mode = "ro_mount"
@@ -605,6 +604,8 @@ def pipeline(
actual_outputs = pipeline._build_outputs()
for k, v in actual_outputs.items():
v.mode = v.mode.lower()
# outputs defined in yaml are all uri_folder, while the default type in dsl is None
v.type = "uri_folder"
assert pipeline_job._build_outputs() == actual_outputs

component_job = next(iter(pipeline_job.jobs.values()))._to_rest_object()
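
The comment above explains why this test pins `type = "uri_folder"` on the dsl side: outputs built through the dsl now default to `type=None` and leave inference to the backend, while outputs defined in YAML carry an explicit type. When the type should be pinned on the client side instead, a fully specified `Output` can be assigned to the node output, which `OutputsAttrDict.__setattr__` accepts per the `_io.py` diff above. A hedged sketch, reusing the `copy_file` node from the first example (path and mode are illustrative):

```python
from azure.ai.ml import Output

# Assign a complete Output rather than mutating path/mode one attribute at a
# time, so the client-side type is not left as None for the backend to infer.
copy_file.outputs.output_dir = Output(
    type="uri_folder",
    mode="upload",
    path="azureml://datastores/workspaceblobstore/paths/outputs/output_dir",
)
```
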
@@ -527,3 +527,16 @@ def pipeline_func():
if key.startswith("data_"):
expected_inputs[key] = {"job_input_type": "mltable", "uri": "azureml:scope_tsv:1"}
assert rest_obj.properties.jobs["node"]["inputs"] == expected_inputs

def test_pipeline_with_setting_node_output_directly(self) -> None:
component_dir = Path(__file__).parent.parent.parent / "test_configs" / "internal" / "command-component"
copy_func = load_component(component_dir / "command-linux/copy/component.yaml")

copy_file = copy_func(
input_dir=None,
file_names=None,
)

copy_file.outputs.output_dir.path = "path_on_datastore"
assert copy_file.outputs.output_dir.path == "path_on_datastore"
assert copy_file.outputs.output_dir.type == "path"
@@ -1454,3 +1454,17 @@ def test_comment_in_pipeline(self) -> None:
rest_pipeline_dict = pipeline_job._to_rest_object().as_dict()["properties"]
assert pipeline_dict["jobs"]["hello_world_component"]["comment"] == "arbitrary string"
assert rest_pipeline_dict["jobs"]["hello_world_component"]["comment"] == "arbitrary string"

def test_pipeline_node_default_output(self):
test_path = "./tests/test_configs/pipeline_jobs/helloworld_pipeline_job_with_component_output.yml"
pipeline: PipelineJob = load_job(source=test_path)

# pipeline level output
pipeline_output = pipeline.outputs["job_out_path_2"]
assert pipeline_output.mode == "upload"

# other node level output tests can be found in
# dsl/unittests/test_component_func.py::TestComponentFunc::test_component_outputs
# data-binding-expression
with pytest.raises(ValidationException, match="<class '.*'> does not support setting path."):
pipeline.jobs["merge_component_outputs"].outputs["component_out_path_1"].path = "xxx"