Skip to content

Workflow management python SDK #554

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 89 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 85 commits
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
d3ac21a
Initial draft for python SDK
RyanLettieri Apr 18, 2023
575bba0
Merge branch 'master' of https://github.com/dapr/python-sdk into work…
RyanLettieri Apr 18, 2023
9acdf2c
Adding workflow code to aio client
RyanLettieri Apr 18, 2023
3e23139
Fixing protos and small fixes to workflow imports
RyanLettieri Apr 18, 2023
c4e9ef7
Workflow Authoring
DeepanshuA Apr 25, 2023
487e97f
Add example
DeepanshuA Apr 25, 2023
0733570
Merge branch 'master' of github.com:dapr/python-sdk into workflow_aut…
DeepanshuA Apr 25, 2023
26380ee
lint
DeepanshuA Apr 25, 2023
eb995b8
is it wheel fix
DeepanshuA Apr 25, 2023
cd044d1
fix lint
DeepanshuA Apr 25, 2023
6fde40c
Updating proto and more small fixes to workflow
RyanLettieri Apr 27, 2023
128a1fa
Add tests and client APIs
DeepanshuA Apr 27, 2023
e074e58
Updating proto and merging in other changes
RyanLettieri Apr 27, 2023
2682d72
Updating proto
RyanLettieri Apr 27, 2023
9361b30
lint
DeepanshuA Apr 27, 2023
49c78d7
Removing super init
RyanLettieri Apr 28, 2023
0516400
tox fix
RyanLettieri Apr 28, 2023
f82bb8e
Merge branch 'master' of github.com:dapr/python-sdk into workflow_aut…
DeepanshuA Apr 28, 2023
ffde1df
Add dtf python dependency
DeepanshuA Apr 28, 2023
8d62488
correction
DeepanshuA Apr 28, 2023
28690f0
Some implementation of workflow into fake server and other cleanup
RyanLettieri Apr 30, 2023
3015073
Fixing input for workflow
RyanLettieri Apr 30, 2023
de23696
More workflow cleanup
RyanLettieri Apr 30, 2023
15938b4
Fixing up workflow options to be optional
RyanLettieri May 1, 2023
19ea43f
Few more updates to workflow
RyanLettieri May 2, 2023
40fe904
Few more updates to workflow merging
RyanLettieri May 2, 2023
1be2447
Remove get-pip.py
DeepanshuA May 2, 2023
ab2d201
Updating examples test for workflow and cleaning up methods
RyanLettieri May 2, 2023
a537862
Update durabletask dependency version
DeepanshuA May 2, 2023
75ae2bf
Extra line - to be deleted
DeepanshuA May 3, 2023
7a31c66
test compatible with 3.7
DeepanshuA May 3, 2023
3c11d6c
Fixing comment
RyanLettieri May 3, 2023
f5c7796
Merge branch 'master' of https://github.com/dapr/python-sdk into work…
RyanLettieri May 3, 2023
325dedb
Merge branch 'master' into workflow_authoring
yaron2 May 4, 2023
b8bca56
Merge branch 'master' of github.com:dapr/python-sdk into workflow_aut…
DeepanshuA May 9, 2023
2f81812
Incorporate review comments
DeepanshuA May 10, 2023
3185586
Merge branch 'workflow_authoring' of https://github.com/DeepanshuA/py…
DeepanshuA May 10, 2023
2ff152e
lint
DeepanshuA May 11, 2023
347c0bf
Ut fix
DeepanshuA May 11, 2023
2ebf20c
validate demo_workflow
DeepanshuA May 11, 2023
a8fa501
App Readme
DeepanshuA May 11, 2023
05194c8
fix step md
DeepanshuA May 11, 2023
b2aff5c
Adding in exceptions to workflow methods
RyanLettieri May 11, 2023
cc25bd7
Encoding workflow start data and fixing test
RyanLettieri May 11, 2023
1a8af1d
Cleaning up some workflow methods
RyanLettieri May 11, 2023
7598dd1
Validate demo workflow example
DeepanshuA May 12, 2023
74dbc9f
Remove demo actor temporarily
DeepanshuA May 12, 2023
bf69dea
Include raise event test and assertions
DeepanshuA May 16, 2023
0968b8e
Rename
DeepanshuA May 16, 2023
94f915b
Adressing some workflow comments
RyanLettieri May 17, 2023
4886c9e
Incorporate Review comments
DeepanshuA May 18, 2023
18bc883
Lint, validate
DeepanshuA May 18, 2023
9d8ec7f
test correction
DeepanshuA May 18, 2023
49f960a
Fake class method correction
DeepanshuA May 18, 2023
c64a091
Check expected std output in validate example
DeepanshuA May 18, 2023
f3207fc
Remove extra port check
DeepanshuA May 19, 2023
2bfc44a
Merge branch 'master' of github.com:dapr/python-sdk into workflow_aut…
DeepanshuA May 19, 2023
82b33e5
Temporary - Verify Workflow Example first
DeepanshuA May 19, 2023
de33a9b
Requirements
DeepanshuA May 19, 2023
363a0df
Remove line
DeepanshuA May 19, 2023
ba2087a
Add back removed validate examples
DeepanshuA May 19, 2023
a3fb750
Update examples/demo_workflow/demo_workflow/requirements.txt
berndverst May 22, 2023
135d3d7
Change running order of wf
DeepanshuA May 22, 2023
a9be483
Commit to re-run example
DeepanshuA May 22, 2023
0e0203d
Merge branch 'master' of https://github.com/dapr/python-sdk into work…
RyanLettieri May 22, 2023
003e25b
Merging in authoring for workflow
RyanLettieri May 23, 2023
b9865b3
Merging in master
RyanLettieri May 23, 2023
0a0e94b
Addressing some review comments
RyanLettieri May 23, 2023
23ae6f7
Few updates for workflow PR
RyanLettieri May 24, 2023
9cc7e4a
More workflow PR fixes
RyanLettieri May 24, 2023
30ab277
Merge branch 'master' into workflow-sdk
RyanLettieri May 24, 2023
da401e8
More workflow typing fixes
RyanLettieri May 25, 2023
8b7dec9
Merge branch 'workflow-sdk' of https://github.com/RyanLettieri/python…
RyanLettieri May 25, 2023
e1d050c
Test and aio client fixes for workflow
RyanLettieri May 25, 2023
d7e9ebd
Whitespace fix
RyanLettieri May 25, 2023
9f5f138
Making workflow test more deterministic
RyanLettieri May 25, 2023
d08907e
Fixing broken workflow test
RyanLettieri May 26, 2023
722e5b3
Addressing some workflow comments
RyanLettieri May 26, 2023
57af017
linting
RyanLettieri May 26, 2023
40da262
Addressing more comments
RyanLettieri May 29, 2023
4c985f0
Even more fixes to workflow
RyanLettieri May 29, 2023
a4769d5
Fixing workflow example
RyanLettieri May 29, 2023
cbed6a2
Making workflow example more consitent
RyanLettieri May 29, 2023
739f145
Making workflow example more consitent again
RyanLettieri May 29, 2023
6932f97
Merging in master
RyanLettieri May 30, 2023
9c6ae6f
Fixing serialization on workflow
RyanLettieri May 30, 2023
efd8420
Code formatting for workflow
RyanLettieri May 30, 2023
ff513f2
Updating workflow documentation
RyanLettieri May 30, 2023
04be191
Fixing input to start workflow
RyanLettieri May 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 280 additions & 1 deletion dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
import asyncio
import time
import socket
import json
import uuid

