Skip to content

Commit 2fd909e

Browse files
committed
done
1 parent 91014e1 commit 2fd909e

File tree

5 files changed

+259
-2
lines changed

5 files changed

+259
-2
lines changed

README.md

+7-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@ samples for iWF Python SDK
77
* Python 3.9+
88
* [iWF server](https://github.com/indeedeng/iwf#how-to-use)
99

10-
## User sign-up workflow
10+
## Case1: [Money transfer workflow/SAGA Patten](./moneytransfer)
11+
12+
This example shows how to transfer money from one account to another account.
13+
The transfer involves multiple steps. When any step fails, the whole transfer is canceled with some compensation steps.
14+
15+
## Case 2: User sign-up workflow
1116

1217
A common use case that is almost everywhere -- new user sign-up/register a new account in a website/system.
1318
E.g. Amazon/Linkedin/Google/etc...
@@ -47,7 +52,7 @@ WorkflowAsCode
4752
Natural to represent business
4853
Builtin & rich support for operation tooling
4954

50-
It's so simple & easy to do that the [business logic code](./signup/signup_workflow.py) can be shown here!
55+
It's so simple & easy to do that the [business logic code like this](./signup/signup_workflow.py) can be shown here!
5156

5257
```python
5358
class SubmitState(WorkflowState[Form]):

moneytransfer/README.md

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
### How to run
2+
3+
* start a iWF server following the [instructions](https://github.com/indeedeng/iwf#how-to-use)
4+
* build and run `main.py`
5+
* start a workflow: `http://localhost:8802/moneytransfer/start?fromAccount=test1&toAccount=test2&amount=100&notes=hello`
6+
* watch in WebUI `http://localhost:8233/namespaces/default/workflows`
7+
* modify the workflow code to try injecting some errors, and shorten the retry, to see what will happen

moneytransfer/iwf_config.py

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from iwf.client import Client
2+
from iwf.registry import Registry
3+
from iwf.worker_service import (
4+
WorkerService,
5+
)
6+
7+
from moneytransfer.money_transfer_workflow import MoneyTransferWorkflow
8+
9+
registry = Registry()
10+
worker_service = WorkerService(registry)
11+
client = Client(registry, )
12+
13+
registry.add_workflow(MoneyTransferWorkflow())

moneytransfer/main.py

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import time
2+
import traceback
3+
4+
from flask import Flask, request
5+
from iwf.iwf_api.models import (
6+
WorkflowStateExecuteRequest,
7+
WorkflowStateWaitUntilRequest,
8+
WorkflowWorkerRpcRequest,
9+
)
10+
from iwf.worker_service import (
11+
WorkerService,
12+
)
13+
14+
from moneytransfer.iwf_config import client, worker_service
15+
from moneytransfer.money_transfer_workflow import TransferRequest, MoneyTransferWorkflow
16+
17+
flask_app = Flask(__name__)
18+
19+
20+
# http://localhost:8802/moneytransfer/start
21+
@flask_app.route("/moneytransfer/start")
22+
def money_transfer_start():
23+
from_account = request.args["fromAccount"]
24+
to_account = request.args["toAccount"]
25+
amount = request.args["amount"]
26+
notes = request.args["notes"]
27+
transfer_request = TransferRequest(from_account, to_account, int(amount), notes)
28+
29+
client.start_workflow(MoneyTransferWorkflow, "money_transfer" + str(time.time()), 3600, transfer_request)
30+
return "workflow started"
31+
32+
33+
@flask_app.route("/")
34+
def index():
35+
return "iwf workflow home"
36+
37+
38+
# below are iWF workflow worker APIs to be called by iWF server
39+
40+
41+
@flask_app.route(WorkerService.api_path_workflow_state_wait_until, methods=["POST"])
42+
def handle_wait_until():
43+
req = WorkflowStateWaitUntilRequest.from_dict(request.json)
44+
resp = worker_service.handle_workflow_state_wait_until(req)
45+
return resp.to_dict()
46+
47+
48+
@flask_app.route(WorkerService.api_path_workflow_state_execute, methods=["POST"])
49+
def handle_execute():
50+
req = WorkflowStateExecuteRequest.from_dict(request.json)
51+
resp = worker_service.handle_workflow_state_execute(req)
52+
return resp.to_dict()
53+
54+
55+
@flask_app.route(WorkerService.api_path_workflow_worker_rpc, methods=["POST"])
56+
def handle_rpc():
57+
req = WorkflowWorkerRpcRequest.from_dict(request.json)
58+
resp = worker_service.handle_workflow_worker_rpc(req)
59+
return resp.to_dict()
60+
61+
62+
# this handler is extremely useful for debugging iWF
63+
# the WebUI will be able to show you the error with stacktrace
64+
@flask_app.errorhandler(Exception)
65+
def internal_error(exception):
66+
return traceback.format_exc(), 500
67+
68+
69+
def main():
70+
flask_app.run(host="0.0.0.0", port=8802)
71+
72+
73+
if __name__ == "__main__":
74+
main()
+158
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
from dataclasses import dataclass
2+
3+
from iwf.command_results import CommandResults
4+
from iwf.communication import Communication
5+
from iwf.iwf_api.models import RetryPolicy
6+
from iwf.persistence import Persistence
7+
from iwf.state_decision import StateDecision
8+
from iwf.state_schema import StateSchema
9+
from iwf.workflow import ObjectWorkflow
10+
from iwf.workflow_context import WorkflowContext
11+
from iwf.workflow_state import WorkflowState
12+
from iwf.workflow_state_options import WorkflowStateOptions
13+
14+
15+
@dataclass
16+
class TransferRequest:
17+
from_account: str
18+
to_account: str
19+
amount: int
20+
notes: str
21+
22+
23+
class VerifyState(WorkflowState[TransferRequest]):
24+
def execute(
25+
self,
26+
ctx: WorkflowContext,
27+
request: TransferRequest,
28+
command_results: CommandResults,
29+
persistence: Persistence,
30+
communication: Communication,
31+
) -> StateDecision:
32+
print(f"API to check balance for account {request.from_account} for amount{request.amount}")
33+
34+
has_sufficient_funds = True
35+
if not has_sufficient_funds:
36+
return StateDecision.force_fail_workflow("insufficient funds")
37+
38+
return StateDecision.single_next_state(CreateDebitMemoState, request)
39+
40+
41+
class CreateDebitMemoState(WorkflowState[TransferRequest]):
42+
def execute(
43+
self,
44+
ctx: WorkflowContext,
45+
request: TransferRequest,
46+
command_results: CommandResults,
47+
persistence: Persistence,
48+
communication: Communication,
49+
) -> StateDecision:
50+
print(f"API to create debit memo for account {request.from_account} for amount{request.amount} with notes{request.notes}")
51+
52+
return StateDecision.single_next_state(DebitState, request)
53+
54+
def get_state_options(self) -> WorkflowStateOptions:
55+
return WorkflowStateOptions(
56+
execute_failure_handling_state=CompensateState,
57+
execute_api_retry_policy=RetryPolicy(
58+
maximum_attempts_duration_seconds=3600,
59+
)
60+
)
61+
62+
63+
class DebitState(WorkflowState[TransferRequest]):
64+
def execute(
65+
self,
66+
ctx: WorkflowContext,
67+
request: TransferRequest,
68+
command_results: CommandResults,
69+
persistence: Persistence,
70+
communication: Communication,
71+
) -> StateDecision:
72+
print(f"API to debit account {request.from_account} for amount{request.amount}")
73+
74+
return StateDecision.single_next_state(CreateCreditMemoState, request)
75+
76+
def get_state_options(self) -> WorkflowStateOptions:
77+
return WorkflowStateOptions(
78+
execute_failure_handling_state=CompensateState,
79+
execute_api_retry_policy=RetryPolicy(
80+
maximum_attempts_duration_seconds=3600,
81+
)
82+
)
83+
84+
85+
class CreateCreditMemoState(WorkflowState[TransferRequest]):
86+
def execute(
87+
self,
88+
ctx: WorkflowContext,
89+
request: TransferRequest,
90+
command_results: CommandResults,
91+
persistence: Persistence,
92+
communication: Communication,
93+
) -> StateDecision:
94+
print(f"API to create credit memo for account {request.to_account} for amount{request.amount} with notes{request.notes}")
95+
96+
return StateDecision.single_next_state(CreditState, request)
97+
98+
def get_state_options(self) -> WorkflowStateOptions:
99+
return WorkflowStateOptions(
100+
execute_failure_handling_state=CompensateState,
101+
execute_api_retry_policy=RetryPolicy(
102+
maximum_attempts_duration_seconds=3600,
103+
)
104+
)
105+
106+
107+
class CreditState(WorkflowState[TransferRequest]):
108+
def execute(
109+
self,
110+
ctx: WorkflowContext,
111+
request: TransferRequest,
112+
command_results: CommandResults,
113+
persistence: Persistence,
114+
communication: Communication,
115+
) -> StateDecision:
116+
print(f"API to credit account {request.to_account} for amount{request.amount}")
117+
118+
return StateDecision.graceful_complete_workflow(f"transfer is done from account{request.from_account} "
119+
f"to account{request.to_account} for amount{request.amount}")
120+
121+
def get_state_options(self) -> WorkflowStateOptions:
122+
return WorkflowStateOptions(
123+
execute_failure_handling_state=CompensateState,
124+
execute_api_retry_policy=RetryPolicy(
125+
maximum_attempts_duration_seconds=3600,
126+
)
127+
)
128+
129+
130+
class CompensateState(WorkflowState[TransferRequest]):
131+
def execute(
132+
self,
133+
ctx: WorkflowContext,
134+
request: TransferRequest,
135+
command_results: CommandResults,
136+
persistence: Persistence,
137+
communication: Communication,
138+
) -> StateDecision:
139+
# NOTE: to improve, we can use iWF data attributes to track whether each step has been attempted to execute
140+
# and check a flag to see if we should undo it or not
141+
142+
print(f"API to undo credit account {request.to_account} for amount{request.amount}")
143+
print(f"API to undo create credit memo account {request.to_account} for amount{request.amount}")
144+
print(f"API to undo debit account {request.from_account} for amount{request.amount}")
145+
print(f"API to undo create debit memo {request.from_account} for amount{request.amount}")
146+
147+
return StateDecision.force_fail_workflow("fail to transfer")
148+
149+
150+
class MoneyTransferWorkflow(ObjectWorkflow):
151+
def get_workflow_states(self) -> StateSchema:
152+
return StateSchema.with_starting_state(
153+
VerifyState(),
154+
CreateDebitMemoState(),
155+
DebitState(),
156+
CreateCreditMemoState(),
157+
CreditState(),
158+
CompensateState())

0 commit comments

Comments
 (0)