diff --git a/.vscode/cspell.json b/.vscode/cspell.json index f1e127fcbc04..c604c50b847f 100644 --- a/.vscode/cspell.json +++ b/.vscode/cspell.json @@ -1323,12 +1323,20 @@ { "filename": "sdk/ai/azure-ai-inference/**", "words": [ - "ubinary", - "mros", - "Nify", "ctxt", - "wday", - "dtype" + "dels", + "dtype", + "fmatter", + "fspath", + "fstring", + "ldel", + "mros", + "nify", + "okwargs", + "prompty", + "rdel", + "ubinary", + "wday" ] }, { diff --git a/sdk/ai/azure-ai-inference/azure/ai/inference/_model_base.py b/sdk/ai/azure-ai-inference/azure/ai/inference/_model_base.py index c4b1008c1e85..53305e2213a7 100644 --- a/sdk/ai/azure-ai-inference/azure/ai/inference/_model_base.py +++ b/sdk/ai/azure-ai-inference/azure/ai/inference/_model_base.py @@ -674,7 +674,7 @@ def _get_deserialize_callable_from_annotation( # pylint: disable=R0911, R0915, except AttributeError: model_name = annotation if module is not None: - annotation = _get_model(module, model_name) + annotation = _get_model(module, model_name) # type: ignore try: if module and _is_model(annotation): diff --git a/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/__init__.py b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/__init__.py new file mode 100644 index 000000000000..2e11b31cb6a4 --- /dev/null +++ b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/__init__.py @@ -0,0 +1,8 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +# pylint: disable=unused-import +from ._patch import patch_sdk as _patch_sdk, PromptTemplate + +_patch_sdk() diff --git a/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_core.py b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_core.py new file mode 100644 index 000000000000..ec6702995149 --- /dev/null +++ b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_core.py @@ -0,0 +1,312 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+# ------------------------------------ +# mypy: disable-error-code="assignment,attr-defined,index,arg-type" +# pylint: disable=line-too-long,R,consider-iterating-dictionary,raise-missing-from,dangerous-default-value +from __future__ import annotations +import os +from dataclasses import dataclass, field, asdict +from pathlib import Path +from typing import Any, AsyncIterator, Dict, Iterator, List, Literal, Union +from ._tracer import Tracer, to_dict +from ._utils import load_json + + +@dataclass +class ToolCall: + id: str + name: str + arguments: str + + +@dataclass +class PropertySettings: + """PropertySettings class to define the properties of the model + + Attributes + ---------- + type : str + The type of the property + default : Any + The default value of the property + description : str + The description of the property + """ + + type: Literal["string", "number", "array", "object", "boolean"] + default: Union[str, int, float, List, Dict, bool, None] = field(default=None) + description: str = field(default="") + + +@dataclass +class ModelSettings: + """ModelSettings class to define the model of the prompty + + Attributes + ---------- + api : str + The api of the model + configuration : Dict + The configuration of the model + parameters : Dict + The parameters of the model + response : Dict + The response of the model + """ + + api: str = field(default="") + configuration: Dict = field(default_factory=dict) + parameters: Dict = field(default_factory=dict) + response: Dict = field(default_factory=dict) + + +@dataclass +class TemplateSettings: + """TemplateSettings class to define the template of the prompty + + Attributes + ---------- + type : str + The type of the template + parser : str + The parser of the template + """ + + type: str = field(default="mustache") + parser: str = field(default="") + + +@dataclass +class Prompty: + """Prompty class to define the prompty + + Attributes + ---------- + name : str + The name of the prompty + description : str + The description of the prompty + authors : List[str] + The authors of the prompty + tags : List[str] + The tags of the prompty + version : str + The version of the prompty + base : str + The base of the prompty + basePrompty : Prompty + The base prompty + model : ModelSettings + The model of the prompty + sample : Dict + The sample of the prompty + inputs : Dict[str, PropertySettings] + The inputs of the prompty + outputs : Dict[str, PropertySettings] + The outputs of the prompty + template : TemplateSettings + The template of the prompty + file : FilePath + The file of the prompty + content : Union[str, List[str], Dict] + The content of the prompty + """ + + # metadata + name: str = field(default="") + description: str = field(default="") + authors: List[str] = field(default_factory=list) + tags: List[str] = field(default_factory=list) + version: str = field(default="") + base: str = field(default="") + basePrompty: Union[Prompty, None] = field(default=None) + # model + model: ModelSettings = field(default_factory=ModelSettings) + + # sample + sample: Dict = field(default_factory=dict) + + # input / output + inputs: Dict[str, PropertySettings] = field(default_factory=dict) + outputs: Dict[str, PropertySettings] = field(default_factory=dict) + + # template + template: TemplateSettings = field(default_factory=TemplateSettings) + + file: Union[Path, str] = field(default="") + content: Union[str, List[str], Dict] = field(default="") + + def to_safe_dict(self) -> Dict[str, Any]: + d = {} + if self.model: + d["model"] = asdict(self.model) 
+ _mask_secrets(d, ["model", "configuration"]) + if self.template: + d["template"] = asdict(self.template) + if self.inputs: + d["inputs"] = {k: asdict(v) for k, v in self.inputs.items()} + if self.outputs: + d["outputs"] = {k: asdict(v) for k, v in self.outputs.items()} + if self.file: + d["file"] = str(self.file.as_posix()) if isinstance(self.file, Path) else self.file + return d + + @staticmethod + def hoist_base_prompty(top: Prompty, base: Prompty) -> Prompty: + top.name = base.name if top.name == "" else top.name + top.description = base.description if top.description == "" else top.description + top.authors = list(set(base.authors + top.authors)) + top.tags = list(set(base.tags + top.tags)) + top.version = base.version if top.version == "" else top.version + + top.model.api = base.model.api if top.model.api == "" else top.model.api + top.model.configuration = param_hoisting(top.model.configuration, base.model.configuration) + top.model.parameters = param_hoisting(top.model.parameters, base.model.parameters) + top.model.response = param_hoisting(top.model.response, base.model.response) + + top.sample = param_hoisting(top.sample, base.sample) + + top.basePrompty = base + + return top + + @staticmethod + def _process_file(file: str, parent: Path) -> Any: + file_path = Path(parent / Path(file)).resolve().absolute() + if file_path.exists(): + items = load_json(file_path) + if isinstance(items, list): + return [Prompty.normalize(value, parent) for value in items] + elif isinstance(items, Dict): + return {key: Prompty.normalize(value, parent) for key, value in items.items()} + else: + return items + else: + raise FileNotFoundError(f"File {file} not found") + + @staticmethod + def _process_env(variable: str, env_error=True, default: Union[str, None] = None) -> Any: + if variable in os.environ.keys(): + return os.environ[variable] + else: + if default: + return default + if env_error: + raise ValueError(f"Variable {variable} not found in environment") + + return "" + + @staticmethod + def normalize(attribute: Any, parent: Path, env_error=True) -> Any: + if isinstance(attribute, str): + attribute = attribute.strip() + if attribute.startswith("${") and attribute.endswith("}"): + # check if env or file + variable = attribute[2:-1].split(":") + if variable[0] == "env" and len(variable) > 1: + return Prompty._process_env( + variable[1], + env_error, + variable[2] if len(variable) > 2 else None, + ) + elif variable[0] == "file" and len(variable) > 1: + return Prompty._process_file(variable[1], parent) + else: + raise ValueError(f"Invalid attribute format ({attribute})") + else: + return attribute + elif isinstance(attribute, list): + return [Prompty.normalize(value, parent) for value in attribute] + elif isinstance(attribute, Dict): + return {key: Prompty.normalize(value, parent) for key, value in attribute.items()} + else: + return attribute + + +def param_hoisting(top: Dict[str, Any], bottom: Dict[str, Any], top_key: Union[str, None] = None) -> Dict[str, Any]: + if top_key: + new_dict = {**top[top_key]} if top_key in top else {} + else: + new_dict = {**top} + for key, value in bottom.items(): + if not key in new_dict: + new_dict[key] = value + return new_dict + + +class PromptyStream(Iterator): + """PromptyStream class to iterate over LLM stream. 
+ Necessary for Prompty to handle streaming data when tracing.""" + + def __init__(self, name: str, iterator: Iterator): + self.name = name + self.iterator = iterator + self.items: List[Any] = [] + self.__name__ = "PromptyStream" + + def __iter__(self): + return self + + def __next__(self): + try: + # enumerate but add to list + o = self.iterator.__next__() + self.items.append(o) + return o + + except StopIteration: + # StopIteration is raised + # contents are exhausted + if len(self.items) > 0: + with Tracer.start("PromptyStream") as trace: + trace("signature", f"{self.name}.PromptyStream") + trace("inputs", "None") + trace("result", [to_dict(s) for s in self.items]) + + raise StopIteration + + +class AsyncPromptyStream(AsyncIterator): + """AsyncPromptyStream class to iterate over LLM stream. + Necessary for Prompty to handle streaming data when tracing.""" + + def __init__(self, name: str, iterator: AsyncIterator): + self.name = name + self.iterator = iterator + self.items: List[Any] = [] + self.__name__ = "AsyncPromptyStream" + + def __aiter__(self): + return self + + async def __anext__(self): + try: + # enumerate but add to list + o = await self.iterator.__anext__() + self.items.append(o) + return o + + except StopAsyncIteration: + # StopIteration is raised + # contents are exhausted + if len(self.items) > 0: + with Tracer.start("AsyncPromptyStream") as trace: + trace("signature", f"{self.name}.AsyncPromptyStream") + trace("inputs", "None") + trace("result", [to_dict(s) for s in self.items]) + + raise StopAsyncIteration + + +def _mask_secrets(d: Dict[str, Any], path: list[str], patterns: list[str] = ["key", "secret"]) -> bool: + sub_d = d + for key in path: + if key not in sub_d: + return False + sub_d = sub_d[key] + + for k, v in sub_d.items(): + if any([pattern in k.lower() for pattern in patterns]): + sub_d[k] = "*" * len(v) + return True diff --git a/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_invoker.py b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_invoker.py new file mode 100644 index 000000000000..d682662e7b01 --- /dev/null +++ b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_invoker.py @@ -0,0 +1,295 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+# ------------------------------------ +# mypy: disable-error-code="return-value,operator" +# pylint: disable=line-too-long,R,docstring-missing-param,docstring-missing-return,docstring-missing-rtype,unnecessary-pass +import abc +from typing import Any, Callable, Dict, Literal +from ._tracer import trace +from ._core import Prompty + + +class Invoker(abc.ABC): + """Abstract class for Invoker + + Attributes + ---------- + prompty : Prompty + The prompty object + name : str + The name of the invoker + + """ + + def __init__(self, prompty: Prompty) -> None: + self.prompty = prompty + self.name = self.__class__.__name__ + + @abc.abstractmethod + def invoke(self, data: Any) -> Any: + """Abstract method to invoke the invoker + + Parameters + ---------- + data : Any + The data to be invoked + + Returns + ------- + Any + The invoked + """ + pass + + @abc.abstractmethod + async def invoke_async(self, data: Any) -> Any: + """Abstract method to invoke the invoker asynchronously + + Parameters + ---------- + data : Any + The data to be invoked + + Returns + ------- + Any + The invoked + """ + pass + + @trace + def run(self, data: Any) -> Any: + """Method to run the invoker + + Parameters + ---------- + data : Any + The data to be invoked + + Returns + ------- + Any + The invoked + """ + return self.invoke(data) + + @trace + async def run_async(self, data: Any) -> Any: + """Method to run the invoker asynchronously + + Parameters + ---------- + data : Any + The data to be invoked + + Returns + ------- + Any + The invoked + """ + return await self.invoke_async(data) + + +class InvokerFactory: + """Factory class for Invoker""" + + _renderers: Dict[str, Invoker] = {} + _parsers: Dict[str, Invoker] = {} + _executors: Dict[str, Invoker] = {} + _processors: Dict[str, Invoker] = {} + + @classmethod + def add_renderer(cls, name: str, invoker: Invoker) -> None: + cls._renderers[name] = invoker + + @classmethod + def add_parser(cls, name: str, invoker: Invoker) -> None: + cls._parsers[name] = invoker + + @classmethod + def add_executor(cls, name: str, invoker: Invoker) -> None: + cls._executors[name] = invoker + + @classmethod + def add_processor(cls, name: str, invoker: Invoker) -> None: + cls._processors[name] = invoker + + @classmethod + def register_renderer(cls, name: str) -> Callable: + def inner_wrapper(wrapped_class: Invoker) -> Callable: + cls._renderers[name] = wrapped_class + return wrapped_class # type: ignore + + return inner_wrapper + + @classmethod + def register_parser(cls, name: str) -> Callable: + def inner_wrapper(wrapped_class: Invoker) -> Callable: + cls._parsers[name] = wrapped_class + return wrapped_class # type: ignore + + return inner_wrapper + + @classmethod + def register_executor(cls, name: str) -> Callable: + def inner_wrapper(wrapped_class: Invoker) -> Callable: + cls._executors[name] = wrapped_class + return wrapped_class # type: ignore + + return inner_wrapper + + @classmethod + def register_processor(cls, name: str) -> Callable: + def inner_wrapper(wrapped_class: Invoker) -> Callable: + cls._processors[name] = wrapped_class + return wrapped_class # type: ignore + + return inner_wrapper + + @classmethod + def _get_name( + cls, + type: Literal["renderer", "parser", "executor", "processor"], + prompty: Prompty, + ) -> str: + if type == "renderer": + return prompty.template.type + elif type == "parser": + return f"{prompty.template.parser}.{prompty.model.api}" + elif type == "executor": + return prompty.model.configuration["type"] + elif type == "processor": + return 
prompty.model.configuration["type"] + else: + raise ValueError(f"Type {type} not found") + + @classmethod + def _get_invoker( + cls, + type: Literal["renderer", "parser", "executor", "processor"], + prompty: Prompty, + ) -> Invoker: + if type == "renderer": + name = prompty.template.type + if name not in cls._renderers: + raise ValueError(f"Renderer {name} not found") + + return cls._renderers[name](prompty) # type: ignore + + elif type == "parser": + name = f"{prompty.template.parser}.{prompty.model.api}" + if name not in cls._parsers: + raise ValueError(f"Parser {name} not found") + + return cls._parsers[name](prompty) # type: ignore + + elif type == "executor": + name = prompty.model.configuration["type"] + if name not in cls._executors: + raise ValueError(f"Executor {name} not found") + + return cls._executors[name](prompty) # type: ignore + + elif type == "processor": + name = prompty.model.configuration["type"] + if name not in cls._processors: + raise ValueError(f"Processor {name} not found") + + return cls._processors[name](prompty) # type: ignore + + else: + raise ValueError(f"Type {type} not found") + + @classmethod + def run( + cls, + type: Literal["renderer", "parser", "executor", "processor"], + prompty: Prompty, + data: Any, + default: Any = None, + ): + name = cls._get_name(type, prompty) + if name.startswith("NOOP") and default is not None: + return default + elif name.startswith("NOOP"): + return data + + invoker = cls._get_invoker(type, prompty) + value = invoker.run(data) + return value + + @classmethod + async def run_async( + cls, + type: Literal["renderer", "parser", "executor", "processor"], + prompty: Prompty, + data: Any, + default: Any = None, + ): + name = cls._get_name(type, prompty) + if name.startswith("NOOP") and default is not None: + return default + elif name.startswith("NOOP"): + return data + invoker = cls._get_invoker(type, prompty) + value = await invoker.run_async(data) + return value + + @classmethod + def run_renderer(cls, prompty: Prompty, data: Any, default: Any = None) -> Any: + return cls.run("renderer", prompty, data, default) + + @classmethod + async def run_renderer_async(cls, prompty: Prompty, data: Any, default: Any = None) -> Any: + return await cls.run_async("renderer", prompty, data, default) + + @classmethod + def run_parser(cls, prompty: Prompty, data: Any, default: Any = None) -> Any: + return cls.run("parser", prompty, data, default) + + @classmethod + async def run_parser_async(cls, prompty: Prompty, data: Any, default: Any = None) -> Any: + return await cls.run_async("parser", prompty, data, default) + + @classmethod + def run_executor(cls, prompty: Prompty, data: Any, default: Any = None) -> Any: + return cls.run("executor", prompty, data, default) + + @classmethod + async def run_executor_async(cls, prompty: Prompty, data: Any, default: Any = None) -> Any: + return await cls.run_async("executor", prompty, data, default) + + @classmethod + def run_processor(cls, prompty: Prompty, data: Any, default: Any = None) -> Any: + return cls.run("processor", prompty, data, default) + + @classmethod + async def run_processor_async(cls, prompty: Prompty, data: Any, default: Any = None) -> Any: + return await cls.run_async("processor", prompty, data, default) + + +class InvokerException(Exception): + """Exception class for Invoker""" + + def __init__(self, message: str, type: str) -> None: + super().__init__(message) + self.type = type + + def __str__(self) -> str: + return f"{super().__str__()}. 
Make sure to pip install any necessary package extras (i.e. could be something like `pip install prompty[{self.type}]`) for {self.type} as well as import the appropriate invokers (i.e. could be something like `import prompty.{self.type}`)." + + +@InvokerFactory.register_renderer("NOOP") +@InvokerFactory.register_parser("NOOP") +@InvokerFactory.register_executor("NOOP") +@InvokerFactory.register_processor("NOOP") +@InvokerFactory.register_parser("prompty.embedding") +@InvokerFactory.register_parser("prompty.image") +@InvokerFactory.register_parser("prompty.completion") +class NoOp(Invoker): + def invoke(self, data: Any) -> Any: + return data + + async def invoke_async(self, data: str) -> Any: + return self.invoke(data) diff --git a/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_mustache.py b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_mustache.py new file mode 100644 index 000000000000..f7a0c21d8bb8 --- /dev/null +++ b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_mustache.py @@ -0,0 +1,671 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +# pylint: disable=line-too-long,R,consider-using-dict-items,docstring-missing-return,docstring-missing-rtype,docstring-missing-param,global-statement,unused-argument,global-variable-not-assigned,protected-access,logging-fstring-interpolation,deprecated-method +from __future__ import annotations +import logging +from collections.abc import Iterator, Sequence +from types import MappingProxyType +from typing import ( + Any, + Dict, + List, + Literal, + Mapping, + Optional, + Union, + cast, +) +from typing_extensions import TypeAlias + +logger = logging.getLogger(__name__) + + +Scopes: TypeAlias = List[Union[Literal[False, 0], Mapping[str, Any]]] + + +# Globals +_CURRENT_LINE = 1 +_LAST_TAG_LINE = None + + +class ChevronError(SyntaxError): + """Custom exception for Chevron errors.""" + + +# +# Helper functions +# + + +def grab_literal(template: str, l_del: str) -> tuple[str, str]: + """Parse a literal from the template. + + Args: + template: The template to parse. + l_del: The left delimiter. + + Returns: + Tuple[str, str]: The literal and the template. + """ + + global _CURRENT_LINE + + try: + # Look for the next tag and move the template to it + literal, template = template.split(l_del, 1) + _CURRENT_LINE += literal.count("\n") + return (literal, template) + + # There are no more tags in the template? + except ValueError: + # Then the rest of the template is a literal + return (template, "") + + +def l_sa_check(template: str, literal: str, is_standalone: bool) -> bool: + """Do a preliminary check to see if a tag could be a standalone. + + Args: + template: The template. (Not used.) + literal: The literal. + is_standalone: Whether the tag is standalone. + + Returns: + bool: Whether the tag could be a standalone. + """ + + # If there is a newline, or the previous tag was a standalone + if literal.find("\n") != -1 or is_standalone: + padding = literal.split("\n")[-1] + + # If all the characters since the last newline are spaces + # Then the next tag could be a standalone + # Otherwise it can't be + return padding.isspace() or padding == "" + else: + return False + + +def r_sa_check(template: str, tag_type: str, is_standalone: bool) -> bool: + """Do a final check to see if a tag could be a standalone. + + Args: + template: The template. + tag_type: The type of the tag. + is_standalone: Whether the tag is standalone. 
+ + Returns: + bool: Whether the tag could be a standalone. + """ + + # Check right side if we might be a standalone + if is_standalone and tag_type not in ["variable", "no escape"]: + on_newline = template.split("\n", 1) + + # If the stuff to the right of us are spaces we're a standalone + return on_newline[0].isspace() or not on_newline[0] + + # If we're a tag can't be a standalone + else: + return False + + +def parse_tag(template: str, l_del: str, r_del: str) -> tuple[tuple[str, str], str]: + """Parse a tag from a template. + + Args: + template: The template. + l_del: The left delimiter. + r_del: The right delimiter. + + Returns: + Tuple[Tuple[str, str], str]: The tag and the template. + + Raises: + ChevronError: If the tag is unclosed. + ChevronError: If the set delimiter tag is unclosed. + """ + global _CURRENT_LINE + global _LAST_TAG_LINE + + tag_types = { + "!": "comment", + "#": "section", + "^": "inverted section", + "/": "end", + ">": "partial", + "=": "set delimiter?", + "{": "no escape?", + "&": "no escape", + } + + # Get the tag + try: + tag, template = template.split(r_del, 1) + except ValueError as e: + msg = "unclosed tag " f"at line {_CURRENT_LINE}" + raise ChevronError(msg) from e + + # Find the type meaning of the first character + tag_type = tag_types.get(tag[0], "variable") + + # If the type is not a variable + if tag_type != "variable": + # Then that first character is not needed + tag = tag[1:] + + # If we might be a set delimiter tag + if tag_type == "set delimiter?": + # Double check to make sure we are + if tag.endswith("="): + tag_type = "set delimiter" + # Remove the equal sign + tag = tag[:-1] + + # Otherwise we should complain + else: + msg = "unclosed set delimiter tag\n" f"at line {_CURRENT_LINE}" + raise ChevronError(msg) + + elif ( + # If we might be a no html escape tag + tag_type == "no escape?" + # And we have a third curly brace + # (And are using curly braces as delimiters) + and l_del == "{{" + and r_del == "}}" + and template.startswith("}") + ): + # Then we are a no html escape tag + template = template[1:] + tag_type = "no escape" + + # Strip the whitespace off the key and return + return ((tag_type, tag.strip()), template) + + +# +# The main tokenizing function +# + + +def tokenize(template: str, def_ldel: str = "{{", def_rdel: str = "}}") -> Iterator[tuple[str, str]]: + """Tokenize a mustache template. + + Tokenizes a mustache template in a generator fashion, + using file-like objects. It also accepts a string containing + the template. + + + Arguments: + + template -- a file-like object, or a string of a mustache template + + def_ldel -- The default left delimiter + ("{{" by default, as in spec compliant mustache) + + def_rdel -- The default right delimiter + ("}}" by default, as in spec compliant mustache) + + + Returns: + + A generator of mustache tags in the form of a tuple + + -- (tag_type, tag_key) + + Where tag_type is one of: + * literal + * section + * inverted section + * end + * partial + * no escape + + And tag_key is either the key or in the case of a literal tag, + the literal itself. 
+    """
+
+    global _CURRENT_LINE, _LAST_TAG_LINE
+    _CURRENT_LINE = 1
+    _LAST_TAG_LINE = None
+
+    is_standalone = True
+    open_sections = []
+    l_del = def_ldel
+    r_del = def_rdel
+
+    while template:
+        literal, template = grab_literal(template, l_del)
+
+        # If the template is completed
+        if not template:
+            # Then yield the literal and leave
+            yield ("literal", literal)
+            break
+
+        # Do the first check to see if we could be a standalone
+        is_standalone = l_sa_check(template, literal, is_standalone)
+
+        # Parse the tag
+        tag, template = parse_tag(template, l_del, r_del)
+        tag_type, tag_key = tag
+
+        # Special tag logic
+
+        # If we are a set delimiter tag
+        if tag_type == "set delimiter":
+            # Then get and set the delimiters
+            dels = tag_key.strip().split(" ")
+            l_del, r_del = dels[0], dels[-1]
+
+        # If we are a section tag
+        elif tag_type in ["section", "inverted section"]:
+            # Then open a new section
+            open_sections.append(tag_key)
+            _LAST_TAG_LINE = _CURRENT_LINE
+
+        # If we are an end tag
+        elif tag_type == "end":
+            # Then check to see if the last opened section
+            # is the same as us
+            try:
+                last_section = open_sections.pop()
+            except IndexError as e:
+                msg = f'Trying to close tag "{tag_key}"\n' "Looks like it was not opened.\n" f"line {_CURRENT_LINE + 1}"
+                raise ChevronError(msg) from e
+            if tag_key != last_section:
+                # Otherwise we need to complain
+                msg = (
+                    f'Trying to close tag "{tag_key}"\n'
+                    f'last open tag is "{last_section}"\n'
+                    f"line {_CURRENT_LINE + 1}"
+                )
+                raise ChevronError(msg)
+
+        # Do the second check to see if we're a standalone
+        is_standalone = r_sa_check(template, tag_type, is_standalone)
+
+        # Which if we are
+        if is_standalone:
+            # Remove the stuff before the newline
+            template = template.split("\n", 1)[-1]
+
+            # Partials need to keep the spaces on their left
+            if tag_type != "partial":
+                # But other tags don't
+                literal = literal.rstrip(" ")
+
+        # Start yielding
+        # Ignore literals that are empty
+        if literal != "":
+            yield ("literal", literal)
+
+        # Ignore comments and set delimiters
+        if tag_type not in ["comment", "set delimiter?"]:
+            yield (tag_type, tag_key)
+
+    # If there are any open sections when we're done
+    if open_sections:
+        # Then we need to complain
+        msg = (
+            "Unexpected EOF\n"
+            f'the tag "{open_sections[-1]}" was never closed\n'
+            f"was opened at line {_LAST_TAG_LINE}"
+        )
+        raise ChevronError(msg)
+
+
+#
+# Helper functions
+#
+
+
+def _html_escape(string: str) -> str:
+    """HTML escape all of these " & < >"""
+
+    html_codes = {
+        '"': "&quot;",
+        "<": "&lt;",
+        ">": "&gt;",
+    }
+
+    # & must be handled first
+    string = string.replace("&", "&amp;")
+    for char in html_codes:
+        string = string.replace(char, html_codes[char])
+    return string
+
+
+def _get_key(
+    key: str,
+    scopes: Scopes,
+    warn: bool,
+    keep: bool,
+    def_ldel: str,
+    def_rdel: str,
+) -> Any:
+    """Get a key from the current scope"""
+
+    # If the key is a dot
+    if key == ".":
+        # Then just return the current scope
+        return scopes[0]
+
+    # Loop through the scopes
+    for scope in scopes:
+        try:
+            # Return an empty string if falsy, with two exceptions
+            # 0 should return 0, and False should return False
+            if scope in (0, False):
+                return scope
+
+            # For every dot separated key
+            for child in key.split("."):
+                # Return an empty string if falsy, with two exceptions
+                # 0 should return 0, and False should return False
+                if scope in (0, False):
+                    return scope
+                # Move into the scope
+                try:
+                    # Try subscripting (Normal dictionaries)
+                    scope = cast(Dict[str, Any], scope)[child]
+                except (TypeError,
AttributeError): + try: + scope = getattr(scope, child) + except (TypeError, AttributeError): + # Try as a list + scope = scope[int(child)] # type: ignore + + try: + # This allows for custom falsy data types + # https://github.com/noahmorrison/chevron/issues/35 + if scope._CHEVRON_return_scope_when_falsy: # type: ignore + return scope + except AttributeError: + if scope in (0, False): + return scope + return scope or "" + except (AttributeError, KeyError, IndexError, ValueError): + # We couldn't find the key in the current scope + # We'll try again on the next pass + pass + + # We couldn't find the key in any of the scopes + + if warn: + logger.warn(f"Could not find key '{key}'") + + if keep: + return f"{def_ldel} {key} {def_rdel}" + + return "" + + +def _get_partial(name: str, partials_dict: Mapping[str, str]) -> str: + """Load a partial""" + try: + # Maybe the partial is in the dictionary + return partials_dict[name] + except KeyError: + return "" + + +# +# The main rendering function +# +g_token_cache: Dict[str, List[tuple[str, str]]] = {} + +EMPTY_DICT: MappingProxyType[str, str] = MappingProxyType({}) + + +def render( + template: Union[str, List[tuple[str, str]]] = "", + data: Mapping[str, Any] = EMPTY_DICT, + partials_dict: Mapping[str, str] = EMPTY_DICT, + padding: str = "", + def_ldel: str = "{{", + def_rdel: str = "}}", + scopes: Optional[Scopes] = None, + warn: bool = False, + keep: bool = False, +) -> str: + """Render a mustache template. + + Renders a mustache template with a data scope and inline partial capability. + + Arguments: + + template -- A file-like object or a string containing the template. + + data -- A python dictionary with your data scope. + + partials_path -- The path to where your partials are stored. + If set to None, then partials won't be loaded from the file system + (defaults to '.'). + + partials_ext -- The extension that you want the parser to look for + (defaults to 'mustache'). + + partials_dict -- A python dictionary which will be search for partials + before the filesystem is. {'include': 'foo'} is the same + as a file called include.mustache + (defaults to {}). + + padding -- This is for padding partials, and shouldn't be used + (but can be if you really want to). + + def_ldel -- The default left delimiter + ("{{" by default, as in spec compliant mustache). + + def_rdel -- The default right delimiter + ("}}" by default, as in spec compliant mustache). + + scopes -- The list of scopes that get_key will look through. + + warn -- Log a warning when a template substitution isn't found in the data + + keep -- Keep unreplaced tags when a substitution isn't found in the data. + + + Returns: + + A string containing the rendered template. 
+ """ + + # If the template is a sequence but not derived from a string + if isinstance(template, Sequence) and not isinstance(template, str): + # Then we don't need to tokenize it + # But it does need to be a generator + tokens: Iterator[tuple[str, str]] = (token for token in template) + else: + if template in g_token_cache: + tokens = (token for token in g_token_cache[template]) + else: + # Otherwise make a generator + tokens = tokenize(template, def_ldel, def_rdel) + + output = "" + + if scopes is None: + scopes = [data] + + # Run through the tokens + for tag, key in tokens: + # Set the current scope + current_scope = scopes[0] + + # If we're an end tag + if tag == "end": + # Pop out of the latest scope + del scopes[0] + + # If the current scope is falsy and not the only scope + elif not current_scope and len(scopes) != 1: + if tag in ["section", "inverted section"]: + # Set the most recent scope to a falsy value + scopes.insert(0, False) + + # If we're a literal tag + elif tag == "literal": + # Add padding to the key and add it to the output + output += key.replace("\n", "\n" + padding) + + # If we're a variable tag + elif tag == "variable": + # Add the html escaped key to the output + thing = _get_key(key, scopes, warn=warn, keep=keep, def_ldel=def_ldel, def_rdel=def_rdel) + if thing is True and key == ".": + # if we've coerced into a boolean by accident + # (inverted tags do this) + # then get the un-coerced object (next in the stack) + thing = scopes[1] + if not isinstance(thing, str): + thing = str(thing) + output += _html_escape(thing) + + # If we're a no html escape tag + elif tag == "no escape": + # Just lookup the key and add it + thing = _get_key(key, scopes, warn=warn, keep=keep, def_ldel=def_ldel, def_rdel=def_rdel) + if not isinstance(thing, str): + thing = str(thing) + output += thing + + # If we're a section tag + elif tag == "section": + # Get the sections scope + scope = _get_key(key, scopes, warn=warn, keep=keep, def_ldel=def_ldel, def_rdel=def_rdel) + + # If the scope is a callable (as described in + # https://mustache.github.io/mustache.5.html) + if callable(scope): + # Generate template text from tags + text = "" + tags: List[tuple[str, str]] = [] + for token in tokens: + if token == ("end", key): + break + + tags.append(token) + tag_type, tag_key = token + if tag_type == "literal": + text += tag_key + elif tag_type == "no escape": + text += f"{def_ldel}& {tag_key} {def_rdel}" + else: + text += "{}{} {}{}".format( + def_ldel, + { + "comment": "!", + "section": "#", + "inverted section": "^", + "end": "/", + "partial": ">", + "set delimiter": "=", + "no escape": "&", + "variable": "", + }[tag_type], + tag_key, + def_rdel, + ) + + g_token_cache[text] = tags + + rend = scope( + text, + lambda template, data=None: render( + template, + data={}, + partials_dict=partials_dict, + padding=padding, + def_ldel=def_ldel, + def_rdel=def_rdel, + scopes=data and [data] + scopes or scopes, + warn=warn, + keep=keep, + ), + ) + + output += rend # type: ignore[reportOperatorIssue] + + # If the scope is a sequence, an iterator or generator but not + # derived from a string + elif isinstance(scope, (Sequence, Iterator)) and not isinstance(scope, str): + # Then we need to do some looping + + # Gather up all the tags inside the section + # (And don't be tricked by nested end tags with the same key) + # TODO: This feels like it still has edge cases, no? 
+ tags = [] + tags_with_same_key = 0 + for token in tokens: + if token == ("section", key): + tags_with_same_key += 1 + if token == ("end", key): + tags_with_same_key -= 1 + if tags_with_same_key < 0: + break + tags.append(token) + + # For every item in the scope + for thing in scope: + # Append it as the most recent scope and render + new_scope = [thing] + scopes + rend = render( + template=tags, + scopes=new_scope, + padding=padding, + partials_dict=partials_dict, + def_ldel=def_ldel, + def_rdel=def_rdel, + warn=warn, + keep=keep, + ) + + output += rend + + else: + # Otherwise we're just a scope section + scopes.insert(0, scope) # type: ignore[reportArgumentType] + + # If we're an inverted section + elif tag == "inverted section": + # Add the flipped scope to the scopes + scope = _get_key(key, scopes, warn=warn, keep=keep, def_ldel=def_ldel, def_rdel=def_rdel) + scopes.insert(0, cast(Literal[False], not scope)) + + # If we're a partial + elif tag == "partial": + # Load the partial + partial = _get_partial(key, partials_dict) + + # Find what to pad the partial with + left = output.rpartition("\n")[2] + part_padding = padding + if left.isspace(): + part_padding += left + + # Render the partial + part_out = render( + template=partial, + partials_dict=partials_dict, + def_ldel=def_ldel, + def_rdel=def_rdel, + padding=part_padding, + scopes=scopes, + warn=warn, + keep=keep, + ) + + # If the partial was indented + if left.isspace(): + # then remove the spaces from the end + part_out = part_out.rstrip(" \t") + + # Add the partials output to the output + output += part_out + + return output diff --git a/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_parsers.py b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_parsers.py new file mode 100644 index 000000000000..0e92e84667cd --- /dev/null +++ b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_parsers.py @@ -0,0 +1,148 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+# ------------------------------------
+# mypy: disable-error-code="union-attr,return-value"
+# pylint: disable=line-too-long,R,consider-using-enumerate,docstring-missing-param,docstring-missing-return,docstring-missing-rtype
+import re
+import base64
+from pathlib import Path
+from typing import Any
+from ._core import Prompty
+from ._invoker import Invoker, InvokerFactory
+
+
+@InvokerFactory.register_parser("prompty.chat")
+class PromptyChatParser(Invoker):
+    """Prompty Chat Parser"""
+
+    def __init__(self, prompty: Prompty) -> None:
+        super().__init__(prompty)
+        self.roles = ["assistant", "function", "system", "user"]
+        self.path = Path(self.prompty.file).parent
+
+    def inline_image(self, image_item: str) -> str:
+        """Inline Image
+
+        Parameters
+        ----------
+        image_item : str
+            The image item to inline
+
+        Returns
+        -------
+        str
+            The inlined image
+        """
+        # pass through if it's a url or base64 encoded
+        if image_item.startswith("http") or image_item.startswith("data"):
+            return image_item
+        # otherwise, it's a local file - need to base64 encode it
+        else:
+            image_path = self.path / image_item
+            with open(image_path, "rb") as f:
+                base64_image = base64.b64encode(f.read()).decode("utf-8")
+
+            if image_path.suffix == ".png":
+                return f"data:image/png;base64,{base64_image}"
+            elif image_path.suffix == ".jpg":
+                return f"data:image/jpeg;base64,{base64_image}"
+            elif image_path.suffix == ".jpeg":
+                return f"data:image/jpeg;base64,{base64_image}"
+            else:
+                raise ValueError(
+                    f"Invalid image format {image_path.suffix} - currently only .png and .jpg / .jpeg are supported."
+                )
+
+    def parse_content(self, content: str):
+        """for parsing inline images
+
+        Parameters
+        ----------
+        content : str
+            The content to parse
+
+        Returns
+        -------
+        any
+            The parsed content
+        """
+        # regular expression to parse markdown images
+        image = r"(?P<alt>!\[[^\]]*\])\((?P<filename>.*?)(?=\"|\))\)"
+        matches = re.findall(image, content, flags=re.MULTILINE)
+        if len(matches) > 0:
+            content_items = []
+            content_chunks = re.split(image, content, flags=re.MULTILINE)
+            current_chunk = 0
+            for i in range(len(content_chunks)):
+                # image entry
+                if current_chunk < len(matches) and content_chunks[i] == matches[current_chunk][0]:
+                    content_items.append(
+                        {
+                            "type": "image_url",
+                            "image_url": {"url": self.inline_image(matches[current_chunk][1].split(" ")[0].strip())},
+                        }
+                    )
+                # second part of image entry
+                elif current_chunk < len(matches) and content_chunks[i] == matches[current_chunk][1]:
+                    current_chunk += 1
+                # text entry
+                else:
+                    if len(content_chunks[i].strip()) > 0:
+                        content_items.append({"type": "text", "text": content_chunks[i].strip()})
+            return content_items
+        else:
+            return content
+
+    def invoke(self, data: str) -> Any:
+        """Invoke the Prompty Chat Parser
+
+        Parameters
+        ----------
+        data : str
+            The data to parse
+
+        Returns
+        -------
+        str
+            The parsed data
+        """
+        messages = []
+        separator = r"(?i)^\s*#?\s*(" + "|".join(self.roles) + r")\s*:\s*\n"
+
+        # get valid chunks - remove empty items
+        chunks = [item for item in re.split(separator, data, flags=re.MULTILINE) if len(item.strip()) > 0]
+
+        # if no starter role, then inject system role
+        if not chunks[0].strip().lower() in self.roles:
+            chunks.insert(0, "system")
+
+        # if last chunk is role entry, then remove (no content?)
+ if chunks[-1].strip().lower() in self.roles: + chunks.pop() + + if len(chunks) % 2 != 0: + raise ValueError("Invalid prompt format") + + # create messages + for i in range(0, len(chunks), 2): + role = chunks[i].strip().lower() + content = chunks[i + 1].strip() + messages.append({"role": role, "content": self.parse_content(content)}) + + return messages + + async def invoke_async(self, data: str) -> Any: + """Invoke the Prompty Chat Parser (Async) + + Parameters + ---------- + data : str + The data to parse + + Returns + ------- + str + The parsed data + """ + return self.invoke(data) diff --git a/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_patch.py b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_patch.py new file mode 100644 index 000000000000..8689d1bd56b4 --- /dev/null +++ b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_patch.py @@ -0,0 +1,120 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +# pylint: disable=line-too-long,R +"""Customize generated code here. + +Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize +""" + +import traceback +from pathlib import Path +from typing import Any, Dict, List, Optional +from typing_extensions import Self +from ._core import Prompty +from ._mustache import render +from ._prompty_utils import load, prepare + + +class PromptTemplate: + """The helper class which takes variant of inputs, e.g. Prompty format or string, and returns the parsed prompt in an array.""" + + @classmethod + def from_prompty(cls, file_path: str) -> Self: + """Initialize a PromptTemplate object from a prompty file. + + :param file_path: The path to the prompty file. + :type file_path: str + :return: The PromptTemplate object. + :rtype: PromptTemplate + """ + if not file_path: + raise ValueError("Please provide file_path") + + # Get the absolute path of the file by `traceback.extract_stack()`, it's "-2" because: + # In the stack, the last function is the current function. + # The second last function is the caller function, which is the root of the file_path. + stack = traceback.extract_stack() + caller = Path(stack[-2].filename) + abs_file_path = Path(caller.parent / Path(file_path)).resolve().absolute() + + prompty = load(str(abs_file_path)) + return cls(prompty=prompty) + + @classmethod + def from_string(cls, prompt_template: str, api: str = "chat", model_name: Optional[str] = None) -> Self: + """Initialize a PromptTemplate object from a message template. + + :param prompt_template: The prompt template string. + :type prompt_template: str + :param api: The API type, e.g. "chat" or "completion". + :type api: str + :param model_name: The model name, e.g. "gpt-4o-mini". + :type model_name: str + :return: The PromptTemplate object. 
+ :rtype: PromptTemplate + """ + return cls( + api=api, + prompt_template=prompt_template, + model_name=model_name, + prompty=None, + ) + + def __init__( + self, + *, + api: str = "chat", + prompty: Optional[Prompty] = None, + prompt_template: Optional[str] = None, + model_name: Optional[str] = None, + ) -> None: + self.prompty = prompty + if self.prompty is not None: + self.model_name = ( + self.prompty.model.configuration["azure_deployment"] + if "azure_deployment" in self.prompty.model.configuration + else None + ) + self.parameters = self.prompty.model.parameters + self._config = {} + elif prompt_template is not None: + self.model_name = model_name + self.parameters = {} + # _config is a dict to hold the internal configuration + self._config = { + "api": api if api is not None else "chat", + "prompt_template": prompt_template, + } + else: + raise ValueError("Please pass valid arguments for PromptTemplate") + + def create_messages(self, data: Optional[Dict[str, Any]] = None, **kwargs) -> List[Dict[str, Any]]: + """Render the prompt template with the given data. + + :param data: The data to render the prompt template with. + :type data: Optional[Dict[str, Any]] + :return: The rendered prompt template. + :rtype: List[Dict[str, Any]] + """ + if data is None: + data = kwargs + + if self.prompty is not None: + parsed = prepare(self.prompty, data) + return parsed + elif "prompt_template" in self._config: + system_prompt = render(self._config["prompt_template"], data) + return [{"role": "system", "content": system_prompt}] + else: + raise ValueError("Please provide valid prompt template") + + +def patch_sdk(): + """Do not remove from this file. + + `patch_sdk` is a last resort escape hatch that allows you to do customizations + you can't accomplish using the techniques described in + https://aka.ms/azsdk/python/dpcodegen/python/customize + """ diff --git a/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_prompty_utils.py b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_prompty_utils.py new file mode 100644 index 000000000000..5ea38bda6229 --- /dev/null +++ b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_prompty_utils.py @@ -0,0 +1,415 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +# mypy: disable-error-code="assignment" +# pylint: disable=R,docstring-missing-param,docstring-missing-return,docstring-missing-rtype,dangerous-default-value,redefined-outer-name,unused-wildcard-import,wildcard-import,raise-missing-from +import traceback +from pathlib import Path +from typing import Any, Dict, List, Union +from ._tracer import trace +from ._invoker import InvokerFactory +from ._core import ( + ModelSettings, + Prompty, + PropertySettings, + TemplateSettings, + param_hoisting, +) +from ._utils import ( + load_global_config, + load_prompty, +) + +from ._renderers import * +from ._parsers import * + + +@trace(description="Create a headless prompty object for programmatic use.") +def headless( + api: str, + content: Union[str, List[str], dict], + configuration: Dict[str, Any] = {}, + parameters: Dict[str, Any] = {}, + connection: str = "default", +) -> Prompty: + """Create a headless prompty object for programmatic use. 
+ + Parameters + ---------- + api : str + The API to use for the model + content : Union[str, List[str], dict] + The content to process + configuration : Dict[str, Any], optional + The configuration to use, by default {} + parameters : Dict[str, Any], optional + The parameters to use, by default {} + connection : str, optional + The connection to use, by default "default" + + Returns + ------- + Prompty + The headless prompty object + + Example + ------- + >>> import prompty + >>> p = prompty.headless( + api="embedding", + configuration={"type": "azure", "azure_deployment": "text-embedding-ada-002"}, + content="hello world", + ) + >>> emb = prompty.execute(p) + + """ + + # get caller's path (to get relative path for prompty.json) + caller = Path(traceback.extract_stack()[-2].filename) + templateSettings = TemplateSettings(type="NOOP", parser="NOOP") + modelSettings = ModelSettings( + api=api, + configuration=Prompty.normalize( + param_hoisting(configuration, load_global_config(caller.parent, connection)), + caller.parent, + ), + parameters=parameters, + ) + + return Prompty(model=modelSettings, template=templateSettings, content=content) + + +def _load_raw_prompty(attributes: dict, content: str, p: Path, global_config: dict): + if "model" not in attributes: + attributes["model"] = {} + + if "configuration" not in attributes["model"]: + attributes["model"]["configuration"] = global_config + else: + attributes["model"]["configuration"] = param_hoisting( + attributes["model"]["configuration"], + global_config, + ) + + # pull model settings out of attributes + try: + model = ModelSettings(**attributes.pop("model")) + except Exception as e: + raise ValueError(f"Error in model settings: {e}") + + # pull template settings + try: + if "template" in attributes: + t = attributes.pop("template") + if isinstance(t, dict): + template = TemplateSettings(**t) + # has to be a string denoting the type + else: + template = TemplateSettings(type=t, parser="prompty") + else: + template = TemplateSettings(type="mustache", parser="prompty") + except Exception as e: + raise ValueError(f"Error in template loader: {e}") + + # formalize inputs and outputs + if "inputs" in attributes: + try: + inputs = {k: PropertySettings(**v) for (k, v) in attributes.pop("inputs").items()} + except Exception as e: + raise ValueError(f"Error in inputs: {e}") + else: + inputs = {} + if "outputs" in attributes: + try: + outputs = {k: PropertySettings(**v) for (k, v) in attributes.pop("outputs").items()} + except Exception as e: + raise ValueError(f"Error in outputs: {e}") + else: + outputs = {} + + prompty = Prompty( + **attributes, + model=model, + inputs=inputs, + outputs=outputs, + template=template, + content=content, + file=p, + ) + + return prompty + + +@trace(description="Load a prompty file.") +def load(prompty_file: Union[str, Path], configuration: str = "default") -> Prompty: + """Load a prompty file. 
+ + Parameters + ---------- + prompty_file : Union[str, Path] + The path to the prompty file + configuration : str, optional + The configuration to use, by default "default" + + Returns + ------- + Prompty + The loaded prompty object + + Example + ------- + >>> import prompty + >>> p = prompty.load("prompts/basic.prompty") + >>> print(p) + """ + + p = Path(prompty_file) + if not p.is_absolute(): + # get caller's path (take into account trace frame) + caller = Path(traceback.extract_stack()[-3].filename) + p = Path(caller.parent / p).resolve().absolute() + + # load dictionary from prompty file + matter = load_prompty(p) + + attributes = matter["attributes"] + content = matter["body"] + + # normalize attribute dictionary resolve keys and files + attributes = Prompty.normalize(attributes, p.parent) + + # load global configuration + global_config = Prompty.normalize(load_global_config(p.parent, configuration), p.parent) + + prompty = _load_raw_prompty(attributes, content, p, global_config) + + # recursive loading of base prompty + if "base" in attributes: + # load the base prompty from the same directory as the current prompty + base = load(p.parent / attributes["base"]) + prompty = Prompty.hoist_base_prompty(prompty, base) + + return prompty + + +@trace(description="Prepare the inputs for the prompt.") +def prepare( + prompt: Prompty, + inputs: Dict[str, Any] = {}, +): + """Prepare the inputs for the prompt. + + Parameters + ---------- + prompt : Prompty + The prompty object + inputs : Dict[str, Any], optional + The inputs to the prompt, by default {} + + Returns + ------- + dict + The prepared and hidrated template shaped to the LLM model + + Example + ------- + >>> import prompty + >>> p = prompty.load("prompts/basic.prompty") + >>> inputs = {"name": "John Doe"} + >>> content = prompty.prepare(p, inputs) + """ + inputs = param_hoisting(inputs, prompt.sample) + + render = InvokerFactory.run_renderer(prompt, inputs, prompt.content) + result = InvokerFactory.run_parser(prompt, render) + + return result + + +@trace(description="Prepare the inputs for the prompt.") +async def prepare_async( + prompt: Prompty, + inputs: Dict[str, Any] = {}, +): + """Prepare the inputs for the prompt. + + Parameters + ---------- + prompt : Prompty + The prompty object + inputs : Dict[str, Any], optional + The inputs to the prompt, by default {} + + Returns + ------- + dict + The prepared and hidrated template shaped to the LLM model + + Example + ------- + >>> import prompty + >>> p = prompty.load("prompts/basic.prompty") + >>> inputs = {"name": "John Doe"} + >>> content = await prompty.prepare_async(p, inputs) + """ + inputs = param_hoisting(inputs, prompt.sample) + + render = await InvokerFactory.run_renderer_async(prompt, inputs, prompt.content) + result = await InvokerFactory.run_parser_async(prompt, render) + + return result + + +@trace(description="Run the prepared Prompty content against the model.") +def run( + prompt: Prompty, + content: Union[dict, list, str], + configuration: Dict[str, Any] = {}, + parameters: Dict[str, Any] = {}, + raw: bool = False, +): + """Run the prepared Prompty content. 
+ + Parameters + ---------- + prompt : Prompty + The prompty object + content : Union[dict, list, str] + The content to process + configuration : Dict[str, Any], optional + The configuration to use, by default {} + parameters : Dict[str, Any], optional + The parameters to use, by default {} + raw : bool, optional + Whether to skip processing, by default False + + Returns + ------- + Any + The result of the prompt + + Example + ------- + >>> import prompty + >>> p = prompty.load("prompts/basic.prompty") + >>> inputs = {"name": "John Doe"} + >>> content = prompty.prepare(p, inputs) + >>> result = prompty.run(p, content) + """ + + if configuration != {}: + prompt.model.configuration = param_hoisting(configuration, prompt.model.configuration) + + if parameters != {}: + prompt.model.parameters = param_hoisting(parameters, prompt.model.parameters) + + result = InvokerFactory.run_executor(prompt, content) + if not raw: + result = InvokerFactory.run_processor(prompt, result) + + return result + + +@trace(description="Run the prepared Prompty content against the model.") +async def run_async( + prompt: Prompty, + content: Union[dict, list, str], + configuration: Dict[str, Any] = {}, + parameters: Dict[str, Any] = {}, + raw: bool = False, +): + """Run the prepared Prompty content. + + Parameters + ---------- + prompt : Prompty + The prompty object + content : Union[dict, list, str] + The content to process + configuration : Dict[str, Any], optional + The configuration to use, by default {} + parameters : Dict[str, Any], optional + The parameters to use, by default {} + raw : bool, optional + Whether to skip processing, by default False + + Returns + ------- + Any + The result of the prompt + + Example + ------- + >>> import prompty + >>> p = prompty.load("prompts/basic.prompty") + >>> inputs = {"name": "John Doe"} + >>> content = await prompty.prepare_async(p, inputs) + >>> result = await prompty.run_async(p, content) + """ + + if configuration != {}: + prompt.model.configuration = param_hoisting(configuration, prompt.model.configuration) + + if parameters != {}: + prompt.model.parameters = param_hoisting(parameters, prompt.model.parameters) + + result = await InvokerFactory.run_executor_async(prompt, content) + if not raw: + result = await InvokerFactory.run_processor_async(prompt, result) + + return result + + +@trace(description="Execute a prompty") +def execute( + prompt: Union[str, Prompty], + configuration: Dict[str, Any] = {}, + parameters: Dict[str, Any] = {}, + inputs: Dict[str, Any] = {}, + raw: bool = False, + config_name: str = "default", +): + """Execute a prompty. 
+ + Parameters + ---------- + prompt : Union[str, Prompty] + The prompty object or path to the prompty file + configuration : Dict[str, Any], optional + The configuration to use, by default {} + parameters : Dict[str, Any], optional + The parameters to use, by default {} + inputs : Dict[str, Any], optional + The inputs to the prompt, by default {} + raw : bool, optional + Whether to skip processing, by default False + connection : str, optional + The connection to use, by default "default" + + Returns + ------- + Any + The result of the prompt + + Example + ------- + >>> import prompty + >>> inputs = {"name": "John Doe"} + >>> result = prompty.execute("prompts/basic.prompty", inputs=inputs) + """ + if isinstance(prompt, str): + path = Path(prompt) + if not path.is_absolute(): + # get caller's path (take into account trace frame) + caller = Path(traceback.extract_stack()[-3].filename) + path = Path(caller.parent / path).resolve().absolute() + prompt = load(path, config_name) + + # prepare content + content = prepare(prompt, inputs) + + # run LLM model + result = run(prompt, content, configuration, parameters, raw) + + return result diff --git a/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_renderers.py b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_renderers.py new file mode 100644 index 000000000000..0d682a7fe151 --- /dev/null +++ b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_renderers.py @@ -0,0 +1,30 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +# mypy: disable-error-code="union-attr,assignment,arg-type" +from pathlib import Path +from ._core import Prompty +from ._invoker import Invoker, InvokerFactory +from ._mustache import render + + +@InvokerFactory.register_renderer("mustache") +class MustacheRenderer(Invoker): + """Render a mustache template.""" + + def __init__(self, prompty: Prompty) -> None: + super().__init__(prompty) + self.templates = {} + cur_prompt = self.prompty + while cur_prompt: + self.templates[Path(cur_prompt.file).name] = cur_prompt.content + cur_prompt = cur_prompt.basePrompty + self.name = Path(self.prompty.file).name + + def invoke(self, data: str) -> str: + generated = render(self.prompty.content, data) # type: ignore + return generated + + async def invoke_async(self, data: str) -> str: + return self.invoke(data) diff --git a/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_tracer.py b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_tracer.py new file mode 100644 index 000000000000..24f800b465f4 --- /dev/null +++ b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_tracer.py @@ -0,0 +1,316 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+# ------------------------------------ +# mypy: disable-error-code="union-attr,arg-type,misc,return-value,assignment,func-returns-value" +# pylint: disable=R,redefined-outer-name,bare-except,unspecified-encoding +import os +import json +import inspect +import traceback +import importlib +import contextlib +from pathlib import Path +from numbers import Number +from datetime import datetime +from functools import wraps, partial +from typing import Any, Callable, Dict, Iterator, List, Union + + +# clean up key value pairs for sensitive values +def sanitize(key: str, value: Any) -> Any: + if isinstance(value, str) and any([s in key.lower() for s in ["key", "token", "secret", "password", "credential"]]): + return len(str(value)) * "*" + + if isinstance(value, dict): + return {k: sanitize(k, v) for k, v in value.items()} + + return value + + +class Tracer: + _tracers: Dict[str, Callable[[str], Iterator[Callable[[str, Any], None]]]] = {} + + @classmethod + def add(cls, name: str, tracer: Callable[[str], Iterator[Callable[[str, Any], None]]]) -> None: + cls._tracers[name] = tracer + + @classmethod + def clear(cls) -> None: + cls._tracers = {} + + @classmethod + @contextlib.contextmanager + def start(cls, name: str) -> Iterator[Callable[[str, Any], None]]: + with contextlib.ExitStack() as stack: + traces: List[Any] = [stack.enter_context(tracer(name)) for tracer in cls._tracers.values()] # type: ignore + yield lambda key, value: [ # type: ignore + # normalize and sanitize any trace values + trace(key, sanitize(key, to_dict(value))) + for trace in traces + ] + + +def to_dict(obj: Any) -> Union[Dict[str, Any], List[Dict[str, Any]], str, Number, bool]: + # simple json types + if isinstance(obj, str) or isinstance(obj, Number) or isinstance(obj, bool): + return obj + + # datetime + if isinstance(obj, datetime): + return obj.isoformat() + + # safe Prompty obj serialization + if type(obj).__name__ == "Prompty": + return obj.to_safe_dict() + + # safe PromptyStream obj serialization + if type(obj).__name__ == "PromptyStream": + return "PromptyStream" + + if type(obj).__name__ == "AsyncPromptyStream": + return "AsyncPromptyStream" + + # recursive list and dict + if isinstance(obj, List): + return [to_dict(item) for item in obj] # type: ignore + + if isinstance(obj, Dict): + return {k: v if isinstance(v, str) else to_dict(v) for k, v in obj.items()} + + if isinstance(obj, Path): + return str(obj) + + # cast to string otherwise... 
+ return str(obj) + + +def _name(func: Callable, args): + if hasattr(func, "__qualname__"): + signature = f"{func.__module__}.{func.__qualname__}" + else: + signature = f"{func.__module__}.{func.__name__}" + + # core invoker gets special treatment prompty.invoker.Invoker + core_invoker = signature.startswith("prompty.invoker.Invoker.run") + if core_invoker: + name = type(args[0]).__name__ + if signature.endswith("async"): + signature = f"{args[0].__module__}.{args[0].__class__.__name__}.invoke_async" + else: + signature = f"{args[0].__module__}.{args[0].__class__.__name__}.invoke" + else: + name = func.__name__ + + return name, signature + + +def _inputs(func: Callable, args, kwargs) -> dict: + ba = inspect.signature(func).bind(*args, **kwargs) + ba.apply_defaults() + + inputs = {k: to_dict(v) for k, v in ba.arguments.items() if k != "self"} + + return inputs + + +def _results(result: Any) -> Union[Dict, List[Dict], str, Number, bool]: + return to_dict(result) if result is not None else "None" + + +def _trace_sync(func: Union[Callable, None] = None, **okwargs: Any) -> Callable: + + @wraps(func) # type: ignore + def wrapper(*args, **kwargs): + name, signature = _name(func, args) # type: ignore + with Tracer.start(name) as trace: + trace("signature", signature) + + # support arbitrary keyword + # arguments for trace decorator + for k, v in okwargs.items(): + trace(k, to_dict(v)) + + inputs = _inputs(func, args, kwargs) # type: ignore + trace("inputs", inputs) + + try: + result = func(*args, **kwargs) # type: ignore + trace("result", _results(result)) + except Exception as e: + trace( + "result", + { + "exception": { + "type": type(e), + "traceback": (traceback.format_tb(tb=e.__traceback__) if e.__traceback__ else None), + "message": str(e), + "args": to_dict(e.args), + } + }, + ) + raise e + + return result + + return wrapper + + +def _trace_async(func: Union[Callable, None] = None, **okwargs: Any) -> Callable: + + @wraps(func) # type: ignore + async def wrapper(*args, **kwargs): + name, signature = _name(func, args) # type: ignore + with Tracer.start(name) as trace: + trace("signature", signature) + + # support arbitrary keyword + # arguments for trace decorator + for k, v in okwargs.items(): + trace(k, to_dict(v)) + + inputs = _inputs(func, args, kwargs) # type: ignore + trace("inputs", inputs) + try: + result = await func(*args, **kwargs) # type: ignore + trace("result", _results(result)) + except Exception as e: + trace( + "result", + { + "exception": { + "type": type(e), + "traceback": (traceback.format_tb(tb=e.__traceback__) if e.__traceback__ else None), + "message": str(e), + "args": to_dict(e.args), + } + }, + ) + raise e + + return result + + return wrapper + + +def trace(func: Union[Callable, None] = None, **kwargs: Any) -> Callable: + if func is None: + return partial(trace, **kwargs) + wrapped_method = _trace_async if inspect.iscoroutinefunction(func) else _trace_sync + return wrapped_method(func, **kwargs) + + +class PromptyTracer: + def __init__(self, output_dir: Union[str, None] = None) -> None: + if output_dir: + self.output = Path(output_dir).resolve().absolute() + else: + self.output = Path(Path(os.getcwd()) / ".runs").resolve().absolute() + + if not self.output.exists(): + self.output.mkdir(parents=True, exist_ok=True) + + self.stack: List[Dict[str, Any]] = [] + + @contextlib.contextmanager + def tracer(self, name: str) -> Iterator[Callable[[str, Any], None]]: + try: + self.stack.append({"name": name}) + frame = self.stack[-1] + frame["__time"] = { + "start": 
datetime.now(), + } + + def add(key: str, value: Any) -> None: + if key not in frame: + frame[key] = value + # multiple values creates list + else: + if isinstance(frame[key], list): + frame[key].append(value) + else: + frame[key] = [frame[key], value] + + yield add + finally: + frame = self.stack.pop() + start: datetime = frame["__time"]["start"] + end: datetime = datetime.now() + + # add duration to frame + frame["__time"] = { + "start": start.strftime("%Y-%m-%dT%H:%M:%S.%f"), + "end": end.strftime("%Y-%m-%dT%H:%M:%S.%f"), + "duration": int((end - start).total_seconds() * 1000), + } + + # hoist usage to parent frame + if "result" in frame and isinstance(frame["result"], dict): + if "usage" in frame["result"]: + frame["__usage"] = self.hoist_item( + frame["result"]["usage"], + frame["__usage"] if "__usage" in frame else {}, + ) + + # streamed results may have usage as well + if "result" in frame and isinstance(frame["result"], list): + for result in frame["result"]: + if isinstance(result, dict) and "usage" in result and isinstance(result["usage"], dict): + frame["__usage"] = self.hoist_item( + result["usage"], + frame["__usage"] if "__usage" in frame else {}, + ) + + # add any usage frames from below + if "__frames" in frame: + for child in frame["__frames"]: + if "__usage" in child: + frame["__usage"] = self.hoist_item( + child["__usage"], + frame["__usage"] if "__usage" in frame else {}, + ) + + # if stack is empty, dump the frame + if len(self.stack) == 0: + self.write_trace(frame) + # otherwise, append the frame to the parent + else: + if "__frames" not in self.stack[-1]: + self.stack[-1]["__frames"] = [] + self.stack[-1]["__frames"].append(frame) + + def hoist_item(self, src: Dict[str, Any], cur: Dict[str, Any]) -> Dict[str, Any]: + for key, value in src.items(): + if value is None or isinstance(value, list) or isinstance(value, dict): + continue + try: + if key not in cur: + cur[key] = value + else: + cur[key] += value + except: + continue + + return cur + + def write_trace(self, frame: Dict[str, Any]) -> None: + trace_file = self.output / f"{frame['name']}.{datetime.now().strftime('%Y%m%d.%H%M%S')}.tracy" + + v = importlib.metadata.version("prompty") # type: ignore + enriched_frame = { + "runtime": "python", + "version": v, + "trace": frame, + } + + with open(trace_file, "w") as f: + json.dump(enriched_frame, f, indent=4) + + +@contextlib.contextmanager +def console_tracer(name: str) -> Iterator[Callable[[str, Any], None]]: + try: + print(f"Starting {name}") + yield lambda key, value: print(f"{key}:\n{json.dumps(to_dict(value), indent=4)}") + finally: + print(f"Ending {name}") diff --git a/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_utils.py b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_utils.py new file mode 100644 index 000000000000..45f0ac97a7e3 --- /dev/null +++ b/sdk/ai/azure-ai-inference/azure/ai/inference/prompts/_utils.py @@ -0,0 +1,74 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+# ------------------------------------ +# mypy: disable-error-code="import-untyped,return-value" +# pylint: disable=line-too-long,R,wrong-import-order,global-variable-not-assigned) +import re +import yaml +import json +from typing import Any, Dict, Union +from pathlib import Path + +_yaml_regex = re.compile( + r"^\s*" + r"(?:---|\+\+\+)" + r"(.*?)" + r"(?:---|\+\+\+)" + r"\s*(.+)$", + re.S | re.M, +) + + +def load_text(file_path, encoding="utf-8"): + with open(file_path, "r", encoding=encoding) as file: + return file.read() + + +def load_json(file_path, encoding="utf-8"): + return json.loads(load_text(file_path, encoding=encoding)) + + +def _find_global_config(prompty_path: Path = Path.cwd()) -> Union[Path, None]: + prompty_config = list(Path.cwd().glob("**/prompty.json")) + + if len(prompty_config) > 0: + return sorted( + [c for c in prompty_config if len(c.parent.parts) <= len(prompty_path.parts)], + key=lambda p: len(p.parts), + )[-1] + else: + return None + + +def load_global_config(prompty_path: Path = Path.cwd(), configuration: str = "default") -> Dict[str, Any]: + # prompty.config laying around? + config = _find_global_config(prompty_path) + + # if there is one load it + if config is not None: + c = load_json(config) + if configuration in c: + return c[configuration] + else: + raise ValueError(f'Item "{configuration}" not found in "{config}"') + + return {} + + +def load_prompty(file_path, encoding="utf-8") -> Dict[str, Any]: + contents = load_text(file_path, encoding=encoding) + return parse(contents) + + +def parse(contents): + global _yaml_regex + + fmatter = "" + body = "" + result = _yaml_regex.search(contents) + + if result: + fmatter = result.group(1) + body = result.group(2) + return { + "attributes": yaml.load(fmatter, Loader=yaml.SafeLoader), + "body": body, + "frontmatter": fmatter, + } diff --git a/sdk/ai/azure-ai-inference/dev_requirements.txt b/sdk/ai/azure-ai-inference/dev_requirements.txt index 9c82a165e327..b8f68ea98ffc 100644 --- a/sdk/ai/azure-ai-inference/dev_requirements.txt +++ b/sdk/ai/azure-ai-inference/dev_requirements.txt @@ -3,4 +3,5 @@ ../../core/azure-core-tracing-opentelemetry ../../monitor/azure-monitor-opentelemetry aiohttp -opentelemetry-sdk \ No newline at end of file +opentelemetry-sdk +types-pyyaml diff --git a/sdk/ai/azure-ai-inference/samples/README.md b/sdk/ai/azure-ai-inference/samples/README.md index b6704138e4b2..6054eaad5440 100644 --- a/sdk/ai/azure-ai-inference/samples/README.md +++ b/sdk/ai/azure-ai-inference/samples/README.md @@ -99,6 +99,8 @@ similarly for the other samples. |[sample_chat_completions_from_input_bytes.py](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/ai/azure-ai-inference/samples/sample_chat_completions_from_input_bytes.py) | One chat completion operation using a synchronous client, with input messages provided as `IO[bytes]`. | |[sample_chat_completions_from_input_json.py](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/ai/azure-ai-inference/samples/sample_chat_completions_from_input_json.py) | One chat completion operation using a synchronous client, with input messages provided as a dictionary (type `MutableMapping[str, Any]`) | |[sample_chat_completions_from_input_json_with_image_url.py](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/ai/azure-ai-inference/samples/sample_chat_completions_from_input_json_with_image_url.py) | One chat completion operation using a synchronous client, with input messages provided as a dictionary (type `MutableMapping[str, Any]`). 
Includes sending an input image URL. | +|[sample_chat_completions_from_input_prompt_string.py](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/ai/azure-ai-inference/samples/sample_chat_completions_from_input_prompt_string.py) | One chat completion operation using a synchronous client, with input message template in string format. | +|[sample_chat_completions_from_input_prompty.py](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/ai/azure-ai-inference/samples/sample_chat_completions_from_input_prompty.py) | One chat completion operation using a synchronous client, with the input in Prompty format from a Prompty file. Prompty website: https://prompty.ai | |[sample_chat_completions_with_tools.py](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/ai/azure-ai-inference/samples/sample_chat_completions_with_tools.py) | Shows how do use a tool (function) in chat completions, for an AI model that supports tools | |[sample_chat_completions_streaming_with_tools.py](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/ai/azure-ai-inference/samples/sample_chat_completions_streaming_with_tools.py) | Shows how do use a tool (function) in chat completions, with streaming response, for an AI model that supports tools | |[sample_load_client.py](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/ai/azure-ai-inference/samples/sample_load_client.py) | Shows how do use the function `load_client` to create the appropriate synchronous client based on the provided endpoint URL. In this example, it creates a synchronous `ChatCompletionsClient`. | diff --git a/sdk/ai/azure-ai-inference/samples/sample1.prompty b/sdk/ai/azure-ai-inference/samples/sample1.prompty new file mode 100644 index 000000000000..6dbcbf40bc6f --- /dev/null +++ b/sdk/ai/azure-ai-inference/samples/sample1.prompty @@ -0,0 +1,30 @@ +--- +name: Basic Prompt +description: A basic prompt that uses the GPT-3 chat API to answer questions +authors: + - author_1 + - author_2 +model: + api: chat + configuration: + azure_deployment: gpt-4o-mini + parameters: + temperature: 1 + frequency_penalty: 0.5 + presence_penalty: 0.5 +--- +system: +You are an AI assistant in a hotel. You help guests with their requests and provide information about the hotel and its services. + +# context +{{#rules}} +{{rule}} +{{/rules}} + +{{#chat_history}} +{{role}}: +{{content}} +{{/chat_history}} + +user: +{{input}} diff --git a/sdk/ai/azure-ai-inference/samples/sample_chat_completions_from_input_prompt_string.py b/sdk/ai/azure-ai-inference/samples/sample_chat_completions_from_input_prompt_string.py new file mode 100644 index 000000000000..29d02753c649 --- /dev/null +++ b/sdk/ai/azure-ai-inference/samples/sample_chat_completions_from_input_prompt_string.py @@ -0,0 +1,82 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +""" +DESCRIPTION: + This sample demonstrates how to get a chat completions response from + the service using a synchronous client, with input message template + in string format. + + This sample assumes the AI model is hosted on a Serverless API or + Managed Compute endpoint. For GitHub Models or Azure OpenAI endpoints, + the client constructor needs to be modified. 
See package documentation: + https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/ai/azure-ai-inference/README.md#key-concepts + +USAGE: + python sample_chat_completions_from_input_prompt_string.py + + Set these two environment variables before running the sample: + 1) AZURE_AI_CHAT_ENDPOINT - Your endpoint URL, in the form + https://..models.ai.azure.com + where `your-deployment-name` is your unique AI Model deployment name, and + `your-azure-region` is the Azure region where your model is deployed. + 2) AZURE_AI_CHAT_KEY - Your model key (a 32-character string). Keep it secret. +""" +# mypy: disable-error-code="union-attr,arg-type" +# pyright: reportAttributeAccessIssue=false + + +def sample_chat_completions_from_input_prompt_string(): + import os + from azure.ai.inference import ChatCompletionsClient + from azure.ai.inference.prompts import PromptTemplate + from azure.core.credentials import AzureKeyCredential + + try: + endpoint = os.environ["AZURE_AI_CHAT_ENDPOINT"] + key = os.environ["AZURE_AI_CHAT_KEY"] + except KeyError: + print("Missing environment variable 'AZURE_AI_CHAT_ENDPOINT' or 'AZURE_AI_CHAT_KEY'") + print("Set them before running this sample.") + exit() + + prompt_template_str = """ + system: + You are an AI assistant in a hotel. You help guests with their requests and provide information about the hotel and its services. + + # context + {{#rules}} + {{rule}} + {{/rules}} + + {{#chat_history}} + {{role}}: + {{content}} + {{/chat_history}} + + user: + {{input}} + """ + prompt_template = PromptTemplate.from_string(api="chat", prompt_template=prompt_template_str) + + input = "When I arrived, can I still have breakfast?" + rules = [ + {"rule": "The check-in time is 3pm"}, + {"rule": "The check-out time is 11am"}, + {"rule": "Breakfast is served from 7am to 10am"}, + ] + chat_history = [ + {"role": "user", "content": "I'll arrive at 2pm. What's the check-in and check-out time?"}, + {"role": "system", "content": "The check-in time is 3 PM, and the check-out time is 11 AM."}, + ] + messages = prompt_template.create_messages(input=input, rules=rules, chat_history=chat_history) + + client = ChatCompletionsClient(endpoint=endpoint, credential=AzureKeyCredential(key)) + response = client.complete(messages=messages) # type: ignore[reportCallIssue, reportArgumentType] + + print(response.choices[0].message.content) + + +if __name__ == "__main__": + sample_chat_completions_from_input_prompt_string() diff --git a/sdk/ai/azure-ai-inference/samples/sample_chat_completions_from_input_prompty.py b/sdk/ai/azure-ai-inference/samples/sample_chat_completions_from_input_prompty.py new file mode 100644 index 000000000000..bb5b671e894d --- /dev/null +++ b/sdk/ai/azure-ai-inference/samples/sample_chat_completions_from_input_prompty.py @@ -0,0 +1,71 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +""" +DESCRIPTION: + This sample demonstrates how to get a chat completions response from + the service using a synchronous client, with the input in Prompty format + from a Prompty file. Prompty website: https://prompty.ai + + This sample assumes the AI model is hosted on a Serverless API or + Managed Compute endpoint. For GitHub Models or Azure OpenAI endpoints, + the client constructor needs to be modified. 
See package documentation: + https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/ai/azure-ai-inference/README.md#key-concepts + +USAGE: + python sample_chat_completions_from_input_prompty.py + + Set these two environment variables before running the sample: + 1) AZURE_AI_CHAT_ENDPOINT - Your endpoint URL, in the form + https://..models.ai.azure.com + where `your-deployment-name` is your unique AI Model deployment name, and + `your-azure-region` is the Azure region where your model is deployed. + 2) AZURE_AI_CHAT_KEY - Your model key (a 32-character string). Keep it secret. +""" +# mypy: disable-error-code="union-attr" +# pyright: reportAttributeAccessIssue=false + + +def sample_chat_completions_from_input_prompty(): + import os + from azure.ai.inference import ChatCompletionsClient + from azure.ai.inference.prompts import PromptTemplate + from azure.core.credentials import AzureKeyCredential + + try: + endpoint = os.environ["AZURE_AI_CHAT_ENDPOINT"] + key = os.environ["AZURE_AI_CHAT_KEY"] + except KeyError: + print("Missing environment variable 'AZURE_AI_CHAT_ENDPOINT' or 'AZURE_AI_CHAT_KEY'") + print("Set them before running this sample.") + exit() + + path = "./sample1.prompty" + prompt_template = PromptTemplate.from_prompty(file_path=path) + + input = "When I arrived, can I still have breakfast?" + rules = [ + {"rule": "The check-in time is 3pm"}, + {"rule": "The check-out time is 11am"}, + {"rule": "Breakfast is served from 7am to 10am"}, + ] + chat_history = [ + {"role": "user", "content": "I'll arrive at 2pm. What's the check-in and check-out time?"}, + {"role": "system", "content": "The check-in time is 3 PM, and the check-out time is 11 AM."}, + ] + messages = prompt_template.create_messages(input=input, rules=rules, chat_history=chat_history) + + client = ChatCompletionsClient(endpoint=endpoint, credential=AzureKeyCredential(key)) + + response = client.complete( + messages=messages, + model=prompt_template.model_name, + **prompt_template.parameters, + ) + + print(response.choices[0].message.content) + + +if __name__ == "__main__": + sample_chat_completions_from_input_prompty() diff --git a/sdk/ai/azure-ai-inference/setup.py b/sdk/ai/azure-ai-inference/setup.py index f6a2bea03eb4..7e30f3716b8f 100644 --- a/sdk/ai/azure-ai-inference/setup.py +++ b/sdk/ai/azure-ai-inference/setup.py @@ -62,11 +62,7 @@ package_data={ "azure.ai.inference": ["py.typed"], }, - install_requires=[ - "isodate>=0.6.1", - "azure-core>=1.30.0", - "typing-extensions>=4.6.0", - ], + install_requires=["isodate>=0.6.1", "azure-core>=1.30.0", "typing-extensions>=4.6.0"], python_requires=">=3.8", extras_require={"opentelemetry": ["azure-core-tracing-opentelemetry"]}, ) diff --git a/sdk/ai/azure-ai-inference/tests/sample1.prompty b/sdk/ai/azure-ai-inference/tests/sample1.prompty new file mode 100644 index 000000000000..6dbcbf40bc6f --- /dev/null +++ b/sdk/ai/azure-ai-inference/tests/sample1.prompty @@ -0,0 +1,30 @@ +--- +name: Basic Prompt +description: A basic prompt that uses the GPT-3 chat API to answer questions +authors: + - author_1 + - author_2 +model: + api: chat + configuration: + azure_deployment: gpt-4o-mini + parameters: + temperature: 1 + frequency_penalty: 0.5 + presence_penalty: 0.5 +--- +system: +You are an AI assistant in a hotel. You help guests with their requests and provide information about the hotel and its services. 
+ +# context +{{#rules}} +{{rule}} +{{/rules}} + +{{#chat_history}} +{{role}}: +{{content}} +{{/chat_history}} + +user: +{{input}} diff --git a/sdk/ai/azure-ai-inference/tests/sample1_with_secrets.prompty b/sdk/ai/azure-ai-inference/tests/sample1_with_secrets.prompty new file mode 100644 index 000000000000..8451c02b942e --- /dev/null +++ b/sdk/ai/azure-ai-inference/tests/sample1_with_secrets.prompty @@ -0,0 +1,34 @@ +--- +name: Basic Prompt +description: A basic prompt that uses the GPT-3 chat API to answer questions +authors: + - author_1 + - author_2 +model: + api: chat + configuration: + azure_deployment: gpt-4o-mini + type: azure_openai + api_version: test_version + api_key: test_key + api_secret: test_secret + parameters: + temperature: 1 + frequency_penalty: 0.5 + presence_penalty: 0.5 +--- +system: +You are an AI assistant in a hotel. You help guests with their requests and provide information about the hotel and its services. + +# context +{{#rules}} +{{rule}} +{{/rules}} + +{{#chat_history}} +{{role}}: +{{content}} +{{/chat_history}} + +user: +{{input}} diff --git a/sdk/ai/azure-ai-inference/tests/test_prompts.py b/sdk/ai/azure-ai-inference/tests/test_prompts.py new file mode 100644 index 000000000000..73c5341729fa --- /dev/null +++ b/sdk/ai/azure-ai-inference/tests/test_prompts.py @@ -0,0 +1,96 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +import os +from azure.ai.inference.prompts import PromptTemplate + + +class TestPrompts: + + # ********************************************************************************** + # + # UNIT TESTS + # + # ********************************************************************************** + + def test_prompt_template_from_prompty(self, **kwargs): + script_dir = os.path.dirname(os.path.abspath(__file__)) + prompty_file_path = os.path.join(script_dir, "sample1.prompty") + prompt_template = PromptTemplate.from_prompty(prompty_file_path) + assert prompt_template.model_name == "gpt-4o-mini" + assert prompt_template.parameters["temperature"] == 1 + assert prompt_template.parameters["frequency_penalty"] == 0.5 + assert prompt_template.parameters["presence_penalty"] == 0.5 + + input = "What's the check-in and check-out time?" + rules = [ + {"rule": "The check-in time is 3pm"}, + {"rule": "The check-out time is 11am"}, + {"rule": "Breakfast is served from 7am to 10am"}, + {"rule": 'The hotel website is https://www.myhotel.com?key1=param1&key2=param"2&key3=param<3>'}, + ] + messages = prompt_template.create_messages(input=input, rules=rules) + assert len(messages) == 2 + assert messages[0]["role"] == "system" + assert "Breakfast is served from 7am to 10am" in messages[0]["content"] + assert ( + "The hotel website is https://www.myhotel.com?key1=param1&key2=param"2&key3=param<3>" + in messages[0]["content"] + ) + assert messages[1]["role"] == "user" + assert messages[1]["content"] == "What's the check-in and check-out time?" 
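+
+    # Note: the masking checked below mirrors sanitize() in _tracer.py. When the
+    # telemetry-safe dict is built, any configuration value whose key looks sensitive
+    # ("key", "secret", "token", "password", "credential") is replaced with "*" repeated
+    # to the original length, so raw credentials from the .prompty front matter never
+    # end up in traces.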
+ + def test_prompt_template_from_prompty_with_masked_secrets(self, **kwargs): + script_dir = os.path.dirname(os.path.abspath(__file__)) + prompty_file_path = os.path.join(script_dir, "sample1_with_secrets.prompty") + prompt_template = PromptTemplate.from_prompty(prompty_file_path) + assert prompt_template.prompty.model.configuration["api_key"] == "test_key" + assert prompt_template.prompty.model.configuration["api_secret"] == "test_secret" + telemetry_dict = prompt_template.prompty.to_safe_dict() + assert telemetry_dict["model"]["configuration"]["api_key"] == "********" + assert telemetry_dict["model"]["configuration"]["api_secret"] == "***********" + + def test_prompt_template_from_message(self, **kwargs): + prompt_template_str = "system prompt template text\nuser:\n{{input}}" + prompt_template = PromptTemplate.from_string(api="chat", prompt_template=prompt_template_str) + input = "user question input text" + messages = prompt_template.create_messages(input=input) + assert len(messages) == 1 + assert messages[0]["role"] == "system" + assert "system prompt template text\nuser:\nuser question input text" == messages[0]["content"] + + def test_prompt_template_from_message_with_tags(self, **kwargs): + prompt_template_str = """ + system: + You are an AI assistant in a hotel. You help guests with their requests and provide information about the hotel and its services. + + # context + {{#rules}} + {{rule}} + {{/rules}} + + {{#chat_history}} + {{role}}: + {{content}} + {{/chat_history}} + + user: + {{input}} + """ + prompt_template = PromptTemplate.from_string(api="chat", prompt_template=prompt_template_str) + input = "When I arrived, can I still have breakfast?" + rules = [ + {"rule": "The check-in time is 3pm"}, + {"rule": "The check-out time is 11am"}, + {"rule": "Breakfast is served from 7am to 10am"}, + ] + chat_history = [ + {"role": "user", "content": "I'll arrive at 2pm. What's the check-in and check-out time?"}, + {"role": "system", "content": "The check-in time is 3 PM, and the check-out time is 11 AM."}, + ] + messages = prompt_template.create_messages(input=input, rules=rules, chat_history=chat_history) + assert len(messages) == 1 + assert messages[0]["role"] == "system" + assert "You are an AI assistant in a hotel." in messages[0]["content"] + assert "When I arrived, can I still have breakfast?" in messages[0]["content"]
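
Note on the tracing layer added in _tracer.py: tracers are plain context-manager factories registered on the Tracer class, and the @trace decorator fans each call's signature, inputs and result out to every registered tracer. A minimal sketch, assuming the helpers are imported straight from the private _tracer module (no public re-export is added in this change) and using a made-up "greet" function:

# Tracing sketch (assumption: private module path from this change; "greet" is illustrative only).
from azure.ai.inference.prompts._tracer import Tracer, console_tracer, trace

# Register the built-in console tracer; every @trace-wrapped call now prints its
# signature, inputs and result as JSON, bracketed by "Starting"/"Ending" lines.
Tracer.add("console", console_tracer)


@trace
def greet(name: str) -> str:
    return f"Hello, {name}!"


greet("John Doe")

PromptyTracer registers the same way (Tracer.add("prompty", PromptyTracer().tracer)) and writes .tracy JSON files under ./.runs once the outermost frame closes.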
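
The .prompty files used by the samples and tests are split into YAML front matter and a mustache body by parse() in _utils.py. A small sketch, again importing from the private module and assuming PyYAML is available (as _utils.py itself requires):

# Front-matter parsing sketch (assumption: importing parse() from the private _utils module).
from azure.ai.inference.prompts._utils import parse

content = """---
name: Basic Prompt
model:
  api: chat
  configuration:
    azure_deployment: gpt-4o-mini
---
system:
You are an AI assistant in a hotel.

user:
{{input}}
"""

parsed = parse(content)
print(parsed["attributes"]["name"])          # Basic Prompt
print(parsed["attributes"]["model"]["api"])  # chat
print(parsed["body"].splitlines()[0])        # system: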