from datetime import datetime
from urllib.parse import urlencode

from warnings import warn

from typing import Callable, Dict, Optional, Text, Union, Sequence, List
from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any
from typing_extensions import Self

from google.protobuf.message import Message as GrpcMessage
Expand All @@ -35,7 +38,9 @@
StreamStreamClientInterceptor
)

from dapr.clients.exceptions import DaprInternalError
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.conf import settings
from dapr.proto import api_v1, api_service_v1, common_v1
from dapr.proto.runtime.v1.dapr_pb2 import UnsubscribeConfigurationResponse
Expand Down Expand Up @@ -71,6 +76,8 @@
ConfigurationWatcher,
TryLockResponse,
UnlockResponse,
GetWorkflowResponse,
StartWorkflowResponse,
)


Expand Down Expand Up @@ -1103,6 +1110,278 @@ async def unlock(
return UnlockResponse(status=UnlockResponseStatus(response.status),
headers=await call.initial_metadata())

async def start_workflow(
self,
workflow_component: str,
workflow_name: str,
input: Union[Any, bytes, None] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
input: Union[Any, bytes, None] = None,
input: Optional[Union[Any, bytes]] = None,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still needs updating

instance_id: Optional[str] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add parameter send_raw_bytes: bool = False

workflow_options: Optional[Dict[str, str]] = dict()) -> StartWorkflowResponse:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are assigning a default value of dict() anyway, then you don't need this to be Optional - only use Optional when you want to set it to None by default or you want to explicitly support None.

"""Starts a workflow.

Args:
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.
workflow_name (str): the name of the workflow that will be executed.
input (Union[Any, bytes, None]): the input that the workflow will receive.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update

instance_id (Optional[str]): the name of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_options (Optional[Dict[str, str]]): the key-value options
that the workflow will receive.

Returns:
:class:`StartWorkflowResponse`: Instance ID associated with the started workflow
"""
# Warnings and input validation
warn('The Workflow API is an Alpha version and is subject to change.',
UserWarning, stacklevel=2)
validateNotBlankString(instance_id=instance_id,
workflow_component=workflow_component,
workflow_name=workflow_name)

if instance_id is None:
instance_id = str(uuid.uuid4())

if isinstance(input, bytes):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if isinstance(input, bytes):
if send_raw_bytes and isinstance(input, bytes):

encoded_data = input
else:
encoded_data = json.dumps(input).encode("utf-8") if input is not None else bytes([])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
encoded_data = json.dumps(input).encode("utf-8") if input is not None else bytes([])
try:
encoded_data = json.dumps(input).encode("utf-8") if input is not None else bytes([])
except TypeError:
raise DaprInternalError(message='start_workflow input must be json serializable')

Add try except around this step

In the except block, please throw a DaprInteralError exception with a message indicating that the input value must be json serializable.


# Actual start workflow invocation
req = api_v1.StartWorkflowRequest(
instance_id=instance_id,
workflow_component=workflow_component,
workflow_name=workflow_name,
options=workflow_options,
input=encoded_data)

try:
response = self._stub.StartWorkflowAlpha1(req)
return StartWorkflowResponse(instance_id=response.instance_id)
except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def get_workflow(
self,
instance_id: str,
workflow_component: str) -> GetWorkflowResponse:
"""Gets information on a workflow.

Args:
instance_id (str): the ID of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.

Returns:
:class:`GetWorkflowResponse`: Instance ID associated with the started workflow
"""
# Warnings and input validation
warn('The Workflow API is an Alpha version and is subject to change.',
UserWarning, stacklevel=2)
validateNotBlankString(instance_id=instance_id,
workflow_component=workflow_component)
# Actual get workflow invocation
req = api_v1.GetWorkflowRequest(
instance_id=instance_id,
workflow_component=workflow_component)

try:
resp = self._stub.GetWorkflowAlpha1(req)
if resp.created_at is None:
resp.created_at = datetime.now
if resp.last_updated_at is None:
resp.last_updated_at = datetime.now
return GetWorkflowResponse(instance_id=instance_id,
workflow_name=resp.workflow_name,
created_at=resp.created_at,
last_updated_at=resp.last_updated_at,
runtime_status=getWorkflowRuntimeStatus(resp.runtime_status),
properties=resp.properties)
except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def terminate_workflow(
self,
instance_id: str,
workflow_component: str) -> DaprResponse:
"""Terminates a workflow.

Args:
instance_id (str): the ID of the workflow instance, e.g.
`order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.

Returns:
:class:`DaprResponse` gRPC metadata returned from callee

"""
# Warnings and input validation
warn('The Workflow API is an Alpha version and is subject to change.',
UserWarning, stacklevel=2)
validateNotBlankString(instance_id=instance_id,
workflow_component=workflow_component)
# Actual terminate workflow invocation
req = api_v1.TerminateWorkflowRequest(
instance_id=instance_id,
workflow_component=workflow_component)

try:
_, call = self._stub.TerminateWorkflowAlpha1.with_call(req)
return DaprResponse(
headers=call.initial_metadata())
except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def raise_workflow_event(
self,
instance_id: str,
workflow_component: str,
event_name: str,
event_data: Union[Any, bytes, None] = None) -> DaprResponse:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
event_data: Union[Any, bytes, None] = None) -> DaprResponse:
event_data: Optional[Union[Any, bytes]] = None) -> DaprResponse:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add param send_raw_bytes: bool = False

