Skip to content

[Pipeline Refactor] Update routes, text generation initial functionality #1348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Nov 3, 2023
1 change: 0 additions & 1 deletion src/deepsparse/v2/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .operator import *
18 changes: 9 additions & 9 deletions src/deepsparse/v2/operators/engine_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

from pydantic import BaseModel, Field

from deepsparse import Context, Engine, MultiModelEngine, Scheduler
from deepsparse import Context as EngineContext
from deepsparse import Engine, MultiModelEngine, Scheduler
from deepsparse.benchmark import ORTEngine
from deepsparse.utils import join_engine_outputs, model_to_path, split_engine_inputs
from deepsparse.v2.operators import Operator
Expand Down Expand Up @@ -54,16 +55,15 @@ def __init__(
self,
model_path: str,
engine_type: str = DEEPSPARSE_ENGINE,
batch_size: Optional[int] = 1,
num_cores: int = None,
num_streams: int = None,
scheduler: Scheduler = None,
input_shapes: List[List[int]] = None,
engine_context: Optional[Context] = None,
engine_context: Optional[EngineContext] = None,
engine_kwargs: Dict = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
engine_kwargs: Dict = None,
engine_kwargs: Optional[Dict] = None,

):

self._batch_size = batch_size
self.model_path = model_to_path(model_path)
self._batch_size = 1
self.engine_context = engine_context

if self.engine_context is not None:
Expand All @@ -87,7 +87,7 @@ def __init__(
self._engine_args = engine_args
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have engine_args, and engine_kwargs, but both are dicts; aren't the names slightly misleading?

self._engine_type = engine_type

self.engine = self.create_engine()
self.engine = self.create_engine(**engine_kwargs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like that we are passing kwargs to create_engine, which also accepts **kwargs; there is no clear way to figure out what arguments should be passed to create_engine without looking at the implementation?


@property
def batch_size(self) -> int:
Expand All @@ -114,12 +114,12 @@ def create_engine(

if engine_type == DEEPSPARSE_ENGINE:
if self.engine_context is not None and isinstance(
self.engine_context, Context
self.engine_context, EngineContext
):
engine_args.pop("num_cores", None)
engine_args.pop("scheduler", None)
engine_args.pop("num_streams", None)
engine_args["context"] = self.engien_context
engine_args["context"] = self.engine_context
return MultiModelEngine(
model=onnx_file_path,
**engine_args,
Expand All @@ -135,7 +135,7 @@ def create_engine(
f"{SUPPORTED_PIPELINE_ENGINES}"
)

def run(self, inp: EngineOperatorInputs) -> Dict:
def run(self, inp: EngineOperatorInputs, **kwargs) -> Dict:
if inp.engine:
# run with custom engine, do not split/join since custom engine
# may run at any batch size, returning here as code below has a
Expand Down
30 changes: 24 additions & 6 deletions src/deepsparse/v2/operators/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from pydantic import BaseModel

from deepsparse.v2.utils import InferenceState, PipelineState


__all__ = ["Operator"]

Expand Down Expand Up @@ -54,14 +56,18 @@ def has_output_schema(cls) -> bool:
def __call__(
self,
*args,
inference_state: InferenceState,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend using the `/` positional-only parameter marker introduced in Python 3.8 https://peps.python.org/pep-0570/#syntax-and-semantics

pipeline_state: PipelineState,
**kwargs,
) -> Any:
"""
Parses inputs to this Operator and runs the run() method of this operator

:param args: an unnamed arg may only be provided if it is of the type of the
input_schema
:param context: pipeline context to pass to operator
:param inference_state: inference_state for the pipeline.
:param pipeline_state: pipeline_state for the pipeline. The values in the state
are created during pipeline creation and are read-only during inference.
:param kwargs: kwargs when not initializing from an instantiated schema
:return: operator output
"""
Expand All @@ -81,10 +87,18 @@ def __call__(
"in the form of a dictionary or an instance of the input_schema"
"object"
)

run_output = self.run(inference_input)
run_output = self.run(
inference_input,
inference_state=inference_state,
pipeline_state=pipeline_state,
)
else:
run_output = self.run(*args, **kwargs)
run_output = self.run(
*args,
inference_state=inference_state,
pipeline_state=pipeline_state,
**kwargs,
)

if self.has_output_schema():
return self.output_schema(**run_output)
Expand All @@ -93,12 +107,16 @@ def __call__(
@abstractmethod
def run(self, *args, **kwargs) -> Any:
    """
    Execute this operator's computation; must be implemented by subclasses.
    Invoked by __call__ after input parsing/validation.

    :param args: positional operator inputs (e.g. an instance of the
        operator's input_schema, if one is defined)
    :param kwargs: keyword operator inputs, including any state objects
        forwarded by the caller
    :return: result of this operator as the defined output schema if applicable
    """
    raise NotImplementedError

def can_operate(self, inp: Any) -> bool:
    """
    Whether or not the given operator can run, based on input.

    :param inp: candidate input for this operator
    :return: True if this operator can process ``inp``. The base
        implementation always returns True; subclasses override this to
        gate execution (e.g. used by graph routers to pick a branch).
    """
    return True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this always be True? Also missing return in docstring


def expand_inputs(self, **kwargs):
"""
Generic function to handle expanding values.
Expand Down
70 changes: 54 additions & 16 deletions src/deepsparse/v2/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from deepsparse.v2.operators import Operator
from deepsparse.v2.routers import Router
from deepsparse.v2.schedulers import OperatorScheduler, SchedulerGroup
from deepsparse.v2.utils import InferenceState, PipelineState


__all__ = ["Pipeline"]
Expand All @@ -27,7 +28,7 @@ class Pipeline(Operator):
"""
Pipeline accepts a series of operators, schedulers, and a router. Calling a pipeline
will use the router to run through all the defined operators. The operators should
be implemented using the Operator class and each implemented Operator should be
be implemented using the Operator class and each implemented operator should be
responsible for a functional component of the pipelines. The flow of inputs/outputs
between the operators and the steps in the pipeline should be defined by the router,
(based on the Router class), which dictates the next operator in the pipeline.
Expand All @@ -37,6 +38,7 @@ class Pipeline(Operator):
or dictionary of operators.
:param router: A Router which dictates the next operator to call.
:param schedulers: A list of schedulers to run operators.
:param pipeline_state: pipeline_state created during pipeline initialization

"""

Expand All @@ -45,57 +47,93 @@ def __init__(
ops: Union[Dict[str, Operator], List[Operator]],
router: Router,
schedulers: List[OperatorScheduler],
pipeline_state: PipelineState = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pipeline_state: PipelineState = None,
pipeline_state: Optional[PipelineState] = None,

):

self.ops = ops
self.router = router
self.schedulers = schedulers
self.pipeline_state = pipeline_state
self.validate()

# SchedulerGroup handles running all schedulers in order of priority
self._scheduler_group = SchedulerGroup(self.schedulers)

def run(self, *args, **kwargs):
def run(
self,
*args,
inference_state: InferenceState,
pipeline_state: PipelineState,
**kwargs,
):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing return type

"""
Run through the operators using the provided router and scheduler. Update the
context to reflect each step of the router. The input to a given operator is the
output of the previous operator.

:param inp: input to the operator. expected to be of any type that is
expected by the operator.
:param context: context to store the current the inputs, outputs, and operator
for each step of the router.
Run through the operators using the provided router and scheduler.
The input to a given operator is the output of the previous operator.

:param inference_state: inference_state for the pipeline.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we describe args/kwargs?

:param pipeline_state: pipeline_state for the pipeline. The values in the state
are created during pipeline creation and are read-only during inference.
"""
next_step = self.router.START_ROUTE
operator_output = None

while next_step != self.router.END_ROUTE:
# Either a dictionary key or valid index
operator = self.ops[next_step]
if next_step == self.router.START_ROUTE:
output_future = self._scheduler_group.submit(
*args, operator=operator, **kwargs
*args,
inference_state=inference_state,
operator=operator,
pipeline_state=pipeline_state,
**kwargs,
)
else:
if isinstance(operator_output, dict):
output_future = self._scheduler_group.submit(
operator=operator, **operator_output
inference_state=inference_state,
operator=operator,
pipeline_state=pipeline_state,
**operator_output,
)
else:
output_future = self._scheduler_group.submit(
operator_output, operator=operator
operator_output,
inference_state=inference_state,
pipeline_state=pipeline_state,
operator=operator,
)

# wait for future to resolve
operator_output = output_future.result()
next_step = self.router.next(next_step, self.ops)
if isinstance(operator_output, tuple):
state_update = operator_output[-1]
operator_output = operator_output[0]
inference_state.update_state(state_update)

next_step = self.router.next(next_step, self.ops, operator_output)

return operator_output

def __call__(self, *args, **kwargs):
    """
    Consolidate any provided inference_state or pipeline_state objects and
    pass any other operator inputs to run().

    :param args: positional inputs forwarded to run()
    :param kwargs: keyword inputs forwarded to run(); may include an
        ``inference_state`` and/or ``pipeline_state`` override
    :return: output of the pipeline operators ran with the router for the
        given input
    """
    # Membership test (not truthiness via kwargs.get) so a caller-provided
    # state object is honored even if it evaluates as falsy.
    if "inference_state" in kwargs:
        inference_state = kwargs.pop("inference_state")
    else:
        # Create a fresh, empty per-call inference state when the caller
        # did not supply one.
        inference_state = InferenceState()
        inference_state.create_state({})

    if "pipeline_state" in kwargs:
        # NOTE(review): this mutates pipeline-level state from a single
        # call, so the override persists across subsequent calls — confirm
        # that is the intended behavior.
        self.pipeline_state = kwargs.get("pipeline_state")

    kwargs["inference_state"] = inference_state
    kwargs["pipeline_state"] = self.pipeline_state

    return self.run(*args, **kwargs)

def validate(self):
Expand Down
57 changes: 51 additions & 6 deletions src/deepsparse/v2/routers/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@

import logging
from abc import abstractmethod
from typing import Dict, List, Union
from typing import Any, Dict, List, Optional, Union

from deepsparse.v2.operators import Operator


_LOGGER = logging.getLogger(__name__)

__all__ = ["Router", "LinearRouter"]
__all__ = ["Router", "LinearRouter", "GraphRouter"]


class Router:
Expand All @@ -32,23 +32,34 @@ class Router:

:param start_route: the start index or key of the router
:param end_route: the end index or key of the router
:param route: the route that the router has to traverse through

"""

def __init__(self, end_route: Union[str, int], start_route: Union[str, int]):
def __init__(
self,
end_route: Union[str, int],
start_route: Union[str, int],
route: Optional[Dict] = None,
):
self.START_ROUTE = start_route
self.END_ROUTE = end_route
self.route = route

@abstractmethod
def next(
self, past: Union[str, int], ops: Union[List[Operator], Dict[str, Operator]]
self,
past: Union[str, int],
ops: Optional[Union[List[Operator], Dict[str, Operator]]],
inp: Optional[Any],
) -> Union[str, int]:
"""
Determines the index or dictionary key for the next operator which should run.

:param past: the previous index or key. This should uniquely determine the next
operator to run
operator to run
:param ops: list or dictionary of operators
:param inp: operator input
:returns: the next index or dictionary key for the next operator to run
"""
raise NotImplementedError
Expand All @@ -69,7 +80,9 @@ class LinearRouter(Router):
def __init__(self, end_route: int, start_route: int = 0):
super().__init__(end_route=end_route, start_route=start_route)

def next(self, past: int, ops: List[Operator]) -> int:
def next(
self, past: int, ops: Optional[List[Operator]] = None, inp: Optional[Any] = None
) -> int:
new_index = past + 1
if new_index < self.END_ROUTE:
return new_index
Expand Down Expand Up @@ -105,3 +118,35 @@ def validate(operators: List[Operator]) -> bool:
)
return False
return True


class GraphRouter(Router):
    """
    Router for a DAG. Expects graphs to be presented in the form of a
    dictionary, where keys are the nodes of the graph and the values are the
    connected nodes. For nodes with multiple output edges, all the nodes will
    be visited and the first node where `can_operate` returns True will run.
    Paths should be deterministic.
    """

    def __init__(self, end_route: str, start_route: str, route: Dict):
        super().__init__(
            end_route=end_route, start_route=start_route, route=route
        )

    def next(
        self,
        past: str,
        ops: Dict[str, Operator],
        inp: Any,
    ) -> str:
        """
        Resolve the key of the next node to run after ``past``.

        :param past: key of the node that just ran
        :param ops: mapping of node key to operator; used to query
            ``can_operate`` when a node has multiple output edges
        :param inp: output of the previous operator, passed to ``can_operate``
        :return: key of the next node to run
        :raises ValueError: if no neighbouring node can operate on ``inp``
        """
        node = past
        if isinstance(self.route[node], str):
            # Single outgoing edge: unconditional transition.
            return self.route[node]
        # Multiple outgoing edges: the first neighbour that accepts the
        # input wins; route values must make this deterministic.
        for neighbour_node in self.route[node]:
            neighbour_node_op = ops[neighbour_node]
            if neighbour_node_op.can_operate(inp):
                return neighbour_node
        raise ValueError("Cannot operate on any of the nodes")

    @staticmethod
    def validate(ops) -> bool:
        # TODO: implement DAG validation (e.g. reachability of END_ROUTE);
        # currently a no-op returning None, which is falsy despite the
        # ``-> bool`` annotation.
        pass
Loading