Skip to content

Commit 52d8dce

Browse files
YingChen1996Ying Chen
and
Ying Chen
authored
Support Model in registry in pipeline (#26914)
* update * skip Co-authored-by: Ying Chen <[email protected]>
1 parent 9585ad8 commit 52d8dce

14 files changed

+3735
-9
lines changed

sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from pathlib import Path
1313
from typing import Any, Callable, Dict, TypeVar
1414

15-
from azure.ai.ml.entities import Data, PipelineJob, PipelineJobSettings
15+
from azure.ai.ml.entities import Data, PipelineJob, PipelineJobSettings, Model
1616
from azure.ai.ml.entities._builders.pipeline import Pipeline
1717
from azure.ai.ml.entities._inputs_outputs import Input, is_parameter_group
1818
from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput, _GroupAttrDict
@@ -36,6 +36,7 @@
3636
PipelineInput,
3737
NodeOutput,
3838
Input,
39+
Model,
3940
Data, # When a Data object is used as an input, we convert it to an Input object
4041
Pipeline, # When a pipeline node is used as the input, we use its single output as the real input.
4142
str,

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_builders/base_node.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
from azure.ai.ml._utils._arm_id_utils import get_resource_name_from_arm_id_safe
1414
from azure.ai.ml.constants import JobType
15-
from azure.ai.ml.entities import Data
15+
from azure.ai.ml.entities import Data, Model
1616
from azure.ai.ml.entities._component.component import Component
1717
from azure.ai.ml.entities._inputs_outputs import Input, Output
1818
from azure.ai.ml.entities._job._input_output_helpers import build_input_output
@@ -189,6 +189,7 @@ def _get_supported_inputs_types(cls):
189189
NodeOutput,
190190
Input,
191191
Data,
192+
Model,
192193
str,
193194
bool,
194195
int,

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/_io/base.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from azure.ai.ml.constants import AssetTypes
1313
from azure.ai.ml.constants._component import IOConstants
1414
from azure.ai.ml.entities._assets._artifacts.data import Data
15+
from azure.ai.ml.entities._assets._artifacts.model import Model
1516
from azure.ai.ml.entities._inputs_outputs import Input, Output
1617
from azure.ai.ml.entities._job.pipeline._pipeline_expression import PipelineExpressionMixin
1718
from azure.ai.ml.entities._util import resolve_pipeline_parameter
@@ -241,7 +242,7 @@ def _build_data(self, data, key=None): # pylint: disable=unused-argument
241242
# For the data-binding case, set is_singular=False to also match expressions like "${{parent.inputs.job_in_folder}}/sample1.csv"
242243
if isinstance(data, Input) or is_data_binding_expression(data, is_singular=False):
243244
return data
244-
if isinstance(data, Data):
245+
if isinstance(data, (Data, Model)):
245246
return _data_to_input(data)
246247
# self._meta.type could be None when sub pipeline has no annotation
247248
if isinstance(self._meta, Input) and self._meta.type and not self._meta._is_primitive_type:
@@ -452,7 +453,7 @@ def _build_data(self, data, key=None): # pylint: disable=unused-argument
452453
error_category=ErrorCategory.USER_ERROR,
453454
)
454455
return data
455-
if isinstance(data, Data):
456+
if isinstance(data, (Data, Model)):
456457
# If value is Data or Model, we convert it to a corresponding Input
457458
return _data_to_input(data)
458459
return data

sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline_on_registry.py

+109-5
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
import pytest
2+
from pathlib import Path
23
from test_utilities.utils import _PYTEST_TIMEOUT_METHOD
34

4-
from azure.ai.ml import MLClient, load_component
5-
from azure.core.exceptions import HttpResponseError
5+
from azure.ai.ml import MLClient, load_component, Input, load_model
6+
from azure.ai.ml.dsl import pipeline
7+
from azure.ai.ml.constants import AssetTypes
8+
from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
9+
from azure.core.polling import LROPoller
610

711
from .._util import _DSL_TIMEOUT_SECOND
812

