
Validate against beats and integrations schemas #2524


Merged: 11 commits merged on Feb 8, 2023
103 changes: 53 additions & 50 deletions detection_rules/rule_validators.py
@@ -40,31 +40,36 @@ def validate(self, data: QueryRuleData, meta: RuleMeta) -> None:
packages_manifest = load_integrations_manifests()
package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)

# validate the query against fields within beats
self.validate_stack_combos(data, meta)

if package_integrations:
# validate the query against related integration fields
self.validate_integration(data, meta, package_integrations)
else:
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping['beats']
ecs_version = mapping['ecs']
err_trailer = f'stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}'

beat_types, beat_schema, schema = self.get_beats_schema(data.index or [],
beats_version, ecs_version)

try:
kql.parse(self.query, schema=schema)
except kql.KqlParseError as exc:
message = exc.error_msg
trailer = err_trailer
if "Unknown field" in message and beat_types:
trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}"

raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
except Exception:
print(err_trailer)
raise

def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> None:
"""Validate the query against ECS and beats schemas across stack combinations."""
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping['beats']
ecs_version = mapping['ecs']
err_trailer = f'stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}'

beat_types, beat_schema, schema = self.get_beats_schema(data.index or [],
beats_version, ecs_version)

try:
kql.parse(self.query, schema=schema)
except kql.KqlParseError as exc:
message = exc.error_msg
trailer = err_trailer
if "Unknown field" in message and beat_types:
trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}"

raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
except Exception:
print(err_trailer)
raise

def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> None:
"""Validate the query, called from the parent which contains [metadata] information."""
@@ -105,7 +110,8 @@ def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_inte
f"{stack_version=}, {ecs_version=}"
)
error_fields[field] = {"error": exc, "trailer": trailer}
print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}")
if data.get("notify", False):
print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}")
else:
raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
@@ -154,30 +160,34 @@ def validate(self, data: 'QueryRuleData', meta: RuleMeta) -> None:
packages_manifest = load_integrations_manifests()
package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)

# validate the query against fields within beats
self.validate_stack_combos(data, meta)

if package_integrations:
# validate the query against related integration fields
self.validate_integration(data, meta, package_integrations)

else:
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping['beats']
ecs_version = mapping['ecs']
endgame_version = mapping['endgame']
err_trailer = f'stack: {stack_version}, beats: {beats_version},' \
f'ecs: {ecs_version}, endgame: {endgame_version}'

beat_types, beat_schema, schema = self.get_beats_schema(data.index or [],
beats_version, ecs_version)
endgame_schema = self.get_endgame_schema(data.index, endgame_version)
eql_schema = ecs.KqlSchema2Eql(schema)
def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> None:
"""Validate the query against ECS and beats schemas across stack combinations."""
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping['beats']
ecs_version = mapping['ecs']
endgame_version = mapping['endgame']
err_trailer = f'stack: {stack_version}, beats: {beats_version},' \
f'ecs: {ecs_version}, endgame: {endgame_version}'

beat_types, beat_schema, schema = self.get_beats_schema(data.index or [],
beats_version, ecs_version)
endgame_schema = self.get_endgame_schema(data.index, endgame_version)
eql_schema = ecs.KqlSchema2Eql(schema)

# validate query against the beats and eql schema
self.validate_query_with_schema(data=data, schema=eql_schema, err_trailer=err_trailer,
beat_types=beat_types)
# validate query against the beats and eql schema
self.validate_query_with_schema(data=data, schema=eql_schema, err_trailer=err_trailer,
beat_types=beat_types)

if endgame_schema:
# validate query against the endgame schema
self.validate_query_with_schema(data=data, schema=endgame_schema, err_trailer=err_trailer)
if endgame_schema:
# validate query against the endgame schema
self.validate_query_with_schema(data=data, schema=endgame_schema, err_trailer=err_trailer)

def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> None:
"""Validate an EQL query while checking TOMLRule against integration schemas."""
@@ -195,7 +205,6 @@ def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_inte
package_version = integration_schema_data['package_version']
integration_schema = integration_schema_data['schema']
stack_version = integration_schema_data['stack_version']
endgame_version = integration_schema_data['endgame_version']

if stack_version != current_stack_version:
# reset the combined schema for each stack version
@@ -223,17 +232,11 @@ def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_inte
f"{stack_version=}, {ecs_version=}"
)
error_fields[field] = {"error": exc, "trailer": trailer}
print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}")
if data.get("notify", False):
Contributor:
how does notify get into data?

@Mikaayenson (Contributor Author), Feb 7, 2023:
It's a property (set by an environment variable), similar to how we skip note validation.
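
The property's implementation isn't shown in this diff. A minimal sketch of how it could look, assuming it keys off the DR_NOTIFY_INTEGRATION_UPDATE_AVAILABLE variable that the tests/base.py change below sets, and that the rule data class exposes a getattr-backed get() so the validator's data.get("notify", False) resolves it (the class and helper names here are illustrative, not the repo's):

import os
from functools import cached_property


class RuleDataSketch:
    """Illustrative stand-in for the rule data class; only the env var name comes from this PR."""

    @cached_property
    def notify(self) -> bool:
        # Assumption: any value set for the variable switches unknown-field errors
        # from raised exceptions to printed warnings.
        return "DR_NOTIFY_INTEGRATION_UPDATE_AVAILABLE" in os.environ

    def get(self, key, default=None):
        # getattr-backed lookup, so data.get("notify", False) returns the property value
        return getattr(self, key, default)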

print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}")
else:
raise exc

# Still need to check endgame if it's in the index
endgame_schema = self.get_endgame_schema(data.index, endgame_version)
if endgame_schema:
# validate query against the endgame schema
err_trailer = f'stack: {stack_version}, endgame: {endgame_version}'
self.validate_query_with_schema(data=data, schema=endgame_schema, err_trailer=err_trailer)

# don't error on fields that are in another integration schema
for field in list(error_fields.keys()):
if field in combined_schema:
2 changes: 2 additions & 0 deletions tests/base.py
@@ -5,6 +5,7 @@

"""Shared resources for tests."""

import os
import unittest
from typing import Union

@@ -17,6 +18,7 @@ class BaseRuleTest(unittest.TestCase):

@classmethod
def setUpClass(cls):
os.environ["DR_NOTIFY_INTEGRATION_UPDATE_AVAILABLE"] = "1"
rc = RuleCollection.default()
cls.all_rules = rc.rules
cls.rule_lookup = rc.id_map
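
A note on the tests/base.py change above: setting the flag in setUpClass means the whole suite loads rules with the notify path enabled, so unknown-field mismatches print warnings rather than failing the run. Outside the test suite, the same behavior can presumably be toggled by exporting the variable before rules are loaded; a rough sketch, with the import path being an assumption:

import os

from detection_rules.rule_loader import RuleCollection  # import path is an assumption

# Assumption: the flag must be set before rules are loaded, mirroring
# BaseRuleTest.setUpClass in tests/base.py.
os.environ["DR_NOTIFY_INTEGRATION_UPDATE_AVAILABLE"] = "1"

rc = RuleCollection.default()  # loads the default rules; validation runs while parsing
print(f"loaded {len(rc.rules)} rules")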