Skip to content

Workflow management python SDK #554

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 89 commits into from
May 31, 2023
Merged

Conversation

RyanLettieri
Copy link
Contributor

@RyanLettieri RyanLettieri commented Apr 18, 2023

Description

This PR covers the implementation of the management portion for workflow.

Note that this PR will fail until the following PRs are submitted:
dapr/dapr#6163
dapr/components-contrib#2729

Issue reference

Please reference the issue this PR will close: #542

Checklist

Please make sure you've completed the relevant tasks for this PR, out of the following list:

  • Code compiles correctly
  • Created/updated tests
  • Extended the documentation

Signed-off-by: Ryan Lettieri <[email protected]>
Copy link
Member

@berndverst berndverst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the async implementation in dapr.aio.clients.grpc.client.py and update the tests in tests/clients/test_dapr_async_grpc_client.py

After that it would be awesome if for a change we update your example to use async instead of synchronous.

from dapr.aio.clients import DaprClient
async with DaprClient() as d:
  await d.wait()
  await d.get_workflow()
  print("stuff")

etc -- you get the idea

Copy link
Member

@berndverst berndverst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As also discussed in chat - you must use grpcio-tools==1.48.2 to generate the protos.

You used version 1.49.1 of grpcio-tools which generates protos not compatible with our version of protobuf. Please make sure to use grpcio-tools<1.49 and regenerate the protos.

Signed-off-by: Deepanshu Agarwal <[email protected]>
Signed-off-by: Deepanshu Agarwal <[email protected]>
Signed-off-by: Deepanshu Agarwal <[email protected]>
Signed-off-by: Deepanshu Agarwal <[email protected]>
Signed-off-by: Deepanshu Agarwal <[email protected]>
@RyanLettieri RyanLettieri marked this pull request as ready for review April 25, 2023 17:19
@RyanLettieri RyanLettieri requested review from a team as code owners April 25, 2023 17:19
RyanLettieri and others added 5 commits April 27, 2023 09:15
Signed-off-by: Deepanshu Agarwal <[email protected]>
Signed-off-by: Ryan Lettieri <[email protected]>
Signed-off-by: Deepanshu Agarwal <[email protected]>
@berndverst
Copy link
Member

dapr/clients/grpc/_response.py:963: error: Too many arguments for "__init__" of "object"  [call-arg]
dapr/clients/grpc/_response.py:989: error: Too many arguments for "__init__" of "object"  [call-arg]

workflow_component=workflow_component)
_, call = self._stub.TerminateWorkflowAlpha1.with_call(req)

return DaprResponse(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a super generic response - how does someone know whether the termination request was received properly?

If there something in the protos that provides a better response?

cgillum
cgillum previously approved these changes May 30, 2023
Copy link
Contributor

@cgillum cgillum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@berndverst berndverst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Python SDK always uses Union[bytes, str] for any value which is bytes in the proto.

We should maintain this.

So if the data is str you probably need to convert it to bytes first. The bytes and None cases are otherwise handled correctly.

It is better to let a user manually define how they wish to convert any other object to bytes or string themselves, rather than doing this for the user. This will reduce a lot of support burden as users will be confused how another object type was encoded and then be unable to decode it.

@cgillum
Copy link
Contributor

cgillum commented May 31, 2023

@berndverst please reconsider regarding support for Any and implicit serialization for the workflow management APIs.

The workflow authoring SDKs already support implicit deserialization of JSON data. In fact, they do not support reading the binary data directly, in spite of the fact that the proto contract defines these payloads as binary. Disallowing implicit serialization in these APIs would mean that every developer would have to write JSON serialization code in their client apps, which is a bunch of boilerplate that they might get wrong and file issues for. The Dapr Client SDKs will be much more usable if we can take care of the serialization for them equally on both sides.

FWIW, we do the implicit client-side serialization in the .NET Dapr SDK already. Having the same behavior in the Python SDK would be nice for consistency.

Copy link
Member

@berndverst berndverst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a better idea:

If you replace Any with Dict then you still have convenience for the user, but you don't support completely arbitrary type conversions (which is a bad idea).

Please update the signature to Optional[Union[Dict, bytes, str]]

I believe other than replacing Any for Dict, and changing this signature (and doc strings) you might not need to make any other code changes here.

if instance_id is None:
instance_id = str(uuid.uuid4())

if isinstance(input, bytes):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if isinstance(input, bytes):
if send_raw_bytes and isinstance(input, bytes):

@berndverst berndverst dismissed their stale review May 31, 2023 20:07

We decided to go a different direction.

workflow_component: str,
workflow_name: str,
input: Union[Any, bytes, None] = None,
instance_id: Optional[str] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add parameter send_raw_bytes: bool = False

if isinstance(input, bytes):
encoded_data = input
else:
encoded_data = json.dumps(input).encode("utf-8") if input is not None else bytes([])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
encoded_data = json.dumps(input).encode("utf-8") if input is not None else bytes([])
try:
encoded_data = json.dumps(input).encode("utf-8") if input is not None else bytes([])
except TypeError:
raise DaprInternalError(message='start_workflow input must be json serializable')

Add try except around this step

In the except block, please throw a DaprInteralError exception with a message indicating that the input value must be json serializable.

instance_id: str,
workflow_component: str,
event_name: str,
event_data: Union[Any, bytes, None] = None) -> DaprResponse:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
event_data: Union[Any, bytes, None] = None) -> DaprResponse:
event_data: Optional[Union[Any, bytes]] = None) -> DaprResponse:

self,
workflow_component: str,
workflow_name: str,
input: Union[Any, bytes, None] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
input: Union[Any, bytes, None] = None,
input: Optional[Union[Any, bytes]] = None,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still needs updating

instance_id: str,
workflow_component: str,
event_name: str,
event_data: Union[Any, bytes, None] = None) -> DaprResponse:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add param send_raw_bytes: bool = False

validateNotBlankString(instance_id=instance_id,
workflow_component=workflow_component,
event_name=event_name)
if isinstance(input, bytes):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if isinstance(input, bytes):
if `send_raw_data and isinstance(input, bytes):`

instance_id: str,
workflow_component: str,
event_name: str,
event_data: Union[Any, bytes, None] = None) -> DaprResponse:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
event_data: Union[Any, bytes, None] = None) -> DaprResponse:
event_data: Optional[Union[Any, bytes]] = None) -> DaprResponse:

if isinstance(input, bytes):
encoded_data = event_data
else:
if event_data is not None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

combine the following lines into the single line statement

Add try / except block as before

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

encoded_data = json.dumps(input).encode("utf-8") if input is not None else bytes([])

instance_id: str,
workflow_component: str,
event_name: str,
event_data: Union[Any, bytes, None] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
event_data: Union[Any, bytes, None] = None,
event_data: Optional[Union[Any, bytes]] = None,

self,
workflow_component: str,
workflow_name: str,
input: Union[Any, bytes, None] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
input: Union[Any, bytes, None] = None,
input: Optional[Union[Any, bytes]] = None,

instance_id: str,
workflow_component: str,
event_name: str,
event_data: Union[Any, bytes, None] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
event_data: Union[Any, bytes, None] = None,
event_data: Optional[Union[Any, bytes]] = None,

else:
event_data = bytes([])
except TypeError:
raise DaprInternalError("Json Data is not serializable")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise DaprInternalError("Json Data is not serializable")
raise DaprInternalError("raise_workflow_event: event_data must be JSON serializable")

