diff --git a/detection_rules/rule_validators.py b/detection_rules/rule_validators.py index e2ae9f2177c..94601e988aa 100644 --- a/detection_rules/rule_validators.py +++ b/detection_rules/rule_validators.py @@ -40,31 +40,36 @@ def validate(self, data: QueryRuleData, meta: RuleMeta) -> None: packages_manifest = load_integrations_manifests() package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest) + # validate the query against fields within beats + self.validate_stack_combos(data, meta) + if package_integrations: # validate the query against related integration fields self.validate_integration(data, meta, package_integrations) - else: - for stack_version, mapping in meta.get_validation_stack_versions().items(): - beats_version = mapping['beats'] - ecs_version = mapping['ecs'] - err_trailer = f'stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}' - - beat_types, beat_schema, schema = self.get_beats_schema(data.index or [], - beats_version, ecs_version) - - try: - kql.parse(self.query, schema=schema) - except kql.KqlParseError as exc: - message = exc.error_msg - trailer = err_trailer - if "Unknown field" in message and beat_types: - trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}" - - raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source, - len(exc.caret.lstrip()), trailer=trailer) from None - except Exception: - print(err_trailer) - raise + + def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> None: + """Validate the query against ECS and beats schemas across stack combinations.""" + for stack_version, mapping in meta.get_validation_stack_versions().items(): + beats_version = mapping['beats'] + ecs_version = mapping['ecs'] + err_trailer = f'stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}' + + beat_types, beat_schema, schema = self.get_beats_schema(data.index or [], + beats_version, ecs_version) + + try: + kql.parse(self.query, schema=schema) + except kql.KqlParseError as exc: + message = exc.error_msg + trailer = err_trailer + if "Unknown field" in message and beat_types: + trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}" + + raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source, + len(exc.caret.lstrip()), trailer=trailer) from None + except Exception: + print(err_trailer) + raise def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> None: """Validate the query, called from the parent which contains [metadata] information.""" @@ -105,7 +110,8 @@ def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_inte f"{stack_version=}, {ecs_version=}" ) error_fields[field] = {"error": exc, "trailer": trailer} - print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}") + if data.get("notify", False): + print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}") else: raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source, len(exc.caret.lstrip()), trailer=trailer) from None @@ -154,30 +160,34 @@ def validate(self, data: 'QueryRuleData', meta: RuleMeta) -> None: packages_manifest = load_integrations_manifests() package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest) + # validate the query against fields within beats + self.validate_stack_combos(data, meta) + if package_integrations: # validate the query against related integration fields self.validate_integration(data, meta, package_integrations) - else: - for stack_version, mapping in meta.get_validation_stack_versions().items(): - beats_version = mapping['beats'] - ecs_version = mapping['ecs'] - endgame_version = mapping['endgame'] - err_trailer = f'stack: {stack_version}, beats: {beats_version},' \ - f'ecs: {ecs_version}, endgame: {endgame_version}' - - beat_types, beat_schema, schema = self.get_beats_schema(data.index or [], - beats_version, ecs_version) - endgame_schema = self.get_endgame_schema(data.index, endgame_version) - eql_schema = ecs.KqlSchema2Eql(schema) + def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> None: + """Validate the query against ECS and beats schemas across stack combinations.""" + for stack_version, mapping in meta.get_validation_stack_versions().items(): + beats_version = mapping['beats'] + ecs_version = mapping['ecs'] + endgame_version = mapping['endgame'] + err_trailer = f'stack: {stack_version}, beats: {beats_version},' \ + f'ecs: {ecs_version}, endgame: {endgame_version}' + + beat_types, beat_schema, schema = self.get_beats_schema(data.index or [], + beats_version, ecs_version) + endgame_schema = self.get_endgame_schema(data.index, endgame_version) + eql_schema = ecs.KqlSchema2Eql(schema) - # validate query against the beats and eql schema - self.validate_query_with_schema(data=data, schema=eql_schema, err_trailer=err_trailer, - beat_types=beat_types) + # validate query against the beats and eql schema + self.validate_query_with_schema(data=data, schema=eql_schema, err_trailer=err_trailer, + beat_types=beat_types) - if endgame_schema: - # validate query against the endgame schema - self.validate_query_with_schema(data=data, schema=endgame_schema, err_trailer=err_trailer) + if endgame_schema: + # validate query against the endgame schema + self.validate_query_with_schema(data=data, schema=endgame_schema, err_trailer=err_trailer) def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> None: """Validate an EQL query while checking TOMLRule against integration schemas.""" @@ -195,7 +205,6 @@ def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_inte package_version = integration_schema_data['package_version'] integration_schema = integration_schema_data['schema'] stack_version = integration_schema_data['stack_version'] - endgame_version = integration_schema_data['endgame_version'] if stack_version != current_stack_version: # reset the combined schema for each stack version @@ -223,17 +232,11 @@ def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_inte f"{stack_version=}, {ecs_version=}" ) error_fields[field] = {"error": exc, "trailer": trailer} - print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}") + if data.get("notify", False): + print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}") else: raise exc - # Still need to check endgame if it's in the index - endgame_schema = self.get_endgame_schema(data.index, endgame_version) - if endgame_schema: - # validate query against the endgame schema - err_trailer = f'stack: {stack_version}, endgame: {endgame_version}' - self.validate_query_with_schema(data=data, schema=endgame_schema, err_trailer=err_trailer) - # don't error on fields that are in another integration schema for field in list(error_fields.keys()): if field in combined_schema: diff --git a/tests/base.py b/tests/base.py index 823714aa7fd..f090f66b162 100644 --- a/tests/base.py +++ b/tests/base.py @@ -5,6 +5,7 @@ """Shared resources for tests.""" +import os import unittest from typing import Union @@ -17,6 +18,7 @@ class BaseRuleTest(unittest.TestCase): @classmethod def setUpClass(cls): + os.environ["DR_NOTIFY_INTEGRATION_UPDATE_AVAILABLE"] = "1" rc = RuleCollection.default() cls.all_rules = rc.rules cls.rule_lookup = rc.id_map