"""Raises an event on a workflow.

Args:
instance_id (str): the ID of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.
event_name (str): the name of the event to be raised on
the workflow.
event_data (Union[Any, bytes, None]): the input to the event.

Returns:
:class:`DaprResponse` gRPC metadata returned from callee
"""
# Warnings and input validation
warn('The Workflow API is an Alpha version and is subject to change.',
UserWarning, stacklevel=2)
validateNotBlankString(instance_id=instance_id,
workflow_component=workflow_component,
event_name=event_name)
if isinstance(input, bytes):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if isinstance(input, bytes):
if `send_raw_data and isinstance(input, bytes):`

encoded_data = event_data
else:
if event_data is not None:
encoded_data = json.dumps(event_data).encode("utf-8")
else:
encoded_data = bytes([])
# Actual workflow raise event invocation
req = api_v1.raise_workflow_event(
instance_id=instance_id,
workflow_component=workflow_component,
event_name=event_name,
event_data=encoded_data)

try:
_, call = self._stub.RaiseEventWorkflowAlpha1.with_call(req)
return DaprResponse(
headers=call.initial_metadata())
except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def pause_workflow(
self,
instance_id: str,
workflow_component: str) -> DaprResponse:
"""Pause a workflow.

Args:
instance_id (str): the ID of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.

Returns:
:class:`DaprResponse` gRPC metadata returned from callee

"""
# Warnings and input validation
warn('The Workflow API is an Alpha version and is subject to change.',
UserWarning, stacklevel=2)
validateNotBlankString(instance_id=instance_id,
workflow_component=workflow_component)
# Actual pause workflow invocation
req = api_v1.PauseWorkflowRequest(
instance_id=instance_id,
workflow_component=workflow_component)