@@ -12,15 +16,13 @@
1216
@pytest.mark.usefixtures("enable_pipeline_private_preview_features", "recorded_test")
1317
@pytest.mark.timeout(timeout=_DSL_TIMEOUT_SECOND, method=_PYTEST_TIMEOUT_METHOD)
1418
@pytest.mark.e2etest
15-
@pytest.mark.skip(reason="not able to re-record")
1619
@pytest.mark.pipeline_test
1720
class TestDSLPipelineOnRegistry(AzureRecordedTestCase):
21+
@pytest.mark.skip(reason="not able to re-record")
1822
def test_pipeline_job_create_with_registered_component_on_registry(
1923
self,
2024
registry_client: MLClient,
2125
) -> None:
22-
from azure.ai.ml.dsl import pipeline
23-
2426
local_component = load_component("./tests/test_configs/components/basic_component_code_local_path.yml")
2527
try:
2628
created_component = registry_client.components.get(local_component.name, version=local_component.version)
@@ -35,3 +37,105 @@ def sample_pipeline():
3537
pipeline_job = sample_pipeline()
3638
assert registry_client.jobs.validate(pipeline_job).passed
3739
# TODO: add test for pipeline job create with registered component on registry after support is ready on canary
40+
41+
@pytest.mark.skip(reason="request body still exits when re-record and will raise error "
42+
"'Unable to find a record for the request' in playback mode")
43+
def test_pipeline_with_local_component_and_registry_model_as_input(self, registry_client: MLClient, client: MLClient):
44+
# get dataset
45+
test_data = Input(
46+
type=AssetTypes.URI_FILE,
47+
path="./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/data/sample1.csv"
48+
)
49+
50+
# load_component
51+
score_func = load_component("./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/score.yml")
52+
53+
pipeline_score_model = Input(
54+
type='mlflow_model',
55+
path='azureml://registries/testFeed/models/iris_model/versions/1'
56+
)
57+
58+
@pipeline()
59+
def score_pipeline_with_registry_model(model_input, test_data):
60+
score = score_func(model_input=model_input, test_data=test_data)
61+
score_duplicate = score_func(model_input=pipeline_score_model, test_data=test_data)
62+
63+
pipeline_job = score_pipeline_with_registry_model(
64+
model_input=pipeline_score_model,
65+
test_data=test_data
66+
)
67+
pipeline_job.settings.default_compute = "cpu-cluster"
68+
pipeline_job = client.jobs.create_or_update(pipeline_job)
69+
cancel_poller = client.jobs.begin_cancel(pipeline_job.name)
70+
assert isinstance(cancel_poller, LROPoller)
71+
72+
@pytest.mark.skip(reason="request body still exits when re-record and will raise error "
73+
"'Unable to find a record for the request' in playback mode")
74+
def test_pipeline_with_local_component_and_registry_model_as_input_with_model_input(
75+
self,
76+
registry_client: MLClient,
77+
client: MLClient):
78+
# get dataset
79+
test_data = Input(
80+
type=AssetTypes.URI_FILE,
81+
path="./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/data/sample1.csv"
82+
)
83+
84+
# load_component
85+
score_func = load_component("./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/score.yml")
86+
87+
model_path = Path("./tests/test_configs/model/model_iris.yml")
88+
model_entity = load_model(model_path)
89+
try:
90+
pipeline_score_model = registry_client.models.get(name=model_entity.name, version=model_entity.version)
91+
except ResourceNotFoundError:
92+
model_entity = registry_client.models.create_or_update(model_entity)
93+
pipeline_score_model = registry_client.models.get(name=model_entity.name, version=model_entity.version)
94+
95+
@pipeline()
96+
def score_pipeline_with_registry_model(model_input, test_data):
97+
score = score_func(model_input=model_input, test_data=test_data)
98+
score_duplicate = score_func(model_input=pipeline_score_model, test_data=test_data)
99+
100+
pipeline_job = score_pipeline_with_registry_model(
101+
model_input=pipeline_score_model, test_data=test_data
102+
)
103+
pipeline_job.settings.default_compute = "cpu-cluster"
104+
pipeline_job = client.jobs.create_or_update(pipeline_job)
105+
cancel_poller = client.jobs.begin_cancel(pipeline_job.name)
106+
assert isinstance(cancel_poller, LROPoller)
107+
108+
@pytest.mark.skip(reason="request body still exits when re-record and will raise error "
109+
"'Unable to find a record for the request' in playback mode")
110+
def test_pipeline_with_registry_component_and_model_as_input(self, registry_client: MLClient, client: MLClient):
111+
# get dataset
112+
test_data = Input(
113+
type=AssetTypes.URI_FILE,
114+
path="./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/data/sample1.csv"
115+
)
116+
117+
# load_component
118+
score_component_name = "v2_dsl_score_component"
119+
component_version = "0.0.8"
120+
score_func = registry_client.components.get(
121+
name=score_component_name, version=component_version
122+
)
123+
124+
pipeline_score_model = Input(
125+
type='mlflow_model',
126+
path='azureml://registries/testFeed/models/iris_model/versions/1'
127+
)
128+
129+
@pipeline()
130+
def score_pipeline_with_registry_model(model_input, test_data):
131+
score = score_func(model_input=model_input, test_data=test_data)
132+
score_duplicate = score_func(model_input=pipeline_score_model, test_data=test_data)
133+
134+
pipeline_job = score_pipeline_with_registry_model(
135+
model_input=pipeline_score_model,
136+
test_data=test_data
137+
)
138+
pipeline_job.settings.default_compute = "cpu-cluster"
139+
pipeline_job = client.jobs.create_or_update(pipeline_job)
140+
cancel_poller = client.jobs.begin_cancel(pipeline_job.name)
141+
assert isinstance(cancel_poller, LROPoller)

sdk/ml/azure-ai-ml/tests/pipeline_job/e2etests/test_pipeline_job.py

+16
Original file line numberDiff line numberDiff line change
@@ -1543,6 +1543,22 @@ def test_remote_pipeline_component_job(self, client: MLClient, randstr: Callable
15431543
# assert pipeline_dict["outputs"] == {"output_path": {"mode": "ReadWriteMount", "job_output_type": "uri_folder"}}
15441544
assert pipeline_dict["settings"] == {"default_compute": "cpu-cluster", "_source": "REMOTE.WORKSPACE.COMPONENT"}
15451545

1546+
@pytest.mark.skip(reason="request body still exits when re-record and will raise error "
1547+
"'Unable to find a record for the request' in playback mode")
1548+
def test_pipeline_job_create_with_registry_model_as_input(
1549+
self,
1550+
client: MLClient,
1551+
registry_client: MLClient,
1552+
randstr: Callable[[str], str],
1553+
) -> None:
1554+
params_override = [{"name": randstr("name")}]
1555+
pipeline_job = load_job(
1556+
source="./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/pipeline.yml",
1557+
params_override=params_override,
1558+
)
1559+
job = client.jobs.create_or_update(pipeline_job)
1560+
assert job.name == params_override[0]["name"]
1561+
15461562
def test_pipeline_node_with_default_component(self, client: MLClient, randstr: Callable[[str], str]):
15471563
params_override = [{"name": randstr("job_name")}]
15481564
pipeline_job = load_job(

0 commit comments

Comments
 (0)