Commit 6e9e5c7

[ML][Pipelines]feat: anonymous component reuse (#27008)
* feat: internal anonymous component reuse - basic; need to test ignore file & additional includes
* fix: resolve comments
* fix: fix pylint & ci
* test: update internal pipeline job tests recording
* fix: fix ci
* fix: fix ci
* fix: skip some tests in recording mode to pass ci
1 parent 2f06050 commit 6e9e5c7
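
In effect, this change names the anonymous code asset of an internal component after its ml-components snapshot id, which is derived from a merkle tree over the resolved code directory, so resubmitting unchanged code maps to the same asset and previous runs can be reused. A hedged end-to-end sketch (assuming a local component spec at ./spec.yaml and a configured workspace; MLClient, load_component, and the placeholder ids below are illustrative, not part of this diff):

from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient, load_component

ml_client = MLClient(DefaultAzureCredential(),
                     subscription_id="<subscription-id>",
                     resource_group_name="<resource-group>",
                     workspace_name="<workspace>")

# Registering the same unchanged component twice should now resolve to the
# same anonymous code asset (named by its snapshot id), enabling reuse of
# the previous run instead of a fresh upload and execution.
component = load_component("./spec.yaml")
created = ml_client.components.create_or_update(component)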

56 files changed (+4246, −4290 lines)


sdk/ml/azure-ai-ml/azure/ai/ml/_internal/entities/_input_outputs.py

+8 −3

@@ -23,8 +23,9 @@ class InternalInput(Input):
     - Enum, enum (new)
     """

-    def __init__(self, datastore_mode=None, **kwargs):
+    def __init__(self, *, datastore_mode=None, is_resource=None, **kwargs):
         self.datastore_mode = datastore_mode
+        self.is_resource = is_resource
         super().__init__(**kwargs)

     @property
@@ -90,7 +91,7 @@ def _get_python_builtin_type_str(self) -> str:
         return super()._get_python_builtin_type_str()

     @classmethod
-    def _cast_from_input_or_dict(cls, _input: Union[Input, Dict]) -> Optional["InternalInput"]:
+    def _from_base(cls, _input: Union[Input, Dict]) -> Optional["InternalInput"]:
         """Cast from Input or Dict to InternalInput. Does not guarantee to create a new object."""
         if _input is None:
             return None
@@ -105,8 +106,12 @@ def _cast_from_input_or_dict(cls, _input: Union[Input, Dict]) -> Optional["Inter


 class InternalOutput(Output):
+    def __init__(self, *, datastore_mode=None, **kwargs):
+        self.datastore_mode = datastore_mode
+        super().__init__(**kwargs)
+
     @classmethod
-    def _cast_from_output_or_dict(cls, _output: Union[Output, Dict]) -> Optional["InternalOutput"]:
+    def _from_base(cls, _output: Union[Output, Dict]) -> Optional["InternalOutput"]:
         if _output is None:
             return None
         if isinstance(_output, InternalOutput):
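
For orientation, a small hedged sketch of the new surface (the dict-handling branch of _from_base is truncated in the hunk above, so forwarding dict entries to the constructor is an assumption here):

from azure.ai.ml._internal.entities._input_outputs import InternalInput, InternalOutput

# is_resource is now a keyword-only constructor argument instead of being
# patched onto the object after __init__ (see component.py below).
port_in = InternalInput._from_base({"type": "uri_folder", "is_resource": True})
assert port_in.is_resource is True

# InternalOutput gains datastore_mode the same way.
port_out = InternalOutput._from_base({"type": "uri_folder", "datastore_mode": "mount"})
assert port_out.datastore_mode == "mount"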
sdk/ml/azure-ai-ml/azure/ai/ml/_internal/entities/_merkle_tree.py

+198 −0

@@ -0,0 +1,198 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=pointless-string-statement
+
+import os
+import json
+import hashlib
+from datetime import datetime
+from os import listdir
+from os.path import isfile, join
+from collections import deque
+HASH_FILE_CHUNK_SIZE = 65536
+HASH_ALGORITHM = "sha512"
+
+''' Copied from ml-components
+Create a merkle tree for the given directory path
+The directory would typically represent a project directory'''
+
+
+def create_merkletree(file_or_folder_path, exclude_function):
+    root = DirTreeNode("", "Directory",
+                       datetime.fromtimestamp(os.path.getmtime(file_or_folder_path)).isoformat())
+    if os.path.isdir(file_or_folder_path):
+        folder_path = file_or_folder_path
+        _create_merkletree_helper(folder_path, root, exclude_function)
+    else:
+        file_path = file_or_folder_path
+        file_node = DirTreeNode(file_path,
+                                "File",
+                                datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat())
+        hexdigest_hash, bytehash = _get_hash(os.path.normpath(file_path),
+                                             file_path,
+                                             "File")
+        if hexdigest_hash and bytehash:
+            file_node.add_hash(hexdigest_hash, bytehash)
+            root.add_child(file_node)
+
+    _populate_hashes(root)
+    return root
+
+
+''' Populate hashes for directory nodes
+by hashing the hashes of child nodes under them'''
+
+
+def _populate_hashes(rootNode):
+    if rootNode.is_file():
+        return rootNode.bytehash
+    h = hashlib.new(HASH_ALGORITHM)
+    for child in rootNode.children:
+        if child.is_file():
+            h.update(child.bytehash)
+        else:
+            h.update(_populate_hashes(child))
+    rootNode.bytehash = h.digest()
+    rootNode.hexdigest_hash = h.hexdigest()
+    return h.digest()
+
+
+''' Create a merkle tree for the given directory path
+:param projectDir: Directory for which to create a tree.
+:param rootNode: Root node.
+Walks the directory and creates a dirTree '''
+
+
+def _create_merkletree_helper(projectDir, rootNode, exclude_function):
+    for f in sorted(listdir(projectDir)):
+        path = os.path.normpath(join(projectDir, f))
+        if not exclude_function(path):
+            if isfile(join(projectDir, f)):
+                newNode = DirTreeNode(f, "File", datetime.fromtimestamp(os.path.getmtime(path)).isoformat())
+                hexdigest_hash, bytehash = _get_hash(path, f, "File")
+                if hexdigest_hash and bytehash:
+                    newNode.add_hash(hexdigest_hash, bytehash)
+                    rootNode.add_child(newNode)
+            else:
+                newNode = DirTreeNode(f, "Directory", datetime.fromtimestamp(os.path.getmtime(path)).isoformat())
+                rootNode.add_child(newNode)
+                _create_merkletree_helper(path, newNode, exclude_function)
+
+
+def _get_hash(filePath, name, file_type):
+    h = hashlib.new(HASH_ALGORITHM)
+    if not os.access(filePath, os.R_OK):
+        print(filePath, os.R_OK)
+        print("Cannot access file, so excluded from snapshot: {}".format(filePath))
+        return (None, None)
+    with open(filePath, 'rb') as f:
+        while True:
+            data = f.read(HASH_FILE_CHUNK_SIZE)
+            if not data:
+                break
+            h.update(data)
+    h.update(name.encode('utf-8'))
+    h.update(file_type.encode('utf-8'))
+    return (h.hexdigest(), h.digest())
+
+
+''' We compute both hexdigest and digest for hashes.
+digest (bytes) is used so that we can compute the bytehash of a parent directory based on bytehash of its children
+hexdigest is used so that we can serialize the tree using json'''
+
+
+class DirTreeNode(object):
+    def __init__(self, name=None, file_type=None, timestamp=None, hexdigest_hash=None, bytehash=None):
+        self.file_type = file_type
+        self.name = name
+        self.timestamp = timestamp
+        self.children = []
+        self.hexdigest_hash = hexdigest_hash
+        self.bytehash = bytehash
+
+    def load_children_from_dict(self, node_dict):
+        if len(node_dict.items()) == 0:
+            return
+        self.name = node_dict['name']
+        self.file_type = node_dict['type']
+        self.hexdigest_hash = node_dict['hash']
+        self.timestamp = node_dict['timestamp']
+        for _, child in node_dict['children'].items():
+            node = DirTreeNode()
+            node.load_children_from_dict(child)
+            self.add_child(node)
+        return self
+
+    def load_children_from_json(self, node_dict):
+        self.name = node_dict['name']
+        self.file_type = node_dict['type']
+        self.hexdigest_hash = node_dict['hash']
+        self.timestamp = node_dict['timestamp']
+        for child in node_dict['children']:
+            node = DirTreeNode()
+            node.load_children_from_json(child)
+            self.add_child(node)
+        return self
+
+    def load_object_from_dict(self, node_dict):
+        self.load_children_from_dict(node_dict)
+
+    def load_root_object_from_json_string(self, jsondata):
+        node_dict = json.loads(jsondata)
+        self.load_children_from_json(node_dict)
+
+    def add_hash(self, hexdigest_hash, bytehash):
+        self.hexdigest_hash = hexdigest_hash
+        self.bytehash = bytehash
+
+    def add_child(self, node):
+        self.children.append(node)
+
+    def is_file(self):
+        return self.file_type == "File"
+
+    ''' Only for debugging purposes'''
+    def print_tree(self):
+        queue = deque()
+        print("Name: " + self.name)
+        print("Type: " + self.file_type)
+        for child in self.children:
+            print('  ' + child.name)
+            queue.append(child)
+        for i in queue:
+            i.print_tree()
+
+
+''' Serialize merkle tree.
+Serialize all fields except digest (bytes)
+'''
+
+
+class DirTreeJsonEncoder(json.JSONEncoder):
+    def default(self, o):
+        if not isinstance(o, DirTreeNode):
+            return super(DirTreeJsonEncoder, self).default(o)
+        _dict = o.__dict__
+        _dict.pop("bytehash", None)
+        _dict['type'] = _dict.pop('file_type')
+        _dict['hash'] = _dict.pop('hexdigest_hash')
+
+        return _dict
+
+
+class DirTreeJsonEncoderV2(json.JSONEncoder):
+    def default(self, o):
+        if not isinstance(o, DirTreeNode):
+            return super(DirTreeJsonEncoderV2, self).default(o)
+        _dict = o.__dict__
+        _dict.pop("bytehash", None)
+        if 'file_type' in _dict:
+            _dict['type'] = _dict.pop('file_type')
+        if 'hexdigest_hash' in _dict:
+            _dict['hash'] = _dict.pop('hexdigest_hash')
+        if isinstance(_dict['children'], list):
+            _dict['children'] = {x.name: x for x in _dict['children']}
+
+        return _dict
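
A short usage sketch of the module above (hypothetical ./src directory; nothing excluded):

import json
from azure.ai.ml._internal.entities._merkle_tree import create_merkletree, DirTreeJsonEncoder

# The root hexdigest changes whenever any included file's content, name,
# or type changes, which is what makes it usable as a snapshot id source.
root = create_merkletree("./src", lambda path: False)
print(root.hexdigest_hash)

# Serialize the tree; the encoder drops the raw byte digest and renames
# file_type/hexdigest_hash to type/hash.
print(json.dumps(root, cls=DirTreeJsonEncoder, indent=2))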
sdk/ml/azure-ai-ml/azure/ai/ml/_internal/entities/code.py

+36 −0

@@ -0,0 +1,36 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Optional
+
+from ...entities._assets import Code
+
+
+class InternalCode(Code):
+    @property
+    def _upload_hash(self) -> Optional[str]:
+        # This property is used to identify the uploaded content when trying to
+        # upload to the datastore. The call stack is as below:
+        #   _artifact_utilities._check_and_upload_path
+        #   _artifact_utilities._upload_to_datastore
+        #   _artifact_utilities.upload_artifact
+        #   _blob_storage_helper.upload
+        # where the asset id will be calculated based on the upload hash.
+
+        if self._is_anonymous is True:
+            # The name of an anonymous internal code is the same as its snapshot id
+            # in ml-components; use it as the upload hash to avoid duplicate hash
+            # calculation with _asset_utils.get_object_hash.
+            return self.name
+
+        return getattr(super(InternalCode, self), "_upload_hash")
+
+    def __setattr__(self, key, value):
+        if key == "name" and hasattr(self, key) and self._is_anonymous is True and value != self.name:
+            raise AttributeError(
+                "InternalCode name is calculated based on its content and cannot "
+                "be changed: current name is {} and new value is {}".format(self.name, value)
+            )
+        super().__setattr__(key, value)
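
A hedged illustration of the behavior added above (a sketch only: whether InternalCode can be constructed standalone with these kwargs is an assumption; in the component.py diff below it is instantiated by _resolve_local_code):

from azure.ai.ml._internal.entities.code import InternalCode

# For anonymous code, the content-derived name doubles as the upload hash.
code = InternalCode(name="3e1e8a7c-4f2d-41c8-9d1e-5a6b7c8d9e0f",  # hypothetical snapshot id
                    version="1", path="./src", is_anonymous=True)
assert code._upload_hash == code.name

# Renaming an anonymous InternalCode is rejected, since the name encodes content.
try:
    code.name = "my-code"
except AttributeError as e:
    print(e)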

sdk/ml/azure-ai-ml/azure/ai/ml/_internal/entities/component.py

+39 −27

@@ -4,7 +4,9 @@
 # pylint: disable=protected-access, redefined-builtin
 # disable redefined-builtin to use id/type as argument name
 from contextlib import contextmanager
+from os import PathLike
 from typing import Dict, Union
+from uuid import UUID

 from marshmallow import INCLUDE, Schema

@@ -16,13 +18,16 @@
 from azure.ai.ml.entities._system_data import SystemData
 from azure.ai.ml.entities._util import convert_ordered_dict_to_dict
 from azure.ai.ml.entities._validation import MutableValidationResult
+from ._merkle_tree import create_merkletree

 from ... import Input, Output
+from ..._utils._asset_utils import IgnoreFile, get_ignore_file
 from .._schema.component import InternalBaseComponentSchema
 from ._additional_includes import _AdditionalIncludes
 from ._input_outputs import InternalInput, InternalOutput
 from .environment import InternalEnvironment
 from .node import InternalBaseNode
+from .code import InternalCode


 class InternalComponent(Component):
@@ -127,40 +132,16 @@ def __init__(
         self.ae365exepool = ae365exepool
         self.launcher = launcher

-        # add some internal specific attributes to inputs/outputs after super().__init__()
-        self._post_process_internal_inputs_outputs(inputs, outputs)
-
     @classmethod
     def _build_io(cls, io_dict: Union[Dict, Input, Output], is_input: bool):
         component_io = {}
         for name, port in io_dict.items():
             if is_input:
-                component_io[name] = InternalInput._cast_from_input_or_dict(port)
+                component_io[name] = InternalInput._from_base(port)
             else:
-                component_io[name] = InternalOutput._cast_from_output_or_dict(port)
+                component_io[name] = InternalOutput._from_base(port)
         return component_io

-    def _post_process_internal_inputs_outputs(
-        self,
-        inputs_dict: Union[Dict, Input, Output],
-        outputs_dict: Union[Dict, Input, Output],
-    ):
-        for io_name, io_object in self.inputs.items():
-            original = inputs_dict[io_name]
-            # force append attribute for internal inputs
-            if isinstance(original, dict):
-                for attr_name in ["is_resource"]:
-                    if attr_name in original:
-                        io_object.__setattr__(attr_name, original[attr_name])
-
-        for io_name, io_object in self.outputs.items():
-            original = outputs_dict[io_name]
-            # force append attribute for internal inputs
-            if isinstance(original, dict):
-                for attr_name in ["datastore_mode"]:
-                    if attr_name in original:
-                        io_object.__setattr__(attr_name, original[attr_name])
-
     @property
     def _additional_includes(self):
         if self.__additional_includes is None:
@@ -221,8 +202,27 @@ def _to_rest_object(self) -> ComponentVersionData:
         result.name = self.name
         return result

+    @classmethod
+    def _get_snapshot_id(cls, code_path: Union[str, PathLike]) -> str:
+        """Get the snapshot id of a component with a specific working directory in ml-components.
+        Use this as the name of the code asset to reuse steps in a pipeline job from ml-components runs.
+
+        :param code_path: The path of the working directory.
+        :type code_path: str
+        :return: The snapshot id of a component in ml-components with code_path as its working directory.
+        """
+        _ignore_file: IgnoreFile = get_ignore_file(code_path)
+        curr_root = create_merkletree(code_path, lambda x: _ignore_file.is_file_excluded(x))
+        snapshot_id = str(UUID(curr_root.hexdigest_hash[::4]))
+        return snapshot_id
+
     @contextmanager
     def _resolve_local_code(self):
+        """Create a Code object pointing to local code and yield it."""
+        # Note that if self.code is already a Code object, this function won't be called
+        # in create_or_update => _try_resolve_code_for_component, which is also
+        # forbidden by schema CodeFields for now.
+
         self._additional_includes.resolve()

         # file dependency in code will be read during internal environment resolution
@@ -232,7 +232,19 @@ def _resolve_local_code(self):
         if isinstance(self.environment, InternalEnvironment):
             self.environment.resolve(self._additional_includes.code)
         # use absolute path in case temp folder & work dir are in different drives
-        yield self._additional_includes.code.absolute()
+        tmp_code_dir = self._additional_includes.code.absolute()
+        # Use the snapshot id in ml-components as the code name to enable anonymous
+        # component reuse from ml-components runs.
+        # Calculate the snapshot id here instead of inside InternalCode to ensure
+        # that it is based on the resolved code path.
+        yield InternalCode(
+            name=self._get_snapshot_id(tmp_code_dir),
+            version="1",
+            base_path=self._base_path,
+            path=tmp_code_dir,
+            is_anonymous=True,
+        )
+
         self._additional_includes.cleanup()

     def __call__(self, *args, **kwargs) -> InternalBaseNode:  # pylint: disable=useless-super-delegation
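
Finally, a standalone worked example of the id derivation inside _get_snapshot_id (standard library only): a sha512 hexdigest is 128 hex characters, so taking every 4th character leaves exactly the 32 hex digits needed to form a UUID-shaped, content-stable snapshot id.

import hashlib
from uuid import UUID

hexdigest = hashlib.new("sha512", b"resolved code directory contents").hexdigest()
print(len(hexdigest))             # 128
print(len(hexdigest[::4]))        # 32 -- every 4th character
print(str(UUID(hexdigest[::4])))  # deterministic, UUID-shaped snapshot id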
