Skip to content

Commit dd93603

Browse files
brokensound77 authored and github-actions[bot] committed
Cleanup rule survey code (#1923)
* Cleanup rule survey code
* Default to only unique-ing on process name for lucene rules
* Fix bug in kibana url parsing by removing redundant port from domain
* Update search-alerts columns and nest fields
* Fix rule.contents.data.index

Co-authored-by: Mika Ayenson <[email protected]>
(cherry picked from commit 332ea40)
1 parent 5a6d953 commit dd93603

File tree

9 files changed

+127
-89
lines changed

9 files changed

+127
-89
lines changed

detection_rules/devtools.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -1024,6 +1024,7 @@ def rule_survey(ctx: click.Context, query, date_range, dump_file, hide_zero_coun
10241024
"""Survey rule counts."""
10251025
from kibana.resources import Signal
10261026
from .main import search_rules
1027+
# from .eswrap import parse_unique_field_results
10271028

10281029
survey_results = []
10291030
start_time, end_time = date_range
@@ -1039,15 +1040,20 @@ def rule_survey(ctx: click.Context, query, date_range, dump_file, hide_zero_coun
10391040
click.echo(f'Saving detailed dump to: {dump_file}')
10401041

10411042
collector = CollectEvents(elasticsearch_client)
1042-
details = collector.search_from_rule(*rules, start_time=start_time, end_time=end_time)
1043-
counts = collector.count_from_rule(*rules, start_time=start_time, end_time=end_time)
1043+
details = collector.search_from_rule(rules, start_time=start_time, end_time=end_time)
1044+
counts = collector.count_from_rule(rules, start_time=start_time, end_time=end_time)
10441045

10451046
# add alerts
10461047
with kibana_client:
10471048
range_dsl = {'query': {'bool': {'filter': []}}}
10481049
add_range_to_dsl(range_dsl['query']['bool']['filter'], start_time, end_time)
10491050
alerts = {a['_source']['signal']['rule']['rule_id']: a['_source']
1050-
for a in Signal.search(range_dsl)['hits']['hits']}
1051+
for a in Signal.search(range_dsl, size=10000)['hits']['hits']}
1052+
1053+
# for alert in alerts:
1054+
# rule_id = alert['signal']['rule']['rule_id']
1055+
# rule = rules.id_map[rule_id]
1056+
# unique_results = parse_unique_field_results(rule.contents.data.type, rule.contents.data.unique_fields, alert)
10511057

10521058
for rule_id, count in counts.items():
10531059
alert_count = len(alerts.get(rule_id, []))

detection_rules/ecs.py

-15
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,6 @@ def add_field(schema, name, info):
3535
add_field(schema, remaining, info)
3636

3737

38-
def nest_from_dot(dots, value):
39-
"""Nest a dotted field and set the inner most value."""
40-
fields = dots.split('.')
41-
42-
if not fields:
43-
return {}
44-
45-
nested = {fields.pop(): value}
46-
47-
for field in reversed(fields):
48-
nested = {field: nested}
49-
50-
return nested
51-
52-
5338
def _recursive_merge(existing, new, depth=0):
5439
"""Return an existing dict merged into a new one."""
5540
for key, value in existing.items():

detection_rules/eswrap.py

+63-51
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import os
99
import time
1010
from collections import defaultdict
11-
from typing import Union
11+
from typing import List, Union
1212

1313
import click
1414
import elasticsearch
@@ -17,7 +17,7 @@
1717

1818
import kql
1919
from .main import root
20-
from .misc import add_params, client_error, elasticsearch_options, get_elasticsearch_client
20+
from .misc import add_params, client_error, elasticsearch_options, get_elasticsearch_client, nested_get
2121
from .rule import TOMLRule
2222
from .rule_loader import rta_mappings, RuleCollection
2323
from .utils import format_command_options, normalize_timing_and_sort, unix_time_to_formatted, get_path
@@ -33,7 +33,23 @@ def add_range_to_dsl(dsl_filter, start_time, end_time='now'):
3333
)
3434

3535

36-
class RtaEvents(object):
36+
def parse_unique_field_results(rule_type: str, unique_fields: List[str], search_results: dict):
37+
parsed_results = defaultdict(lambda: defaultdict(int))
38+
hits = search_results['hits']
39+
hits = hits['hits'] if rule_type != 'eql' else hits.get('events') or hits.get('sequences', [])
40+
for hit in hits:
41+
for field in unique_fields:
42+
match = nested_get(hit['_source'], field)
43+
if not match:
44+
continue
45+
46+
match = ','.join(sorted(match)) if isinstance(match, list) else match
47+
parsed_results[field][match] += 1
48+
# if rule.type == eql, structure is different
49+
return {'results': parsed_results} if parsed_results else {}
50+
51+
52+
class RtaEvents:
3753
"""Events collected from Elasticsearch."""
3854

3955
def __init__(self, events):
@@ -64,7 +80,7 @@ def evaluate_against_rule_and_update_mapping(self, rule_id, rta_name, verbose=Tr
6480
"""Evaluate a rule against collected events and update mapping."""
6581
from .utils import combine_sources, evaluate
6682

67-
rule = next((rule for rule in RuleCollection.default() if rule.id == rule_id), None)
83+
rule = RuleCollection.default().id_map.get(rule_id)
6884
assert rule is not None, f"Unable to find rule with ID {rule_id}"
6985
merged_events = combine_sources(*self.events.values())
7086
filtered = evaluate(rule, merged_events)
@@ -112,7 +128,7 @@ def _build_timestamp_map(self, index_str):
112128

113129
def _get_last_event_time(self, index_str, dsl=None):
114130
"""Get timestamp of most recent event."""
115-
last_event = self.client.search(dsl, index_str, size=1, sort='@timestamp:desc')['hits']['hits']
131+
last_event = self.client.search(query=dsl, index=index_str, size=1, sort='@timestamp:desc')['hits']['hits']
116132
if not last_event:
117133
return
118134

@@ -146,7 +162,7 @@ def _prep_query(query, language, index, start_time=None, end_time=None):
146162
elif language == 'dsl':
147163
formatted_dsl = {'query': query}
148164
else:
149-
raise ValueError('Unknown search language')
165+
raise ValueError(f'Unknown search language: {language}')
150166

151167
if start_time or end_time:
152168
end_time = end_time or 'now'
@@ -172,84 +188,78 @@ def search(self, query, language, index: Union[str, list] = '*', start_time=None
172188

173189
return results
174190

175-
def search_from_rule(self, *rules: TOMLRule, start_time=None, end_time='now', size=None):
191+
def search_from_rule(self, rules: RuleCollection, start_time=None, end_time='now', size=None):
176192
"""Search an elasticsearch instance using a rule."""
177-
from .misc import nested_get
178-
179193
async_client = AsyncSearchClient(self.client)
180194
survey_results = {}
181-
182-
def parse_unique_field_results(rule_type, unique_fields, search_results):
183-
parsed_results = defaultdict(lambda: defaultdict(int))
184-
hits = search_results['hits']
185-
hits = hits['hits'] if rule_type != 'eql' else hits.get('events') or hits.get('sequences', [])
186-
for hit in hits:
187-
for field in unique_fields:
188-
match = nested_get(hit['_source'], field)
189-
match = ','.join(sorted(match)) if isinstance(match, list) else match
190-
parsed_results[field][match] += 1
191-
# if rule.type == eql, structure is different
192-
return {'results': parsed_results} if parsed_results else {}
193-
194195
multi_search = []
195196
multi_search_rules = []
196-
async_searches = {}
197-
eql_searches = {}
197+
async_searches = []
198+
eql_searches = []
198199

199200
for rule in rules:
200-
if not rule.query:
201+
if not rule.contents.data.get('query'):
201202
continue
202203

203-
index_str, formatted_dsl, lucene_query = self._prep_query(query=rule.query,
204-
language=rule.contents.get('language'),
205-
index=rule.contents.get('index', '*'),
204+
language = rule.contents.data.get('language')
205+
query = rule.contents.data.query
206+
rule_type = rule.contents.data.type
207+
index_str, formatted_dsl, lucene_query = self._prep_query(query=query,
208+
language=language,
209+
index=rule.contents.data.get('index', '*'),
206210
start_time=start_time,
207211
end_time=end_time)
208212
formatted_dsl.update(size=size or self.max_events)
209213

210214
# prep for searches: msearch for kql | async search for lucene | eql client search for eql
211-
if rule.contents['language'] == 'kuery':
215+
if language == 'kuery':
212216
multi_search_rules.append(rule)
213-
multi_search.append(json.dumps(
214-
{'index': index_str, 'allow_no_indices': 'true', 'ignore_unavailable': 'true'}))
215-
multi_search.append(json.dumps(formatted_dsl))
216-
elif rule.contents['language'] == 'lucene':
217+
multi_search.append({'index': index_str, 'allow_no_indices': 'true', 'ignore_unavailable': 'true'})
218+
multi_search.append(formatted_dsl)
219+
elif language == 'lucene':
217220
# wait for 0 to try and force async with no immediate results (not guaranteed)
218-
result = async_client.submit(body=formatted_dsl, q=rule.query, index=index_str,
221+
result = async_client.submit(body=formatted_dsl, q=query, index=index_str,
219222
allow_no_indices=True, ignore_unavailable=True,
220223
wait_for_completion_timeout=0)
221224
if result['is_running'] is True:
222-
async_searches[rule] = result['id']
225+
async_searches.append((rule, result['id']))
223226
else:
224-
survey_results[rule.id] = parse_unique_field_results(rule.type, rule.unique_fields,
227+
survey_results[rule.id] = parse_unique_field_results(rule_type, ['process.name'],
225228
result['response'])
226-
elif rule.contents['language'] == 'eql':
229+
elif language == 'eql':
227230
eql_body = {
228231
'index': index_str,
229232
'params': {'ignore_unavailable': 'true', 'allow_no_indices': 'true'},
230-
'body': {'query': rule.query, 'filter': formatted_dsl['filter']}
233+
'body': {'query': query, 'filter': formatted_dsl['filter']}
231234
}
232-
eql_searches[rule] = eql_body
235+
eql_searches.append((rule, eql_body))
233236

234237
# assemble search results
235-
multi_search_results = self.client.msearch('\n'.join(multi_search) + '\n')
238+
multi_search_results = self.client.msearch(searches=multi_search)
236239
for index, result in enumerate(multi_search_results['responses']):
237240
try:
238241
rule = multi_search_rules[index]
239-
survey_results[rule.id] = parse_unique_field_results(rule.type, rule.unique_fields, result)
242+
survey_results[rule.id] = parse_unique_field_results(rule.contents.data.type,
243+
rule.contents.data.unique_fields, result)
240244
except KeyError:
241245
survey_results[multi_search_rules[index].id] = {'error_retrieving_results': True}
242246

243-
for rule, search_args in eql_searches.items():
247+
for entry in eql_searches:
248+
rule: TOMLRule
249+
search_args: dict
250+
rule, search_args = entry
244251
try:
245252
result = self.client.eql.search(**search_args)
246-
survey_results[rule.id] = parse_unique_field_results(rule.type, rule.unique_fields, result)
253+
survey_results[rule.id] = parse_unique_field_results(rule.contents.data.type,
254+
rule.contents.data.unique_fields, result)
247255
except (elasticsearch.NotFoundError, elasticsearch.RequestError) as e:
248256
survey_results[rule.id] = {'error_retrieving_results': True, 'error': e.info['error']['reason']}
249257

250-
for rule, async_id in async_searches.items():
251-
result = async_client.get(async_id)['response']
252-
survey_results[rule.id] = parse_unique_field_results(rule.type, rule.unique_fields, result)
258+
for entry in async_searches:
259+
rule: TOMLRule
260+
rule, async_id = entry
261+
result = async_client.get(id=async_id)['response']
262+
survey_results[rule.id] = parse_unique_field_results(rule.contents.data.type, ['process.name'], result)
253263

254264
return survey_results
255265

@@ -267,19 +277,21 @@ def count(self, query, language, index: Union[str, list], start_time=None, end_t
267277
return self.client.count(body=formatted_dsl, index=index_str, q=lucene_query, allow_no_indices=True,
268278
ignore_unavailable=True)['count']
269279

270-
def count_from_rule(self, *rules, start_time=None, end_time='now'):
280+
def count_from_rule(self, rules: RuleCollection, start_time=None, end_time='now'):
271281
"""Get a count of documents from elasticsearch using a rule."""
272282
survey_results = {}
273283

274-
for rule in rules:
284+
for rule in rules.rules:
275285
rule_results = {'rule_id': rule.id, 'name': rule.name}
276286

277-
if not rule.query:
287+
if not rule.contents.data.get('query'):
278288
continue
279289

280290
try:
281-
rule_results['search_count'] = self.count(query=rule.query, language=rule.contents.get('language'),
282-
index=rule.contents.get('index', '*'), start_time=start_time,
291+
rule_results['search_count'] = self.count(query=rule.contents.data.query,
292+
language=rule.contents.data.language,
293+
index=rule.contents.data.get('index', '*'),
294+
start_time=start_time,
283295
end_time=end_time)
284296
except (elasticsearch.NotFoundError, elasticsearch.RequestError):
285297
rule_results['search_count'] = -1

detection_rules/kbwrap.py

+27-11
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from kibana import Signal, RuleResource
1313
from .cli_utils import multi_collection
1414
from .main import root
15-
from .misc import add_params, client_error, kibana_options, get_kibana_client
15+
from .misc import add_params, client_error, kibana_options, get_kibana_client, nested_set
1616
from .schemas import downgrade
1717
from .utils import format_command_options
1818

@@ -82,8 +82,9 @@ def upload_rule(ctx, rules, replace_id):
8282
@click.option('--date-range', '-d', type=(str, str), default=('now-7d', 'now'), help='Date range to scope search')
8383
@click.option('--columns', '-c', multiple=True, help='Columns to display in table')
8484
@click.option('--extend', '-e', is_flag=True, help='If columns are specified, extend the original columns')
85+
@click.option('--max-count', '-m', default=100, help='The max number of alerts to return')
8586
@click.pass_context
86-
def search_alerts(ctx, query, date_range, columns, extend):
87+
def search_alerts(ctx, query, date_range, columns, extend, max_count):
8788
"""Search detection engine alerts with KQL."""
8889
from eql.table import Table
8990
from .eswrap import MATCH_ALL, add_range_to_dsl
@@ -94,15 +95,30 @@ def search_alerts(ctx, query, date_range, columns, extend):
9495
add_range_to_dsl(kql_query['bool'].setdefault('filter', []), start_time, end_time)
9596

9697
with kibana:
97-
alerts = [a['_source'] for a in Signal.search({'query': kql_query})['hits']['hits']]
98-
99-
table_columns = ['host.hostname', 'rule.name', '@timestamp']
98+
alerts = [a['_source'] for a in Signal.search({'query': kql_query}, size=max_count)['hits']['hits']]
10099

101100
# check for events with nested signal fields
102-
if alerts and 'signal' in alerts[0]:
103-
table_columns = ['host.hostname', 'signal.rule.name', 'signal.status', 'signal.original_time']
104-
if columns:
105-
columns = list(columns)
106-
table_columns = table_columns + columns if extend else columns
107-
click.echo(Table.from_list(table_columns, alerts))
101+
if alerts:
102+
table_columns = ['host.hostname']
103+
104+
if 'signal' in alerts[0]:
105+
table_columns += ['signal.rule.name', 'signal.status', 'signal.original_time']
106+
elif 'kibana.alert.rule.name' in alerts[0]:
107+
table_columns += ['kibana.alert.rule.name', 'kibana.alert.status', 'kibana.alert.original_time']
108+
else:
109+
table_columns += ['rule.name', '@timestamp']
110+
if columns:
111+
columns = list(columns)
112+
table_columns = table_columns + columns if extend else columns
113+
114+
# Table requires the data to be nested, but depending on the version, some data uses dotted keys, so
115+
# they must be nested explicitly
116+
for alert in alerts:
117+
for key in table_columns:
118+
if key in alert:
119+
nested_set(alert, key, alert[key])
120+
121+
click.echo(Table.from_list(table_columns, alerts))
122+
else:
123+
click.echo('No alerts detected')
108124
return alerts

detection_rules/mappings.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313
RTA_DIR = get_path("rta")
1414

1515

16-
class RtaMappings(object):
16+
class RtaMappings:
1717
"""Rta-mapping helper class."""
1818

1919
def __init__(self):
2020
"""Rta-mapping validation and prep."""
21-
self.mapping = load_etc_dump('rule-mapping.yml') # type: dict
21+
self.mapping: dict = load_etc_dump('rule-mapping.yml')
2222
self.validate()
2323

2424
self._rta_mapping = defaultdict(list)

detection_rules/misc.py

+16-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def nested_get(_dict, dot_key, default=None):
8989

9090

9191
def nested_set(_dict, dot_key, value):
92-
"""Set a nested field from a a key in dot notation."""
92+
"""Set a nested field from a key in dot notation."""
9393
keys = dot_key.split('.')
9494
for key in keys[:-1]:
9595
_dict = _dict.setdefault(key, {})
@@ -100,6 +100,21 @@ def nested_set(_dict, dot_key, value):
100100
raise ValueError('dict cannot set a value to a non-dict for {}'.format(dot_key))
101101

102102

103+
def nest_from_dot(dots, value):
104+
"""Nest a dotted field and set the innermost value."""
105+
fields = dots.split('.')
106+
107+
if not fields:
108+
return {}
109+
110+
nested = {fields.pop(): value}
111+
112+
for field in reversed(fields):
113+
nested = {field: nested}
114+
115+
return nested
116+
117+
103118
def schema_prompt(name, value=None, required=False, **options):
104119
"""Interactively prompt based on schema requirements."""
105120
name = str(name)

detection_rules/rule.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ class QueryValidator:
338338

339339
@property
340340
def ast(self) -> Any:
341-
raise NotImplementedError
341+
raise NotImplementedError()
342342

343343
@property
344344
def unique_fields(self) -> Any:

kibana/connector.py

+3
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ def __init__(self, cloud_id=None, kibana_url=None, verify=True, elasticsearch=No
4343
self.domain, self.es_uuid, self.kibana_uuid = \
4444
base64.b64decode(cloud_info.encode("utf-8")).decode("utf-8").split("$")
4545

46+
if self.domain.endswith(':443'):
47+
self.domain = self.domain[:-4]
48+
4649
kibana_url_from_cloud = f"https://{self.kibana_uuid}.{self.domain}:9243"
4750
if self.kibana_url and self.kibana_url != kibana_url_from_cloud:
4851
raise ValueError(f'kibana_url provided ({self.kibana_url}) does not match url derived from cloud_id '

0 commit comments

Comments (0)