From 5f437121f7428dfe3fa54a14dd77b7c575aaa997 Mon Sep 17 00:00:00 2001 From: Cathy Teng Date: Fri, 20 Dec 2024 11:00:18 -0800 Subject: [PATCH 1/3] pass WorkflowJob into process_workflows --- src/sentry/tasks/post_process.py | 5 +- .../handlers/action/notification.py | 5 +- .../handlers/condition/__init__.py | 3 ++ .../condition/group_event_handlers.py | 13 ++--- .../condition/group_state_handlers.py | 27 +++++++++++ src/sentry/workflow_engine/models/action.py | 7 ++- .../workflow_engine/models/data_condition.py | 2 + src/sentry/workflow_engine/models/workflow.py | 6 +-- .../workflow_engine/processors/action.py | 6 +-- .../workflow_engine/processors/detector.py | 6 +-- .../workflow_engine/processors/workflow.py | 18 +++---- src/sentry/workflow_engine/types.py | 17 ++++++- .../workflow_engine/models/test_workflow.py | 8 ++-- .../workflow_engine/processors/test_action.py | 8 ++-- .../processors/test_workflow.py | 47 +++++++++++++++---- 15 files changed, 127 insertions(+), 51 deletions(-) create mode 100644 src/sentry/workflow_engine/handlers/condition/group_state_handlers.py diff --git a/src/sentry/tasks/post_process.py b/src/sentry/tasks/post_process.py index 0eb44ffeea7aa5..d69852bdf850b0 100644 --- a/src/sentry/tasks/post_process.py +++ b/src/sentry/tasks/post_process.py @@ -40,6 +40,7 @@ from sentry.utils.sdk import bind_organization_context, set_current_event_project from sentry.utils.sdk_crashes.sdk_crash_detection_config import build_sdk_crash_detection_configs from sentry.utils.services import build_instance_from_options_of_type +from sentry.workflow_engine.types import WorkflowJob if TYPE_CHECKING: from sentry.eventstore.models import Event, GroupEvent @@ -1002,10 +1003,10 @@ def process_workflow_engine(job: PostProcessJob) -> None: # If the flag is enabled, use the code below from sentry.workflow_engine.processors.workflow import process_workflows - evt = job["event"] + workflow_job: WorkflowJob = dict(job) with sentry_sdk.start_span(op="tasks.post_process_group.workflow_engine.process_workflow"): - process_workflows(evt) + process_workflows(workflow_job) def process_rules(job: PostProcessJob) -> None: diff --git a/src/sentry/workflow_engine/handlers/action/notification.py b/src/sentry/workflow_engine/handlers/action/notification.py index c9637f32466442..91b8bfcc96e718 100644 --- a/src/sentry/workflow_engine/handlers/action/notification.py +++ b/src/sentry/workflow_engine/handlers/action/notification.py @@ -1,7 +1,6 @@ -from sentry.eventstore.models import GroupEvent from sentry.workflow_engine.models import Action, Detector from sentry.workflow_engine.registry import action_handler_registry -from sentry.workflow_engine.types import ActionHandler +from sentry.workflow_engine.types import ActionHandler, WorkflowJob # TODO - Enable once the PR to allow for multiple of the same funcs is merged @@ -10,7 +9,7 @@ class NotificationActionHandler(ActionHandler): @staticmethod def execute( - evt: GroupEvent, + job: WorkflowJob, action: Action, detector: Detector, ) -> None: diff --git a/src/sentry/workflow_engine/handlers/condition/__init__.py b/src/sentry/workflow_engine/handlers/condition/__init__.py index 578500d97efdb1..7a98d09c4ad7be 100644 --- a/src/sentry/workflow_engine/handlers/condition/__init__.py +++ b/src/sentry/workflow_engine/handlers/condition/__init__.py @@ -1,9 +1,12 @@ __all__ = [ "EventCreatedByDetectorConditionHandler", "EventSeenCountConditionHandler", + "ReappearedEventConditionHandler", + "RegressedEventConditionHandler", ] from .group_event_handlers import ( EventCreatedByDetectorConditionHandler, EventSeenCountConditionHandler, ) +from .group_state_handlers import ReappearedEventConditionHandler, RegressedEventConditionHandler diff --git a/src/sentry/workflow_engine/handlers/condition/group_event_handlers.py b/src/sentry/workflow_engine/handlers/condition/group_event_handlers.py index ee299180400a65..2bc9422c544595 100644 --- a/src/sentry/workflow_engine/handlers/condition/group_event_handlers.py +++ b/src/sentry/workflow_engine/handlers/condition/group_event_handlers.py @@ -1,15 +1,15 @@ from typing import Any -from sentry.eventstore.models import GroupEvent from sentry.workflow_engine.models.data_condition import Condition from sentry.workflow_engine.registry import condition_handler_registry -from sentry.workflow_engine.types import DataConditionHandler +from sentry.workflow_engine.types import DataConditionHandler, WorkflowJob @condition_handler_registry.register(Condition.EVENT_CREATED_BY_DETECTOR) -class EventCreatedByDetectorConditionHandler(DataConditionHandler[GroupEvent]): +class EventCreatedByDetectorConditionHandler(DataConditionHandler[WorkflowJob]): @staticmethod - def evaluate_value(event: GroupEvent, comparison: Any) -> bool: + def evaluate_value(job: WorkflowJob, comparison: Any) -> bool: + event = job["event"] if event.occurrence is None or event.occurrence.evidence_data is None: return False @@ -17,7 +17,8 @@ def evaluate_value(event: GroupEvent, comparison: Any) -> bool: @condition_handler_registry.register(Condition.EVENT_SEEN_COUNT) -class EventSeenCountConditionHandler(DataConditionHandler[GroupEvent]): +class EventSeenCountConditionHandler(DataConditionHandler[WorkflowJob]): @staticmethod - def evaluate_value(event: GroupEvent, comparison: Any) -> bool: + def evaluate_value(job: WorkflowJob, comparison: Any) -> bool: + event = job["event"] return event.group.times_seen == comparison diff --git a/src/sentry/workflow_engine/handlers/condition/group_state_handlers.py b/src/sentry/workflow_engine/handlers/condition/group_state_handlers.py new file mode 100644 index 00000000000000..3c34b052844928 --- /dev/null +++ b/src/sentry/workflow_engine/handlers/condition/group_state_handlers.py @@ -0,0 +1,27 @@ +from typing import Any + +from sentry.workflow_engine.models.data_condition import Condition +from sentry.workflow_engine.registry import condition_handler_registry +from sentry.workflow_engine.types import DataConditionHandler, WorkflowJob + + +@condition_handler_registry.register(Condition.REGRESSED_EVENT) +class RegressedEventConditionHandler(DataConditionHandler[WorkflowJob]): + @staticmethod + def evaluate_value(job: WorkflowJob, comparison: Any) -> bool: + state = job.get("group_state", None) + if state is None: + return False + + return state["is_regression"] == comparison + + +@condition_handler_registry.register(Condition.REAPPEARED_EVENT) +class ReappearedEventConditionHandler(DataConditionHandler[WorkflowJob]): + @staticmethod + def evaluate_value(job: WorkflowJob, comparison: Any) -> bool: + has_reappeared = job.get("has_reappeared", None) + if has_reappeared is None: + return False + + return has_reappeared == comparison diff --git a/src/sentry/workflow_engine/models/action.py b/src/sentry/workflow_engine/models/action.py index d6b90a7fa8846d..efc7a1e3dabd9f 100644 --- a/src/sentry/workflow_engine/models/action.py +++ b/src/sentry/workflow_engine/models/action.py @@ -7,10 +7,9 @@ from sentry.backup.scopes import RelocationScope from sentry.db.models import DefaultFieldsModel, region_silo_model, sane_repr from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey -from sentry.eventstore.models import GroupEvent from sentry.notifications.models.notificationaction import ActionTarget from sentry.workflow_engine.registry import action_handler_registry -from sentry.workflow_engine.types import ActionHandler +from sentry.workflow_engine.types import ActionHandler, WorkflowJob if TYPE_CHECKING: from sentry.workflow_engine.models import Detector @@ -72,7 +71,7 @@ def get_handler(self) -> ActionHandler: action_type = Action.Type(self.type) return action_handler_registry.get(action_type) - def trigger(self, evt: GroupEvent, detector: Detector) -> None: + def trigger(self, job: WorkflowJob, detector: Detector) -> None: # get the handler for the action type handler = self.get_handler() - handler.execute(evt, self, detector) + handler.execute(job, self, detector) diff --git a/src/sentry/workflow_engine/models/data_condition.py b/src/sentry/workflow_engine/models/data_condition.py index 27a36d70ddcb40..8180723466faac 100644 --- a/src/sentry/workflow_engine/models/data_condition.py +++ b/src/sentry/workflow_engine/models/data_condition.py @@ -22,6 +22,8 @@ class Condition(models.TextChoices): NOT_EQUAL = "ne" EVENT_CREATED_BY_DETECTOR = "event_created_by_detector" EVENT_SEEN_COUNT = "event_seen_count" + REGRESSED_EVENT = "regressed_event" + REAPPEARED_EVENT = "reappeared_event" condition_ops = { diff --git a/src/sentry/workflow_engine/models/workflow.py b/src/sentry/workflow_engine/models/workflow.py index 829fe66f596273..4c25fabc175b26 100644 --- a/src/sentry/workflow_engine/models/workflow.py +++ b/src/sentry/workflow_engine/models/workflow.py @@ -8,9 +8,9 @@ from sentry.backup.scopes import RelocationScope from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model, sane_repr from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey -from sentry.eventstore.models import GroupEvent from sentry.models.owner_base import OwnerModel from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group +from sentry.workflow_engine.types import WorkflowJob from .json_config import JSONConfigBase @@ -53,7 +53,7 @@ class Meta: ) ] - def evaluate_trigger_conditions(self, evt: GroupEvent) -> bool: + def evaluate_trigger_conditions(self, job: WorkflowJob) -> bool: """ Evaluate the conditions for the workflow trigger and return if the evaluation was successful. If there aren't any workflow trigger conditions, the workflow is considered triggered. @@ -61,7 +61,7 @@ def evaluate_trigger_conditions(self, evt: GroupEvent) -> bool: if self.when_condition_group is None: return True - evaluation, _ = evaluate_condition_group(self.when_condition_group, evt) + evaluation, _ = evaluate_condition_group(self.when_condition_group, job) return evaluation diff --git a/src/sentry/workflow_engine/processors/action.py b/src/sentry/workflow_engine/processors/action.py index 0e57ee44441aea..bf03015bf37444 100644 --- a/src/sentry/workflow_engine/processors/action.py +++ b/src/sentry/workflow_engine/processors/action.py @@ -1,11 +1,11 @@ from sentry.db.models.manager.base_query_set import BaseQuerySet -from sentry.eventstore.models import GroupEvent from sentry.workflow_engine.models import Action, DataConditionGroup, Workflow from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group +from sentry.workflow_engine.types import WorkflowJob def evaluate_workflow_action_filters( - workflows: set[Workflow], evt: GroupEvent + workflows: set[Workflow], job: WorkflowJob ) -> BaseQuerySet[Action]: filtered_action_groups: set[DataConditionGroup] = set() @@ -17,7 +17,7 @@ def evaluate_workflow_action_filters( ).distinct() for action_condition in action_conditions: - evaluation, result = evaluate_condition_group(action_condition, evt) + evaluation, result = evaluate_condition_group(action_condition, job) if evaluation: filtered_action_groups.add(action_condition) diff --git a/src/sentry/workflow_engine/processors/detector.py b/src/sentry/workflow_engine/processors/detector.py index ba7c6e9718cacd..b0a038fdbcb5c6 100644 --- a/src/sentry/workflow_engine/processors/detector.py +++ b/src/sentry/workflow_engine/processors/detector.py @@ -2,19 +2,19 @@ import logging -from sentry.eventstore.models import GroupEvent from sentry.issues.grouptype import ErrorGroupType from sentry.issues.issue_occurrence import IssueOccurrence from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka from sentry.workflow_engine.handlers.detector import DetectorEvaluationResult from sentry.workflow_engine.models import DataPacket, Detector -from sentry.workflow_engine.types import DetectorGroupKey +from sentry.workflow_engine.types import DetectorGroupKey, WorkflowJob logger = logging.getLogger(__name__) # TODO - cache these by evt.group_id? :thinking: -def get_detector_by_event(evt: GroupEvent) -> Detector: +def get_detector_by_event(job: WorkflowJob) -> Detector: + evt = job["event"] issue_occurrence = evt.occurrence if issue_occurrence is None: diff --git a/src/sentry/workflow_engine/processors/workflow.py b/src/sentry/workflow_engine/processors/workflow.py index effc18173780a6..be2fe0f88e4e92 100644 --- a/src/sentry/workflow_engine/processors/workflow.py +++ b/src/sentry/workflow_engine/processors/workflow.py @@ -2,26 +2,26 @@ import sentry_sdk -from sentry.eventstore.models import GroupEvent from sentry.utils import metrics from sentry.workflow_engine.models import Detector, Workflow from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters from sentry.workflow_engine.processors.detector import get_detector_by_event +from sentry.workflow_engine.types import WorkflowJob logger = logging.getLogger(__name__) -def evaluate_workflow_triggers(workflows: set[Workflow], evt: GroupEvent) -> set[Workflow]: +def evaluate_workflow_triggers(workflows: set[Workflow], job: WorkflowJob) -> set[Workflow]: triggered_workflows: set[Workflow] = set() for workflow in workflows: - if workflow.evaluate_trigger_conditions(evt): + if workflow.evaluate_trigger_conditions(job): triggered_workflows.add(workflow) return triggered_workflows -def process_workflows(evt: GroupEvent) -> set[Workflow]: +def process_workflows(job: WorkflowJob) -> set[Workflow]: """ This method will get the detector based on the event, and then gather the associated workflows. Next, it will evaluate the "when" (or trigger) conditions for each workflow, if the conditions are met, @@ -31,19 +31,19 @@ def process_workflows(evt: GroupEvent) -> set[Workflow]: """ # Check to see if the GroupEvent has an issue occurrence try: - detector = get_detector_by_event(evt) + detector = get_detector_by_event(job) except Detector.DoesNotExist: metrics.incr("workflow_engine.process_workflows.error") - logger.exception("Detector not found for event", extra={"event_id": evt.event_id}) + logger.exception("Detector not found for event", extra={"event_id": job["event"].event_id}) return set() # Get the workflows, evaluate the when_condition_group, finally evaluate the actions for workflows that are triggered workflows = set(Workflow.objects.filter(detectorworkflow__detector_id=detector.id).distinct()) - triggered_workflows = evaluate_workflow_triggers(workflows, evt) - actions = evaluate_workflow_action_filters(triggered_workflows, evt) + triggered_workflows = evaluate_workflow_triggers(workflows, job) + actions = evaluate_workflow_action_filters(triggered_workflows, job) with sentry_sdk.start_span(op="workflow_engine.process_workflows.trigger_actions"): for action in actions: - action.trigger(evt, detector) + action.trigger(job, detector) return triggered_workflows diff --git a/src/sentry/workflow_engine/types.py b/src/sentry/workflow_engine/types.py index ce87257a13ad5c..b8add37097e204 100644 --- a/src/sentry/workflow_engine/types.py +++ b/src/sentry/workflow_engine/types.py @@ -1,12 +1,13 @@ from __future__ import annotations from enum import IntEnum -from typing import TYPE_CHECKING, Any, Generic, TypeVar +from typing import TYPE_CHECKING, Any, Generic, TypedDict, TypeVar from sentry.types.group import PriorityLevel if TYPE_CHECKING: from sentry.eventstore.models import GroupEvent + from sentry.eventstream.base import GroupState from sentry.workflow_engine.models import Action, Detector T = TypeVar("T") @@ -28,9 +29,21 @@ class DetectorPriorityLevel(IntEnum): ProcessedDataConditionResult = tuple[bool, list[DataConditionResult]] +class EventJob(TypedDict): + event: GroupEvent + + +class WorkflowJob(EventJob, total=False): + group_state: GroupState + is_reprocessed: bool + has_reappeared: bool + has_alert: bool + has_escalated: bool + + class ActionHandler: @staticmethod - def execute(group_event: GroupEvent, action: Action, detector: Detector) -> None: + def execute(job: WorkflowJob, action: Action, detector: Detector) -> None: raise NotImplementedError diff --git a/tests/sentry/workflow_engine/models/test_workflow.py b/tests/sentry/workflow_engine/models/test_workflow.py index c00edd89a02432..8a9d1e841a4b9c 100644 --- a/tests/sentry/workflow_engine/models/test_workflow.py +++ b/tests/sentry/workflow_engine/models/test_workflow.py @@ -1,3 +1,4 @@ +from sentry.workflow_engine.types import WorkflowJob from tests.sentry.workflow_engine.test_base import BaseWorkflowTest @@ -8,21 +9,22 @@ def setUp(self): ) self.data_condition = self.data_condition_group.conditions.first() self.group, self.event, self.group_event = self.create_group_event() + self.job = WorkflowJob({"event": self.group_event}) def test_evaluate_trigger_conditions__condition_new_event__True(self): - evaluation = self.workflow.evaluate_trigger_conditions(self.group_event) + evaluation = self.workflow.evaluate_trigger_conditions(self.job) assert evaluation is True def test_evaluate_trigger_conditions__condition_new_event__False(self): # Update event to have been seen before self.group_event.group.times_seen = 5 - evaluation = self.workflow.evaluate_trigger_conditions(self.group_event) + evaluation = self.workflow.evaluate_trigger_conditions(self.job) assert evaluation is False def test_evaluate_trigger_conditions__no_conditions(self): self.workflow.when_condition_group = None self.workflow.save() - evaluation = self.workflow.evaluate_trigger_conditions(self.group_event) + evaluation = self.workflow.evaluate_trigger_conditions(self.job) assert evaluation is True diff --git a/tests/sentry/workflow_engine/processors/test_action.py b/tests/sentry/workflow_engine/processors/test_action.py index 7bc44d77abc61c..72534ccad5fedb 100644 --- a/tests/sentry/workflow_engine/processors/test_action.py +++ b/tests/sentry/workflow_engine/processors/test_action.py @@ -1,5 +1,6 @@ from sentry.workflow_engine.models.data_condition import Condition from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters +from sentry.workflow_engine.types import WorkflowJob from tests.sentry.workflow_engine.test_base import BaseWorkflowTest @@ -17,9 +18,10 @@ def setUp(self): self.group, self.event, self.group_event = self.create_group_event( occurrence=self.build_occurrence(evidence_data={"detector_id": self.detector.id}) ) + self.job = WorkflowJob({"event": self.group_event}) def test_basic__no_filter(self): - triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.group_event) + triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.job) assert set(triggered_actions) == {self.action} def test_basic__with_filter__passes(self): @@ -30,7 +32,7 @@ def test_basic__with_filter__passes(self): condition_result=True, ) - triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.group_event) + triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.job) assert set(triggered_actions) == {self.action} def test_basic__with_filter__filtered(self): @@ -41,5 +43,5 @@ def test_basic__with_filter__filtered(self): comparison=self.detector.id + 1, ) - triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.group_event) + triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.job) assert not triggered_actions diff --git a/tests/sentry/workflow_engine/processors/test_workflow.py b/tests/sentry/workflow_engine/processors/test_workflow.py index 7f75494c571252..9c1e39848324d3 100644 --- a/tests/sentry/workflow_engine/processors/test_workflow.py +++ b/tests/sentry/workflow_engine/processors/test_workflow.py @@ -1,9 +1,11 @@ from unittest import mock +from sentry.eventstream.base import GroupState from sentry.issues.grouptype import ErrorGroupType from sentry.workflow_engine.models import DataConditionGroup from sentry.workflow_engine.models.data_condition import Condition from sentry.workflow_engine.processors.workflow import evaluate_workflow_triggers, process_workflows +from sentry.workflow_engine.types import WorkflowJob from tests.sentry.workflow_engine.test_base import BaseWorkflowTest @@ -25,24 +27,50 @@ def setUp(self): ) self.group, self.event, self.group_event = self.create_group_event() + self.job = WorkflowJob( + { + "event": self.group_event, + "group_state": GroupState( + id=1, is_new=False, is_regression=True, is_new_group_environment=False + ), + } + ) def test_error_event(self): - triggered_workflows = process_workflows(self.group_event) + triggered_workflows = process_workflows(self.job) assert triggered_workflows == {self.error_workflow} def test_issue_occurrence_event(self): issue_occurrence = self.build_occurrence(evidence_data={"detector_id": self.detector.id}) self.group_event.occurrence = issue_occurrence - triggered_workflows = process_workflows(self.group_event) + triggered_workflows = process_workflows(self.job) assert triggered_workflows == {self.workflow} + def test_regressed_event(self): + dcg = self.create_data_condition_group() + self.create_data_condition( + type=Condition.REGRESSED_EVENT, + comparison=True, + condition_result=True, + condition_group=dcg, + ) + + workflow = self.create_workflow(when_condition_group=dcg) + self.create_detector_workflow( + detector=self.error_detector, + workflow=workflow, + ) + + triggered_workflows = process_workflows(self.job) + assert triggered_workflows == {self.error_workflow, workflow} + def test_no_detector(self): self.group_event.occurrence = self.build_occurrence(evidence_data={}) with mock.patch("sentry.workflow_engine.processors.workflow.logger") as mock_logger: with mock.patch("sentry.workflow_engine.processors.workflow.metrics") as mock_metrics: - triggered_workflows = process_workflows(self.group_event) + triggered_workflows = process_workflows(self.job) assert not triggered_workflows @@ -66,13 +94,14 @@ def setUp(self): self.group, self.event, self.group_event = self.create_group_event( occurrence=occurrence, ) + self.job = WorkflowJob({"event": self.group_event}) def test_workflow_trigger(self): - triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.group_event) + triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.job) assert triggered_workflows == {self.workflow} def test_no_workflow_trigger(self): - triggered_workflows = evaluate_workflow_triggers(set(), self.group_event) + triggered_workflows = evaluate_workflow_triggers(set(), self.job) assert not triggered_workflows def test_workflow_many_filters(self): @@ -86,7 +115,7 @@ def test_workflow_many_filters(self): condition_result=75, ) - triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.group_event) + triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.job) assert triggered_workflows == {self.workflow} def test_workflow_filterd_out(self): @@ -99,13 +128,11 @@ def test_workflow_filterd_out(self): comparison=self.detector.id + 1, ) - triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.group_event) + triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.job) assert not triggered_workflows def test_many_workflows(self): workflow_two, _, _, _ = self.create_detector_and_workflow(name_prefix="two") - triggered_workflows = evaluate_workflow_triggers( - {self.workflow, workflow_two}, self.group_event - ) + triggered_workflows = evaluate_workflow_triggers({self.workflow, workflow_two}, self.job) assert triggered_workflows == {self.workflow, workflow_two} From 846ad3831cb25d6f6e9be2ff938203f2bb71e1da Mon Sep 17 00:00:00 2001 From: Cathy Teng Date: Fri, 20 Dec 2024 11:23:15 -0800 Subject: [PATCH 2/3] convert PostProcessJob to WorkflowJob --- src/sentry/tasks/post_process.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/sentry/tasks/post_process.py b/src/sentry/tasks/post_process.py index d69852bdf850b0..c7cde181111ef7 100644 --- a/src/sentry/tasks/post_process.py +++ b/src/sentry/tasks/post_process.py @@ -1003,7 +1003,12 @@ def process_workflow_engine(job: PostProcessJob) -> None: # If the flag is enabled, use the code below from sentry.workflow_engine.processors.workflow import process_workflows - workflow_job: WorkflowJob = dict(job) + # PostProcessJob event is optional, WorkflowJob event is required + try: + workflow_job = WorkflowJob({**job}) # type: ignore[typeddict-item] + except Exception: + logger.exception("Could not create WorkflowJob", extra={"job": job}) + return with sentry_sdk.start_span(op="tasks.post_process_group.workflow_engine.process_workflow"): process_workflows(workflow_job) From a62dc3bceb196b2caa327e2941bad15980d717cc Mon Sep 17 00:00:00 2001 From: Cathy Teng Date: Fri, 20 Dec 2024 11:31:37 -0800 Subject: [PATCH 3/3] add check for event in job --- src/sentry/tasks/post_process.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/sentry/tasks/post_process.py b/src/sentry/tasks/post_process.py index c7cde181111ef7..b5cb83206e0336 100644 --- a/src/sentry/tasks/post_process.py +++ b/src/sentry/tasks/post_process.py @@ -1004,6 +1004,10 @@ def process_workflow_engine(job: PostProcessJob) -> None: from sentry.workflow_engine.processors.workflow import process_workflows # PostProcessJob event is optional, WorkflowJob event is required + if "event" not in job: + logger.error("Missing event to create WorkflowJob", extra={"job": job}) + return + try: workflow_job = WorkflowJob({**job}) # type: ignore[typeddict-item] except Exception: