Skip to content

Add tests to ensure rules are properly deprecated #1050

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions detection_rules/devtools.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,32 @@ def add_github_meta(this_rule, status, original_rule_id=None):
ctx.invoke(search_rules, query=query, columns=columns, language=language, rules=all_rules, pager=True)


@dev_group.command('deprecate-rule')
@click.argument('rule-file', type=click.Path(dir_okay=False))
@click.pass_context
def deprecate_rule(ctx: click.Context, rule_file: str):
"""Deprecate a rule."""
import pytoml
from .packaging import load_versions

version_info = load_versions()
rule_file = Path(rule_file)
contents = pytoml.loads(rule_file.read_text())
rule = Rule(path=rule_file, contents=contents)

if rule.id not in version_info:
click.echo('Rule has not been version locked and so does not need to be deprecated. '
'Delete the file or update the maturity to `development` instead')
ctx.exit()

today = time.strftime('%Y/%m/%d')
rule.metadata.update(updated_date=today, deprecation_date=today, maturity='deprecated')
deprecated_path = get_path('rules', '_deprecated', rule_file.name)
rule.save(new_path=deprecated_path, as_rule=True)
rule_file.unlink()
click.echo(f'Rule moved to {deprecated_path} - remember to git add this file')


@dev_group.group('test')
def test_group():
"""Commands for testing against stack resources."""
Expand Down
17 changes: 10 additions & 7 deletions detection_rules/packaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from . import rule_loader
from .misc import JS_LICENSE, cached
from .rule import Rule, downgrade_contents_from_rule # noqa: F401
from .schemas import CurrentSchema
from .utils import Ndjson, get_path, get_etc_path, load_etc_dump, save_etc_dump

RELEASE_DIR = get_path("releases")
Expand All @@ -28,7 +29,7 @@
# CHANGELOG_FILE = Path(get_etc_path('rules-changelog.json'))


def filter_rule(rule: Rule, config_filter: dict, exclude_fields: dict) -> bool:
def filter_rule(rule: Rule, config_filter: dict, exclude_fields: Optional[dict] = None) -> bool:
"""Filter a rule based off metadata and a package configuration."""
flat_rule = rule.flattened_contents
for key, values in config_filter.items():
Expand All @@ -46,6 +47,7 @@ def filter_rule(rule: Rule, config_filter: dict, exclude_fields: dict) -> bool:
if len(rule_values & values) == 0:
return False

exclude_fields = exclude_fields or {}
for index, fields in exclude_fields.items():
if rule.unique_fields and (rule.contents['index'] == index or index == 'any'):
if set(rule.unique_fields) & set(fields):
Expand All @@ -66,8 +68,9 @@ def load_versions(current_versions: dict = None):
return current_versions or load_etc_dump('version.lock.json')


