8
8
import os
9
9
import time
10
10
from collections import defaultdict
11
- from typing import Union
11
+ from typing import List , Union
12
12
13
13
import click
14
14
import elasticsearch
17
17
18
18
import kql
19
19
from .main import root
20
- from .misc import add_params , client_error , elasticsearch_options , get_elasticsearch_client
20
+ from .misc import add_params , client_error , elasticsearch_options , get_elasticsearch_client , nested_get
21
21
from .rule import TOMLRule
22
22
from .rule_loader import rta_mappings , RuleCollection
23
23
from .utils import format_command_options , normalize_timing_and_sort , unix_time_to_formatted , get_path
@@ -33,7 +33,23 @@ def add_range_to_dsl(dsl_filter, start_time, end_time='now'):
33
33
)
34
34
35
35
36
- class RtaEvents (object ):
36
def parse_unique_field_results(rule_type: str, unique_fields: List[str], search_results: dict):
    """Count occurrences of each unique value for the given fields across search hits.

    Returns ``{'results': {field: {value: count}}}`` or an empty dict when no
    field values were found.
    """
    counts = defaultdict(lambda: defaultdict(int))
    hits = search_results['hits']
    # EQL responses nest matches under 'events' or 'sequences' instead of 'hits'
    if rule_type == 'eql':
        hits = hits.get('events') or hits.get('sequences', [])
    else:
        hits = hits['hits']

    for hit in hits:
        source = hit['_source']
        for field in unique_fields:
            value = nested_get(source, field)
            if not value:
                continue
            if isinstance(value, list):
                # normalize multi-valued fields into a stable comma-joined key
                value = ','.join(sorted(value))
            counts[field][value] += 1

    return {'results': counts} if counts else {}