except TypeError:
raise DaprInternalError("Json Data is not serializable")
except ValueError as e:
raise DaprInternalError(f"Json serialization error: {e}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise DaprInternalError(f"Json serialization error: {e}")
raise DaprInternalError(f"raise_workflow_event: JSON serialization error: {e}")

else:
encoded_data = bytes([])
except TypeError:
raise DaprInternalError("Json Data is not serializable")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise DaprInternalError("Json Data is not serializable")
raise DaprInternalError("start_workflow: input must be JSON serializable")

except TypeError:
raise DaprInternalError("Json Data is not serializable")
except ValueError as e:
raise DaprInternalError(f"Json serialization error: {e}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise DaprInternalError(f"Json serialization error: {e}")
raise DaprInternalError(f"start_workflow: JSON serialization error: {e}")

cgillum
cgillum previously approved these changes May 31, 2023
Copy link
Contributor

@cgillum cgillum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.
workflow_name (str): the name of the workflow that will be executed.
input (Union[Any, bytes, None]): the input that the workflow will receive.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update

@berndverst berndverst merged commit 7f181f3 into dapr:master May 31, 2023
elena-kolevska pushed a commit to elena-kolevska/python-sdk that referenced this pull request Sep 4, 2023
* Initial draft for python SDK

Signed-off-by: Ryan Lettieri <[email protected]>

* Adding workflow code to aio client

Signed-off-by: Ryan Lettieri <[email protected]>

* Fixing protos and small fixes to workflow imports

Signed-off-by: Ryan Lettieri <[email protected]>

* Workflow Authoring

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Add example

Signed-off-by: Deepanshu Agarwal <[email protected]>

* lint

Signed-off-by: Deepanshu Agarwal <[email protected]>

* is it wheel fix

Signed-off-by: Deepanshu Agarwal <[email protected]>

* fix lint

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Updating proto and more small fixes to workflow

Signed-off-by: Ryan Lettieri <[email protected]>

* Add tests and client APIs

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Updating proto

Signed-off-by: Ryan Lettieri <[email protected]>

* lint

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Removing super init

Signed-off-by: Ryan Lettieri <[email protected]>

* tox fix

Signed-off-by: Ryan Lettieri <[email protected]>

* Add dtf python dependency

Signed-off-by: Deepanshu Agarwal <[email protected]>

* correction

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Some implementation of workflow into fake server and other cleanup

Signed-off-by: Ryan Lettieri <[email protected]>

* Fixing input for workflow

Signed-off-by: Ryan Lettieri <[email protected]>

* More workflow cleanup

Signed-off-by: Ryan Lettieri <[email protected]>

* Fixing up workflow options to be optional

Signed-off-by: Ryan Lettieri <[email protected]>

* Few more updates to workflow

Signed-off-by: Ryan Lettieri <[email protected]>

* Remove get-pip.py

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Updating examples test for workflow and cleaning up methods

Signed-off-by: Ryan Lettieri <[email protected]>

* Update durabletask dependency version

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Extra line - to be deleted

Signed-off-by: Deepanshu Agarwal <[email protected]>

* test compatible with 3.7

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Fixing comment

Signed-off-by: Ryan Lettieri <[email protected]>

* Incorporate review comments

Signed-off-by: Deepanshu Agarwal <[email protected]>

* lint

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Ut fix

Signed-off-by: Deepanshu Agarwal <[email protected]>

* validate demo_workflow

Signed-off-by: Deepanshu Agarwal <[email protected]>

* App Readme

Signed-off-by: Deepanshu Agarwal <[email protected]>

* fix step md

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Adding in exceptions to workflow methods

Signed-off-by: Ryan Lettieri <[email protected]>

* Encoding workflow start data and fixing test

Signed-off-by: Ryan Lettieri <[email protected]>

* Cleaning up some workflow methods

Signed-off-by: Ryan Lettieri <[email protected]>

* Validate demo workflow example

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Remove demo actor temporarily

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Include raise event test and assertions

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Rename

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Adressing some workflow comments

Signed-off-by: Ryan Lettieri <[email protected]>

* Incorporate Review comments

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Lint, validate

Signed-off-by: Deepanshu Agarwal <[email protected]>

* test correction

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Fake class method correction

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Check expected std output in validate example

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Remove extra port check

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Temporary - Verify Workflow Example first

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Requirements

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Remove line

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Add back removed validate examples

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Update examples/demo_workflow/demo_workflow/requirements.txt

Signed-off-by: Bernd Verst <[email protected]>

* Change running order of wf

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Commit to re-run example

Signed-off-by: Deepanshu Agarwal <[email protected]>

* Addressing some review comments

Signed-off-by: Ryan Lettieri <[email protected]>

* Few updates for workflow PR

Signed-off-by: Ryan Lettieri <[email protected]>

* More workflow PR fixes

Signed-off-by: Ryan Lettieri <[email protected]>

* More workflow typing fixes

Signed-off-by: Ryan Lettieri <[email protected]>

* Test and aio client fixes for workflow

Signed-off-by: Ryan Lettieri <[email protected]>

* Whitespace fix

Signed-off-by: Ryan Lettieri <[email protected]>

* Making workflow test more deterministic

Signed-off-by: Ryan Lettieri <[email protected]>

* Fixing broken workflow test

Signed-off-by: Ryan Lettieri <[email protected]>

* Addressing some workflow comments

Signed-off-by: Ryan Lettieri <[email protected]>

* linting

Signed-off-by: Ryan Lettieri <[email protected]>

* Addressing more comments

Signed-off-by: Ryan Lettieri <[email protected]>

* Even more fixes to workflow

Signed-off-by: Ryan Lettieri <[email protected]>

* Fixing workflow example

Signed-off-by: Ryan Lettieri <[email protected]>

* Making workflow example more consitent

Signed-off-by: Ryan Lettieri <[email protected]>

* Making workflow example more consitent again

Signed-off-by: Ryan Lettieri <[email protected]>

* Fixing serialization on workflow

Signed-off-by: Ryan Lettieri <[email protected]>

* Code formatting for workflow

Signed-off-by: Ryan Lettieri <[email protected]>

* Updating workflow documentation

Signed-off-by: Ryan Lettieri <[email protected]>

* Fixing input to start workflow

Signed-off-by: Ryan Lettieri <[email protected]>

---------

Signed-off-by: Ryan Lettieri <[email protected]>
Signed-off-by: Deepanshu Agarwal <[email protected]>
Signed-off-by: Bernd Verst <[email protected]>
Co-authored-by: Deepanshu Agarwal <[email protected]>
Co-authored-by: Yaron Schneider <[email protected]>
Co-authored-by: Bernd Verst <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Workflow implementation in SDK
6 participants