try:
_, call = self._stub.PauseWorkflowAlpha1.with_call(req)

return DaprResponse(
headers=call.initial_metadata())
except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def resume_workflow(
self,
instance_id: str,
workflow_component: str) -> DaprResponse:
"""Resumes a workflow.

Args:
instance_id (str): the ID of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.

Returns:
:class:`DaprResponse` gRPC metadata returned from callee
"""
# Warnings and input validation
warn('The Workflow API is an Alpha version and is subject to change.',
UserWarning, stacklevel=2)
validateNotBlankString(instance_id=instance_id,
workflow_component=workflow_component)
# Actual resume workflow invocation
req = api_v1.ResumeWorkflowRequest(
instance_id=instance_id,
workflow_component=workflow_component)

try:
_, call = self._stub.ResumeWorkflowAlpha1.with_call(req)

return DaprResponse(
headers=call.initial_metadata())
except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def purge_workflow(
self,
instance_id: str,
workflow_component: str) -> DaprResponse:
"""Purges a workflow.

Args:
instance_id (str): the ID of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.

Returns:
:class:`DaprResponse` gRPC metadata returned from callee
"""
# Warnings and input validation
warn('The Workflow API is an Alpha version and is subject to change.',
UserWarning, stacklevel=2)
validateNotBlankString(instance_id=instance_id,
workflow_component=workflow_component)
# Actual purge workflow invocation
req = api_v1.PurgeWorkflowRequest(
instance_id=instance_id,
workflow_component=workflow_component)

try:
_, call = self._stub.PurgeWorkflowAlpha1.with_call(req)

return DaprResponse(
headers=call.initial_metadata())

except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def wait(self, timeout_s: float):
"""Waits for sidecar to be available within the timeout.

Expand Down
20 changes: 19 additions & 1 deletion dapr/clients/grpc/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from collections import namedtuple
from typing import Dict, List, Union, Tuple, Optional

from enum import Enum
from google.protobuf.any_pb2 import Any as GrpcAny
from google.protobuf.message import Message as GrpcMessage
from grpc import UnaryUnaryClientInterceptor, ClientCallDetails # type: ignore
Expand Down Expand Up @@ -180,3 +180,21 @@ def validateNotBlankString(**kwargs: Optional[str]):
for field_name, value in kwargs.items():
if not value or not value.strip():
raise ValueError(f"{field_name} name cannot be empty or blank")


class WorkflowRuntimeStatus(Enum):
UNKNOWN = "Unknown"
RUNNING = "Running"
COMPLETED = "Completed"
FAILED = "Failed"
TERMINATED = "Terminated"
PENDING = "Pending"
SUSPENDED = "Suspended"


# Will return the enum entry if it is present, otherwise returns "unknown"
def getWorkflowRuntimeStatus(inputString):
try:
return WorkflowRuntimeStatus[inputString].value
except KeyError:
return WorkflowRuntimeStatus.UNKNOWN
Loading