|
74 | 74 | )
|
75 | 75 | from temporalio.converter import (
|
76 | 76 | DataConverter,
|
| 77 | + DefaultFailureConverter, |
77 | 78 | DefaultFailureConverterWithEncodedAttributes,
|
78 | 79 | DefaultPayloadConverter,
|
79 | 80 | PayloadCodec,
|
|
84 | 85 | ApplicationError,
|
85 | 86 | CancelledError,
|
86 | 87 | ChildWorkflowError,
|
| 88 | + FailureError, |
87 | 89 | TemporalError,
|
88 | 90 | TimeoutError,
|
89 | 91 | WorkflowAlreadyStartedError,
|
@@ -6451,6 +6453,86 @@ async def test_concurrent_sleeps_use_proper_options(
|
6451 | 6453 | await handle.query("__temporal_workflow_metadata")
|
6452 | 6454 |
|
6453 | 6455 |
|
| 6456 | +class BadFailureConverterError(Exception): |
| 6457 | + pass |
| 6458 | + |
| 6459 | + |
| 6460 | +class BadFailureConverter(DefaultFailureConverter): |
| 6461 | + def to_failure( |
| 6462 | + self, |
| 6463 | + exception: BaseException, |
| 6464 | + payload_converter: PayloadConverter, |
| 6465 | + failure: Failure, |
| 6466 | + ) -> None: |
| 6467 | + if isinstance(exception, BadFailureConverterError): |
| 6468 | + raise RuntimeError("Intentional failure conversion error") |
| 6469 | + super().to_failure(exception, payload_converter, failure) |
| 6470 | + |
| 6471 | + |
| 6472 | +@activity.defn |
| 6473 | +async def bad_failure_converter_activity() -> None: |
| 6474 | + raise BadFailureConverterError |
| 6475 | + |
| 6476 | + |
| 6477 | +@workflow.defn(sandboxed=False) |
| 6478 | +class BadFailureConverterWorkflow: |
| 6479 | + @workflow.run |
| 6480 | + async def run(self, fail_workflow_task) -> None: |
| 6481 | + if fail_workflow_task: |
| 6482 | + raise BadFailureConverterError |
| 6483 | + else: |
| 6484 | + await workflow.execute_activity( |
| 6485 | + bad_failure_converter_activity, |
| 6486 | + schedule_to_close_timeout=timedelta(seconds=30), |
| 6487 | + retry_policy=RetryPolicy(maximum_attempts=1), |
| 6488 | + ) |
| 6489 | + |
| 6490 | + |
| 6491 | +async def test_bad_failure_converter(client: Client): |
| 6492 | + config = client.config() |
| 6493 | + config["data_converter"] = dataclasses.replace( |
| 6494 | + config["data_converter"], |
| 6495 | + failure_converter_class=BadFailureConverter, |
| 6496 | + ) |
| 6497 | + client = Client(**config) |
| 6498 | + async with new_worker( |
| 6499 | + client, BadFailureConverterWorkflow, activities=[bad_failure_converter_activity] |
| 6500 | + ) as worker: |
| 6501 | + # Check activity |
| 6502 | + with pytest.raises(WorkflowFailureError) as err: |
| 6503 | + await client.execute_workflow( |
| 6504 | + BadFailureConverterWorkflow.run, |
| 6505 | + False, |
| 6506 | + id=f"workflow-{uuid.uuid4()}", |
| 6507 | + task_queue=worker.task_queue, |
| 6508 | + ) |
| 6509 | + assert isinstance(err.value.cause, ActivityError) |
| 6510 | + assert isinstance(err.value.cause.cause, ApplicationError) |
| 6511 | + assert ( |
| 6512 | + err.value.cause.cause.message |
| 6513 | + == "Failed building exception result: Intentional failure conversion error" |
| 6514 | + ) |
| 6515 | + |
| 6516 | + # Check workflow |
| 6517 | + handle = await client.start_workflow( |
| 6518 | + BadFailureConverterWorkflow.run, |
| 6519 | + True, |
| 6520 | + id=f"workflow-{uuid.uuid4()}", |
| 6521 | + task_queue=worker.task_queue, |
| 6522 | + ) |
| 6523 | + |
| 6524 | + async def task_failed_message() -> Optional[str]: |
| 6525 | + async for e in handle.fetch_history_events(): |
| 6526 | + if e.HasField("workflow_task_failed_event_attributes"): |
| 6527 | + return e.workflow_task_failed_event_attributes.failure.message |
| 6528 | + return None |
| 6529 | + |
| 6530 | + await assert_eq_eventually( |
| 6531 | + "Failed converting activation exception: Intentional failure conversion error", |
| 6532 | + task_failed_message, # type: ignore |
| 6533 | + ) |
| 6534 | + |
| 6535 | + |
6454 | 6536 | @workflow.defn
|
6455 | 6537 | class SignalsActivitiesTimersUpdatesTracingWorkflow:
|
6456 | 6538 | """
|
|
0 commit comments