50
+
51
+
52
+ class RtaEvents :
37
53
"""Events collected from Elasticsearch."""
38
54
39
55
def __init__ (self , events ):
@@ -64,7 +80,7 @@ def evaluate_against_rule_and_update_mapping(self, rule_id, rta_name, verbose=Tr
64
80
"""Evaluate a rule against collected events and update mapping."""
65
81
from .utils import combine_sources , evaluate
66
82
67
- rule = next (( rule for rule in RuleCollection .default () if rule . id == rule_id ), None )
83
+ rule = RuleCollection .default (). id_map . get ( rule_id )
68
84
assert rule is not None , f"Unable to find rule with ID { rule_id } "
69
85
merged_events = combine_sources (* self .events .values ())
70
86
filtered = evaluate (rule , merged_events )
@@ -112,7 +128,7 @@ def _build_timestamp_map(self, index_str):
112
128
113
129
def _get_last_event_time (self , index_str , dsl = None ):
114
130
"""Get timestamp of most recent event."""
115
- last_event = self .client .search (dsl , index_str , size = 1 , sort = '@timestamp:desc' )['hits' ]['hits' ]
131
+ last_event = self .client .search (query = dsl , index = index_str , size = 1 , sort = '@timestamp:desc' )['hits' ]['hits' ]
116
132
if not last_event :
117
133
return
118
134
@@ -146,7 +162,7 @@ def _prep_query(query, language, index, start_time=None, end_time=None):
146
162
elif language == 'dsl' :
147
163
formatted_dsl = {'query' : query }
148
164
else :
149
- raise ValueError ('Unknown search language' )
165
+ raise ValueError (f 'Unknown search language: { language } ' )
150
166
151
167
if start_time or end_time :
152
168
end_time = end_time or 'now'
@@ -172,84 +188,78 @@ def search(self, query, language, index: Union[str, list] = '*', start_time=None
172
188
173
189
return results
174
190
175
    def search_from_rule(self, rules: RuleCollection, start_time=None, end_time='now', size=None):
        """Search an elasticsearch instance using a rule.

        Dispatches each rule's query by language: kuery rules are batched into a
        single msearch request, lucene rules go through the async-search API, and
        eql rules use the dedicated EQL client. Returns a dict keyed by rule id,
        where each value is either the parsed unique-field counts or an error
        marker when the search failed.
        """
        async_client = AsyncSearchClient(self.client)
        survey_results = {}

        multi_search = []
        multi_search_rules = []
        async_searches = []
        eql_searches = []

        for rule in rules:
            # rules with no query string cannot be searched; skip them
            if not rule.contents.data.get('query'):
                continue

            language = rule.contents.data.get('language')
            query = rule.contents.data.query
            rule_type = rule.contents.data.type
            index_str, formatted_dsl, lucene_query = self._prep_query(query=query,
                                                                      language=language,
                                                                      index=rule.contents.data.get('index', '*'),
                                                                      start_time=start_time,
                                                                      end_time=end_time)
            formatted_dsl.update(size=size or self.max_events)

            # prep for searches: msearch for kql | async search for lucene | eql client search for eql
            if language == 'kuery':
                multi_search_rules.append(rule)
                # msearch takes alternating header/body entries for each search
                multi_search.append({'index': index_str, 'allow_no_indices': 'true', 'ignore_unavailable': 'true'})
                multi_search.append(formatted_dsl)
            elif language == 'lucene':
                # wait for 0 to try and force async with no immediate results (not guaranteed)
                result = async_client.submit(body=formatted_dsl, q=query, index=index_str,
                                             allow_no_indices=True, ignore_unavailable=True,
                                             wait_for_completion_timeout=0)
                if result['is_running'] is True:
                    async_searches.append((rule, result['id']))
                else:
                    # NOTE(review): lucene path hard-codes ['process.name'] while the
                    # kuery/eql paths use rule.contents.data.unique_fields — confirm intentional
                    survey_results[rule.id] = parse_unique_field_results(rule_type, ['process.name'],
                                                                         result['response'])
            elif language == 'eql':
                eql_body = {
                    'index': index_str,
                    'params': {'ignore_unavailable': 'true', 'allow_no_indices': 'true'},
                    'body': {'query': query, 'filter': formatted_dsl['filter']}
                }
                eql_searches.append((rule, eql_body))

        # assemble search results
        multi_search_results = self.client.msearch(searches=multi_search)
        for index, result in enumerate(multi_search_results['responses']):
            try:
                rule = multi_search_rules[index]
                survey_results[rule.id] = parse_unique_field_results(rule.contents.data.type,
                                                                     rule.contents.data.unique_fields, result)
            except KeyError:
                # a malformed/failed response is recorded rather than raised
                survey_results[multi_search_rules[index].id] = {'error_retrieving_results': True}

        for entry in eql_searches:
            rule: TOMLRule
            search_args: dict
            rule, search_args = entry
            try:
                result = self.client.eql.search(**search_args)
                survey_results[rule.id] = parse_unique_field_results(rule.contents.data.type,
                                                                     rule.contents.data.unique_fields, result)
            except (elasticsearch.NotFoundError, elasticsearch.RequestError) as e:
                survey_results[rule.id] = {'error_retrieving_results': True, 'error': e.info['error']['reason']}

        for entry in async_searches:
            rule: TOMLRule
            rule, async_id = entry
            # blocks until the async search completes, then parse like the sync path
            result = async_client.get(id=async_id)['response']
            survey_results[rule.id] = parse_unique_field_results(rule.contents.data.type, ['process.name'], result)

        return survey_results
255
265
@@ -267,19 +277,21 @@ def count(self, query, language, index: Union[str, list], start_time=None, end_t
267
277
return self .client .count (body = formatted_dsl , index = index_str , q = lucene_query , allow_no_indices = True ,
268
278
ignore_unavailable = True )['count' ]
269
279
270
- def count_from_rule (self , * rules , start_time = None , end_time = 'now' ):
280
+ def count_from_rule (self , rules : RuleCollection , start_time = None , end_time = 'now' ):
271
281
"""Get a count of documents from elasticsearch using a rule."""
272
282
survey_results = {}
273
283
274
- for rule in rules :
284
+ for rule in rules . rules :
275
285
rule_results = {'rule_id' : rule .id , 'name' : rule .name }
276
286
277
- if not rule .query :
287
+ if not rule .contents . data . get ( ' query' ) :
278
288
continue
279
289
280
290
try :
281
- rule_results ['search_count' ] = self .count (query = rule .query , language = rule .contents .get ('language' ),
282
- index = rule .contents .get ('index' , '*' ), start_time = start_time ,
291
+ rule_results ['search_count' ] = self .count (query = rule .contents .data .query ,
292
+ language = rule .contents .data .language ,
293
+ index = rule .contents .data .get ('index' , '*' ),
294
+ start_time = start_time ,
283
295
end_time = end_time )
284
296
except (elasticsearch .NotFoundError , elasticsearch .RequestError ):
285
297
rule_results ['search_count' ] = - 1
0 commit comments