def manage_versions(rules: list, deprecated_rules: list = None, current_versions: dict = None,
exclude_version_update=False, add_new=True, save_changes=False, verbose=True) -> (list, list, list):
def manage_versions(rules: List[Rule], deprecated_rules: list = None, current_versions: dict = None,
exclude_version_update=False, add_new=True, save_changes=False,
verbose=True) -> (List[str], List[str], List[str]):
"""Update the contents of the version.lock file and optionally save changes."""
new_rules = {}
changed_rules = []
Expand Down Expand Up @@ -103,13 +106,12 @@ def manage_versions(rules: list, deprecated_rules: list = None, current_versions
if deprecated_rules:
rule_deprecations = load_etc_dump('deprecated_rules.json')

deprecation_date = str(datetime.date.today())

for rule in deprecated_rules:
if rule.id not in rule_deprecations:
rule_deprecations[rule.id] = {
'rule_name': rule.name,
'deprecation_date': deprecation_date
'deprecation_date': rule.metadata['deprecation_date'],
'stack_version': CurrentSchema.STACK_VERSION
}
newly_deprecated.append(rule.id)

Expand All @@ -129,7 +131,8 @@ def manage_versions(rules: list, deprecated_rules: list = None, current_versions
click.echo('Updated version.lock.json file')

if newly_deprecated:
save_etc_dump(sorted(OrderedDict(rule_deprecations)), 'deprecated_rules.json')
save_etc_dump(OrderedDict(sorted(rule_deprecations.items(), key=lambda e: e[1]['rule_name'])),
'deprecated_rules.json')

if verbose:
click.echo('Updated deprecated_rules.json file')
Expand Down
9 changes: 7 additions & 2 deletions detection_rules/rule_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,14 @@ def filter_rules(rules, metadata_field, value):
return [rule for rule in rules if rule.metadata.get(metadata_field, '') == value]


def get_production_rules(verbose=False):
def get_production_rules(verbose=False, include_deprecated=False) -> List[Rule]:
"""Get rules with a maturity of production."""
return filter_rules(load_rules(verbose=verbose).values(), 'maturity', 'production')
from .packaging import filter_rule

maturity = ['production']
if include_deprecated:
maturity.append('deprecated')
return [rule for rule in load_rules(verbose=verbose).values() if filter_rule(rule, {'maturity': maturity})]


@cached
Expand Down
1 change: 1 addition & 0 deletions detection_rules/schemas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class TomlMetadata(GenericSchema):
# rule validated against each ecs schema contained
beats_version = jsl.StringField(pattern=VERSION_PATTERN, required=False)
comments = jsl.StringField(required=False)
deprecation_date = jsl.StringField(required=False, pattern=DATE_PATTERN, default=time.strftime('%Y/%m/%d'))
ecs_versions = jsl.ArrayField(jsl.StringField(pattern=BRANCH_PATTERN, required=True), required=False)
maturity = jsl.StringField(enum=MATURITY_LEVELS, default='development', required=True)

Expand Down
23 changes: 23 additions & 0 deletions docs/deprecating.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Deprecating rules

Rules that have been version locked (added to [version.lock.json](../etc/version.lock.json)), which also means they
have been added to the detection engine in Kibana, must be properly [deprecated](#steps-to-properly-deprecate-a-rule).

If a rule was never version locked (not yet pushed to Kibana or still in non-`production` `maturity`), the rule can
simply be removed with no additional changes, or updated the `maturity = "development"`, which will leave it out of the
release package to Kibana.


## Steps to properly deprecate a rule

1. Update the `maturity` to `deprecated`
2. Move the rule file to [rules/_deprecated](../rules/_deprecated)
3. Add `deprecation_date` and update `updated_date` to match

Next time the versions are locked, the rule will be added to the [deprecated_rules.json](../etc/deprecated_rules.json)
file.


### Using the deprecate-rule command

Alternatively, you can run `python -m detection_rules dev deprecate-rule <rule-file>`, which will perform all the steps
8 changes: 7 additions & 1 deletion etc/deprecated_rules.json
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
{}
{
"3a86e085-094c-412d-97ff-2439731e59cb": {
"deprecation_date": "2021-03-03",
"rule_name": "Setgid Bit Set via chmod",
"stack_version": "7.13"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from the name, it's not clear whether this was the first version where this was removed, or the last version this was included

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. I think I can better handle this when expanding for #958, so I will punt this to that

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
[metadata]
creation_date = "2020/04/23"
deprecation_date = "2021/03/16"
maturity = "deprecated"
updated_date = "2021/03/16"

[rule]
author = ["Elastic"]
description = """
An adversary may add the setgid bit to a file or directory in order to run a file with the privileges of the owning
group. An adversary can take advantage of this to either do a shell escape or exploit a vulnerability in an application
with the setgid bit to get code running in a different user’s context. Additionally, adversaries can use this mechanism
on their own malware to make sure they're able to execute in elevated contexts in the future.
"""
from = "now-9m"
index = ["auditbeat-*", "logs-endpoint.events.*"]
language = "lucene"
license = "Elastic License"
max_signals = 33
name = "Setgid Bit Set via chmod"
risk_score = 21
rule_id = "3a86e085-094c-412d-97ff-2439731e59cb"
severity = "low"
tags = ["Elastic", "Host", "Linux", "Threat Detection", "Privilege Escalation"]
timestamp_override = "event.ingested"
type = "query"

query = '''
event.category:process AND event.type:(start or process_started) AND process.name:chmod AND process.args:(g+s OR /2[0-9]{3}/) AND NOT user.name:root
'''


[[rule.threat]]
framework = "MITRE ATT&CK"
[[rule.threat.technique]]
id = "T1548"
name = "Abuse Elevation Control Mechanism"
reference = "https://attack.mitre.org/techniques/T1548/"
[[rule.threat.technique.subtechnique]]
id = "T1548.001"
name = "Setuid and Setgid"
reference = "https://attack.mitre.org/techniques/T1548/001/"



[rule.threat.tactic]
id = "TA0004"
name = "Privilege Escalation"
reference = "https://attack.mitre.org/tactics/TA0004/"
[[rule.threat]]
framework = "MITRE ATT&CK"

[rule.threat.tactic]
id = "TA0003"
name = "Persistence"
reference = "https://attack.mitre.org/tactics/TA0003/"
53 changes: 51 additions & 2 deletions tests/test_all_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
from rta import get_ttp_names

from detection_rules import attack, beats, ecs
from detection_rules.packaging import load_versions
from detection_rules.rule_loader import FILE_PATTERN, find_unneeded_defaults_from_rule
from detection_rules.utils import load_etc_dump
from detection_rules.utils import get_path, load_etc_dump
from detection_rules.rule import Rule

from .base import BaseRuleTest
Expand Down Expand Up @@ -60,9 +61,20 @@ def test_file_names(self):

def test_all_rules_as_rule_schema(self):
"""Ensure that every rule file validates against the rule schema."""
rules_path = get_path('rules')

for file_name, contents in self.rule_files.items():
rule = Rule(file_name, contents)
rule.validate(as_rule=True)

if rule.metadata['maturity'] == 'deprecated':
continue

try:
rule.validate(as_rule=True)
except jsonschema.ValidationError as exc:
rule_path = Path(rule.path).relative_to(rules_path)
exc.message = f'{rule_path} -> {exc}'
raise exc

def test_all_rule_queries_optimized(self):
"""Ensure that every rule query is in optimized form."""
Expand Down Expand Up @@ -430,6 +442,43 @@ def test_updated_date_newer_than_creation(self):
err_msg = f'The following rules have an updated_date older than the creation_date\n {rules_str}'
self.fail(err_msg)

def test_deprecated_rules(self):
"""Test that deprecated rules are properly handled."""
versions = load_versions()
deprecations = load_etc_dump('deprecated_rules.json')
deprecated_rules = {}

for rule in self.rules:
maturity = rule.metadata['maturity']

if maturity == 'deprecated':
deprecated_rules[rule.id] = rule
err_msg = f'{self.rule_str(rule)} cannot be deprecated if it has not been version locked. ' \
f'Convert to `development` or delete the rule file instead'
self.assertIn(rule.id, versions, err_msg)

rule_path = Path(rule.path).relative_to(get_path('rules'))
err_msg = f'{self.rule_str(rule)} deprecated rules should be stored in ' \
f'"{get_path("rules", "_deprecated")}" folder'
self.assertEqual('_deprecated', rule_path.parts[0], err_msg)

err_msg = f'{self.rule_str(rule)} missing deprecation date'
self.assertIn('deprecation_date', rule.metadata, err_msg)

err_msg = f'{self.rule_str(rule)} deprecation_date and updated_date should match'
self.assertEqual(rule.metadata['deprecation_date'], rule.metadata['updated_date'], err_msg)

missing_rules = sorted(set(versions).difference(set(self.rule_lookup)))
missing_rule_strings = '\n '.join(f'{r} - {versions[r]["rule_name"]}' for r in missing_rules)
err_msg = f'Deprecated rules should not be removed, but moved to the rules/_deprecated folder instead. ' \
f'The following rules have been version locked and are missing. ' \
f'Re-add to the deprecated folder and update maturity to "deprecated": \n {missing_rule_strings}'
self.assertEqual([], missing_rules, err_msg)

for rule_id, entry in deprecations.items():
rule_str = f'{rule_id} - {entry["rule_name"]} ->'
self.assertIn(rule_id, deprecated_rules, f'{rule_str} is logged in "deprecated_rules.json" but is missing')


class TestTuleTiming(BaseRuleTest):
"""Test rule timing and timestamps."""
Expand Down