
Commit 4baa8d4

Mikaayenson authored and github-actions[bot] committed
[FR] Add Integration Schema Query Validation (#2470)
(cherry picked from commit 1784429)
1 parent 646228b · commit 4baa8d4

File tree

54 files changed: +559 additions, -166 deletions


detection_rules/beats.py

Lines changed: 12 additions & 0 deletions
@@ -117,6 +117,14 @@ def _flatten_schema(schema: list, prefix="") -> list:
             # it's probably not perfect, but we can fix other bugs as we run into them later
             if len(schema) == 1 and nested_prefix.startswith(prefix + prefix):
                 nested_prefix = s["name"] + "."
+            if "field" in s:
+                # integrations sometimes have a group with a single field
+                flattened.extend(_flatten_schema(s["field"], prefix=nested_prefix))
+                continue
+            elif "fields" not in s:
+                # integrations sometimes have a group with no fields
+                continue
+
             flattened.extend(_flatten_schema(s["fields"], prefix=nested_prefix))
         elif "fields" in s:
             flattened.extend(_flatten_schema(s["fields"], prefix=prefix))
@@ -131,6 +139,10 @@ def _flatten_schema(schema: list, prefix="") -> list:
     return flattened


+def flatten_ecs_schema(schema: dict) -> dict:
+    return _flatten_schema(schema)
+
+
 def get_field_schema(base_directory, prefix="", include_common=False):
     base_directory = base_directory.get("folders", {}).get("_meta", {}).get("files", {})
     flattened = []
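For context, a minimal sketch of what the new flatten_ecs_schema helper produces, assuming a fields.yml already parsed with yaml.safe_load; the group layout and field names below are illustrative, not taken from a real integration package:

from detection_rules.beats import flatten_ecs_schema

# hypothetical parsed fields.yml content, for illustration only
sample = [
    {"name": "aws", "type": "group", "fields": [
        {"name": "cloudtrail.event_name", "type": "keyword"},
        {"name": "user_identity", "type": "group", "fields": [
            {"name": "arn", "type": "keyword"},
        ]},
    ]},
]

flat = flatten_ecs_schema(sample)
# expected entries keep dotted, prefixed names alongside their types, e.g.
#   {"name": "aws.cloudtrail.event_name", "type": "keyword"}
#   {"name": "aws.user_identity.arn", "type": "keyword"}
print({field["name"]: field["type"] for field in flat})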

detection_rules/devtools.py

Lines changed: 43 additions & 4 deletions
@@ -33,7 +33,8 @@
 from .endgame import EndgameSchemaManager
 from .eswrap import CollectEvents, add_range_to_dsl
 from .ghwrap import GithubClient, update_gist
-from .integrations import build_integrations_manifest
+from .integrations import (build_integrations_manifest, build_integrations_schemas, find_latest_compatible_version,
+                           load_integrations_manifests)
 from .main import root
 from .misc import PYTHON_LICENSE, add_client, client_error
 from .packaging import (CURRENT_RELEASE_PATH, PACKAGE_FILE, RELEASE_DIR,
@@ -1174,10 +1175,48 @@ def integrations_group():
 def build_integration_manifests(overwrite: bool):
     """Builds consolidated integrations manifests file."""
     click.echo("loading rules to determine all integration tags")
+
+    def flatten(tag_list: List[str]) -> List[str]:
+        return list(set([tag for tags in tag_list for tag in (flatten(tags) if isinstance(tags, list) else [tags])]))
+
     rules = RuleCollection.default()
-    integration_tags = list(set([r.contents.metadata.integration for r in rules if r.contents.metadata.integration]))
-    click.echo(f"integration tags identified: {integration_tags}")
-    build_integrations_manifest(overwrite, integration_tags)
+    integration_tags = [r.contents.metadata.integration for r in rules if r.contents.metadata.integration]
+    unique_integration_tags = flatten(integration_tags)
+    click.echo(f"integration tags identified: {unique_integration_tags}")
+    build_integrations_manifest(overwrite, unique_integration_tags)
+
+
+@integrations_group.command('build-schemas')
+@click.option('--overwrite', '-o', is_flag=True, help="Overwrite the entire integrations-schema.json.gz file")
+def build_integration_schemas(overwrite: bool):
+    """Builds consolidated integrations schemas file."""
+    click.echo("Building integration schemas...")
+
+    start_time = time.perf_counter()
+    build_integrations_schemas(overwrite)
+    end_time = time.perf_counter()
+    click.echo(f"Time taken to generate schemas: {(end_time - start_time)/60:.2f} minutes")
+
+
+@integrations_group.command('show-latest-compatible')
+@click.option('--package', '-p', help='Name of package')
+@click.option('--stack_version', '-s', required=True, help='Rule stack version')
+def show_latest_compatible_version(package: str, stack_version: str) -> None:
+    """Prints the latest integration compatible version for specified package based on stack version supplied."""
+
+    packages_manifest = None
+    try:
+        packages_manifest = load_integrations_manifests()
+    except Exception as e:
+        click.echo(f"Error loading integrations manifests: {str(e)}")
+        return
+
+    try:
+        version = find_latest_compatible_version(package, "", stack_version, packages_manifest)
+        click.echo(f"Compatible integration {version=}")
+    except Exception as e:
+        click.echo(f"Error finding compatible version: {str(e)}")
+        return


 @dev_group.group('schemas')
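A hedged usage sketch of the two new subcommands; the `python -m detection_rules dev integrations` entry point is an assumption based on how the existing build-manifests command is registered under integrations_group, and the package name and stack version are illustrative:

python -m detection_rules dev integrations build-schemas --overwrite
python -m detection_rules dev integrations show-latest-compatible --package aws --stack_version 8.3.0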
Binary files changed (177 Bytes; contents not shown).

detection_rules/integrations.py

Lines changed: 178 additions & 5 deletions
@@ -4,20 +4,27 @@
 # 2.0.

 """Functions to support and interact with Kibana integrations."""
+import glob
 import gzip
 import json
-import os
 import re
 from collections import OrderedDict
 from pathlib import Path
+from typing import Generator, Tuple, Union

 import requests
+import yaml
 from marshmallow import EXCLUDE, Schema, fields, post_load

+import kql
+
+from . import ecs
+from .beats import flatten_ecs_schema
 from .semver import Version
-from .utils import cached, get_etc_path, read_gzip
+from .utils import cached, get_etc_path, read_gzip, unzip

 MANIFEST_FILE_PATH = Path(get_etc_path('integration-manifests.json.gz'))
+SCHEMA_FILE_PATH = Path(get_etc_path('integration-schemas.json.gz'))


 @cached
@@ -26,11 +33,18 @@ def load_integrations_manifests() -> dict:
     return json.loads(read_gzip(get_etc_path('integration-manifests.json.gz')))


+@cached
+def load_integrations_schemas() -> dict:
+    """Load the consolidated integrations schemas."""
+    return json.loads(read_gzip(get_etc_path('integration-schemas.json.gz')))
+
+
 class IntegrationManifestSchema(Schema):
     name = fields.Str(required=True)
     version = fields.Str(required=True)
     release = fields.Str(required=True)
     description = fields.Str(required=True)
+    download = fields.Str(required=True)
     conditions = fields.Dict(required=True)
     policy_templates = fields.List(fields.Dict, required=True)
     owner = fields.Dict(required=False)
@@ -44,8 +58,8 @@ def transform_policy_template(self, data, **kwargs):
 def build_integrations_manifest(overwrite: bool, rule_integrations: list) -> None:
     """Builds a new local copy of manifest.yaml from integrations Github."""
     if overwrite:
-        if os.path.exists(MANIFEST_FILE_PATH):
-            os.remove(MANIFEST_FILE_PATH)
+        if MANIFEST_FILE_PATH.exists():
+            MANIFEST_FILE_PATH.unlink()

     final_integration_manifests = {integration: {} for integration in rule_integrations}

@@ -62,6 +76,63 @@ def build_integrations_manifest(overwrite: bool, rule_integrations: list) -> None:
     print(f"final integrations manifests dumped: {MANIFEST_FILE_PATH}")


+def build_integrations_schemas(overwrite: bool) -> None:
+    """Builds a new local copy of integration-schemas.json.gz from EPR integrations."""
+
+    final_integration_schemas = {}
+    saved_integration_schemas = {}
+
+    # Check if the file already exists and handle accordingly
+    if overwrite and SCHEMA_FILE_PATH.exists():
+        SCHEMA_FILE_PATH.unlink()
+    elif SCHEMA_FILE_PATH.exists():
+        saved_integration_schemas = load_integrations_schemas()
+
+    # Load the integration manifests
+    integration_manifests = load_integrations_manifests()
+
+    # Loop through the packages and versions
+    for package, versions in integration_manifests.items():
+        print(f"processing {package}")
+        final_integration_schemas.setdefault(package, {})
+        for version, manifest in versions.items():
+            if package in saved_integration_schemas and version in saved_integration_schemas[package]:
+                continue
+
+            # Download the zip file
+            download_url = f"https://epr.elastic.co{manifest['download']}"
+            response = requests.get(download_url)
+            response.raise_for_status()
+
+            # Update the final integration schemas
+            final_integration_schemas[package].update({version: {}})
+
+            # Open the zip file
+            with unzip(response.content) as zip_ref:
+                for file in zip_ref.namelist():
+                    # Check if the file is a match
+                    if glob.fnmatch.fnmatch(file, '*/fields/*.yml'):
+                        integration_name = Path(file).parent.parent.name
+                        final_integration_schemas[package][version].setdefault(integration_name, {})
+                        file_data = zip_ref.read(file)
+                        schema_fields = yaml.safe_load(file_data)
+
+                        # Parse the schema and add to the integration_manifests
+                        data = flatten_ecs_schema(schema_fields)
+                        flat_data = {field['name']: field['type'] for field in data}
+
+                        final_integration_schemas[package][version][integration_name].update(flat_data)
+
+                        del file_data
+
+    # Write the final integration schemas to disk
+    with gzip.open(SCHEMA_FILE_PATH, "w") as schema_file:
+        schema_file_bytes = json.dumps(final_integration_schemas).encode("utf-8")
+        schema_file.write(schema_file_bytes)
+
+    print(f"final integrations schemas dumped: {SCHEMA_FILE_PATH}")
+
+
 def find_least_compatible_version(package: str, integration: str,
                                   current_stack_version: str, packages_manifest: dict) -> str:
     """Finds least compatible version for specified integration based on stack version supplied."""
@@ -89,12 +160,54 @@ def find_least_compatible_version(package: str, integration: str,
     raise ValueError(f"no compatible version for integration {package}:{integration}")


+def find_latest_compatible_version(package: str, integration: str,
+                                   rule_stack_version: str, packages_manifest: dict) -> Union[None, Tuple[str, str]]:
+    """Finds the latest compatible version for specified integration based on stack version supplied."""
+
+    if not package:
+        raise ValueError("Package must be specified")
+
+    package_manifest = packages_manifest.get(package)
+    if package_manifest is None:
+        raise ValueError(f"Package {package} not found in manifest.")
+
+    # Converts the dict keys (version numbers) to Version objects for proper sorting (descending)
+    integration_manifests = sorted(package_manifest.items(), key=lambda x: Version(str(x[0])), reverse=True)
+    notice = ""
+
+    for version, manifest in integration_manifests:
+        kibana_conditions = manifest.get("conditions", {}).get("kibana", {})
+        version_requirement = kibana_conditions.get("version")
+        if not version_requirement:
+            raise ValueError(f"Manifest for {package}:{integration} version {version} is missing conditions.")
+
+        compatible_versions = re.sub(r"\>|\<|\=|\^", "", version_requirement).split(" || ")
+
+        if not compatible_versions:
+            raise ValueError(f"Manifest for {package}:{integration} version {version} is missing compatible versions")
+
+        highest_compatible_version = max(compatible_versions, key=lambda x: Version(x))
+
+        if Version(highest_compatible_version) > Version(rule_stack_version):
+            # generate notice message that a later integration version is available
+            integration = f" {integration.strip()}" if integration else ""
+
+            notice = (f"There is a new integration {package}{integration} version {version} available!",
+                      f"Update the rule min_stack version from {rule_stack_version} to "
+                      f"{highest_compatible_version} if using new features in this latest version.")
+
+        elif int(highest_compatible_version[0]) == int(rule_stack_version[0]):
+            return version, notice
+
+    raise ValueError(f"no compatible version for integration {package}:{integration}")
+
+
 def get_integration_manifests(integration: str) -> list:
     """Iterates over specified integrations from package-storage and combines manifests per version."""
     epr_search_url = "https://epr.elastic.co/search"

     # link for search parameters - https://github.com/elastic/package-registry
-    epr_search_parameters = {"package": f"{integration}", "prerelease": "true",
+    epr_search_parameters = {"package": f"{integration}", "prerelease": "false",
                              "all": "true", "include_policy_templates": "true"}
     epr_search_response = requests.get(epr_search_url, params=epr_search_parameters)
     epr_search_response.raise_for_status()
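To make the compatibility check concrete, a small sketch of how find_latest_compatible_version reduces a Kibana version constraint; the constraint string is a hypothetical example:

import re

from detection_rules.semver import Version

version_requirement = "^7.14.0 || ^8.0.0"  # hypothetical kibana.version condition
compatible_versions = re.sub(r"\>|\<|\=|\^", "", version_requirement).split(" || ")
# -> ["7.14.0", "8.0.0"]
highest_compatible_version = max(compatible_versions, key=lambda x: Version(x))
# -> "8.0.0", which is then compared against the rule's stack version to pick a package version
print(compatible_versions, highest_compatible_version)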
@@ -106,3 +219,63 @@ def get_integration_manifests(integration: str) -> list:
     print(f"loaded {integration} manifests from the following package versions: "
           f"{[manifest['version'] for manifest in manifests]}")
     return manifests
+
+
+def get_integration_schema_data(data, meta, package_integrations: dict) -> Generator[dict, None, None]:
+    """Iterates over specified integrations from package-storage and combines schemas per version."""
+
+    # lazy import to avoid circular import
+    from .rule import (  # pylint: disable=import-outside-toplevel
+        QueryRuleData, RuleMeta
+    )
+
+    data: QueryRuleData = data
+    meta: RuleMeta = meta
+
+    packages_manifest = load_integrations_manifests()
+    integrations_schemas = load_integrations_schemas()
+
+    # validate the query against related integration fields
+    if isinstance(data, QueryRuleData) and data.language != 'lucene' and meta.maturity == "production":
+
+        # flag to only warn once per integration for available upgrades
+        notify_update_available = True
+
+        for stack_version, mapping in meta.get_validation_stack_versions().items():
+            ecs_version = mapping['ecs']
+            endgame_version = mapping['endgame']
+
+            ecs_schema = ecs.flatten_multi_fields(ecs.get_schema(ecs_version, name='ecs_flat'))
+
+            for pk_int in package_integrations:
+                package = pk_int["package"]
+                integration = pk_int["integration"]
+
+                package_version, notice = find_latest_compatible_version(package=package,
+                                                                         integration=integration,
+                                                                         rule_stack_version=meta.min_stack_version,
+                                                                         packages_manifest=packages_manifest)
+
+                if notify_update_available and notice and data.get("notify", False):
+                    # Notify for now, as to not lock rule stacks to integrations
+                    notify_update_available = False
+                    print(f"\n{data.get('name')}")
+                    print(*notice)
+
+                schema = {}
+                if integration is None:
+                    # Use all fields from each dataset
+                    for dataset in integrations_schemas[package][package_version]:
+                        schema.update(integrations_schemas[package][package_version][dataset])
+                else:
+                    if integration not in integrations_schemas[package][package_version]:
+                        raise ValueError(f"Integration {integration} not found in package {package} "
+                                         f"version {package_version}")
+                    schema = integrations_schemas[package][package_version][integration]
+                schema.update(ecs_schema)
+                integration_schema = {k: kql.parser.elasticsearch_type_family(v) for k, v in schema.items()}
+
+                data = {"schema": integration_schema, "package": package, "integration": integration,
+                        "stack_version": stack_version, "ecs_version": ecs_version,
+                        "package_version": package_version, "endgame_version": endgame_version}
+                yield data
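A hedged sketch of consuming this generator during validation; data, meta, and package_integrations are assumed to come from the rule validator context elsewhere in this commit, and rule_fields is a hypothetical list of fields referenced by a query:

rule_fields = ["event.dataset", "aws.cloudtrail.event_name"]  # hypothetical query fields

for item in get_integration_schema_data(data, meta, package_integrations):
    schema = item["schema"]  # flat field-name -> type-family map, ECS fields merged in
    unknown = [field for field in rule_fields if field not in schema]
    if unknown:
        print(f"{item['package']}:{item['integration']} {item['package_version']}: unknown fields {unknown}")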

detection_rules/rule.py

Lines changed: 11 additions & 5 deletions
@@ -240,6 +240,10 @@ def get_restricted_fields(self) -> Optional[Dict[str, tuple]]:
     def data_validator(self) -> Optional['DataValidator']:
         return DataValidator(is_elastic_rule=self.is_elastic_rule, **self.to_dict())

+    @cached_property
+    def notify(self) -> bool:
+        return os.environ.get('DR_NOTIFY_INTEGRATION_UPDATE_AVAILABLE') is not None
+
     @cached_property
     def parsed_note(self) -> Optional[MarkoDocument]:
         dv = self.data_validator
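The update notices printed by get_integration_schema_data are gated on this property, so opting in is a matter of setting the environment variable before running validation; only the variable's presence is checked, not its value:

import os

# enable integration update-available notices for rule validation; any value works
os.environ["DR_NOTIFY_INTEGRATION_UPDATE_AVAILABLE"] = "1"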
@@ -847,7 +851,7 @@ def _add_related_integrations(self, obj: dict) -> None:

         if self.check_restricted_field_version(field_name):
             if isinstance(self.data, QueryRuleData) and self.data.language != 'lucene':
-                package_integrations = self._get_packaged_integrations(packages_manifest)
+                package_integrations = self.get_packaged_integrations(self.data, self.metadata, packages_manifest)

                 if not package_integrations:
                     return
@@ -947,11 +951,13 @@ def compare_field_versions(min_stack: Version, max_stack: Version) -> bool:
         max_stack = max_stack or current_version
         return Version(min_stack) <= current_version >= Version(max_stack)

-    def _get_packaged_integrations(self, package_manifest: dict) -> Optional[List[dict]]:
+    @classmethod
+    def get_packaged_integrations(cls, data: QueryRuleData, meta: RuleMeta,
+                                  package_manifest: dict) -> Optional[List[dict]]:
         packaged_integrations = []
         datasets = set()

-        for node in self.data.get('ast', []):
+        for node in data.get('ast', []):
             if isinstance(node, eql.ast.Comparison) and str(node.left) == 'event.dataset':
                 datasets.update(set(n.value for n in node if isinstance(n, eql.ast.Literal)))
             elif isinstance(node, FieldComparison) and str(node.field) == 'event.dataset':
@@ -960,10 +966,10 @@ def _get_packaged_integrations(self, package_manifest: dict) -> Optional[List[dict]]:
         if not datasets:
             # windows and endpoint integration do not have event.dataset fields in queries
             # integration is None to remove duplicate references upstream in Kibana
-            rule_integrations = self.metadata.get("integration", [])
+            rule_integrations = meta.get("integration", [])
            if rule_integrations:
                 for integration in rule_integrations:
-                    if integration in ["windows", "endpoint", "apm"]:
+                    if integration in definitions.NON_DATASET_PACKAGES:
                         packaged_integrations.append({"package": integration, "integration": None})

         for value in sorted(datasets):
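A hedged sketch of the new classmethod in use; rule is an assumed TOMLRule whose contents exposes data and metadata (per the call site in _add_related_integrations above), and the aws.cloudtrail pairing is inferred, since the loop body over sorted datasets is truncated in this hunk:

from detection_rules.integrations import load_integrations_manifests

# rule is an assumed TOMLRule; contents.data / contents.metadata per the call site above
package_integrations = rule.contents.get_packaged_integrations(
    rule.contents.data, rule.contents.metadata, load_integrations_manifests())
# e.g. a query comparing event.dataset to "aws.cloudtrail" would plausibly yield
# [{"package": "aws", "integration": "cloudtrail"}], while rules tagged only with
# NON_DATASET_PACKAGES (windows, endpoint, apm) yield {"package": ..., "integration": None}
print(package_integrations)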
