Skip to content

Mark extensions #4643

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,7 @@ exports/
ML-models/
surveys/
machine-learning/


# hunting json output
hunting/*/json/*.json
28 changes: 28 additions & 0 deletions hunting/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

from detection_rules.misc import parse_user_config

from .upload import upload_data
from .definitions import HUNTING_DIR
from .markdown import MarkdownGenerator
from .json import JSONGenerator
from .run import QueryRunner
from .search import QueryIndex
from .utils import (filter_elasticsearch_params, get_hunt_path, load_all_toml,
Expand All @@ -27,6 +29,12 @@ def hunting():
"""Commands for managing hunting queries and converting TOML to Markdown."""
pass

@hunting.command('upload')
def upload():
    """Upload the generated hunting query JSON documents to Elasticsearch.

    Delegates to :func:`hunting.upload.upload_data`; connection parameters are
    resolved there (via the user config), not taken as CLI options here.
    """
    upload_data()


@hunting.command('generate-markdown')
@click.argument('path', required=False)
Expand All @@ -51,6 +59,26 @@ def generate_markdown(path: Path = None):
# After processing, update the index
markdown_generator.update_index_md()

@hunting.command('generate-json')
@click.argument('path', required=False)
def generate_json(path: Path = None):
    """Convert TOML hunting queries to JSON format."""
    generator = JSONGenerator(HUNTING_DIR)

    # No argument: regenerate JSON for every hunt file.
    if not path:
        click.echo("Generating JSON for all files.")
        generator.process_all_files()
        return

    target = Path(path)
    if target.is_file() and target.suffix == '.toml':
        # A single TOML file was given explicitly.
        click.echo(f"Generating JSON for single file: {target}")
        generator.process_file(target)
    elif (HUNTING_DIR / target).is_dir():
        # A category folder relative to the hunting directory.
        click.echo(f"Generating JSON for folder: {target}")
        generator.process_folder(target)
    else:
        raise ValueError(f"Invalid path provided: {target}")


@hunting.command('refresh-index')
def refresh_index():
Expand Down
195 changes: 195 additions & 0 deletions hunting/json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.

from dataclasses import asdict
import datetime
import json
from pathlib import Path, PosixPath
import click
from .definitions import Hunt
from .utils import load_index_file, load_toml
import re

# Generation time shared by every JSON document produced in one run.
# Use UTC: the formatted string ends with "Z" (the Zulu/UTC designator),
# so generating it from local time would mislabel the timezone.
# Microseconds are truncated to milliseconds ("%f" gives 6 digits; keep 3).
now = datetime.datetime.now(datetime.timezone.utc)
timestamp = now.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
class JSONGenerator:
    """Generate JSON documents from the hunting TOML query files.

    Mirrors the layout used by the markdown generator: each
    ``<category>/queries/*.toml`` file is converted into
    ``<category>/json/<name>.json`` so the documents can later be uploaded
    to Elasticsearch.
    """

    def __init__(self, base_path: Path):
        """Initialize with the base hunting path and load the hunting index."""
        self.base_path = base_path
        self.hunting_index = load_index_file()

    def process_file(self, file_path: Path) -> None:
        """Process a single TOML file and generate its JSON representation.

        Raises:
            ValueError: If ``file_path`` is not an existing ``.toml`` file.
        """
        if not file_path.is_file() or file_path.suffix != '.toml':
            raise ValueError(f"The provided path is not a valid TOML file: {file_path}")

        click.echo(f"Processing specific TOML file: {file_path}")
        hunt_config = load_toml(file_path)
        json_content = self.convert_toml_to_json(hunt_config, file_path)

        json_folder = self.create_json_folder(file_path)
        json_path = json_folder / f"{file_path.stem}.json"
        self.save_json(json_path, json_content)

    def process_folder(self, folder: str) -> None:
        """Process all TOML files in a specified folder and generate their JSON representations.

        Args:
            folder: Category folder name relative to the base path.

        Raises:
            ValueError: If the folder has no ``queries`` subdirectory.
        """
        folder_path = self.base_path / folder / "queries"

        # Only the queries folder must pre-exist; the JSON output folder is
        # created on demand by create_json_folder() for each processed file.
        # (Requiring a "docs" folder here was a leftover from the markdown
        # generator — the JSON output never writes to docs/.)
        if not folder_path.is_dir():
            raise ValueError(f"Queries folder {folder_path} does not exist.")

        click.echo(f"Processing all TOML files in folder: {folder_path}")
        for toml_file in folder_path.rglob("*.toml"):
            self.process_file(toml_file)

    def process_all_files(self) -> None:
        """Process all TOML files in the base directory and subfolders."""
        click.echo("Processing all TOML files in the base directory and subfolders.")
        for toml_file in self.base_path.rglob("queries/*.toml"):
            self.process_file(toml_file)

    def convert_toml_to_json(self, hunt_config: "Hunt", path: Path) -> str:
        """Convert a Hunt configuration to a JSON document string.

        The raw ``query`` list is replaced by an enriched ``queries`` list
        (see format_queries), and ``category`` / ``@timestamp`` fields are
        added for indexing into Elasticsearch.

        Args:
            hunt_config: The parsed hunt configuration dataclass.
            path: Path of the source TOML file (used to derive the category).

        Returns:
            The JSON document as a pretty-printed string.
        """
        hunt_config_dict = asdict(hunt_config)
        hunt_config_dict["queries"] = self.format_queries(hunt_config_dict["query"])
        hunt_config_dict.pop("query")
        hunt_config_dict["category"] = self.path_to_category(path)
        # Module-level timestamp: all documents from one run share it.
        hunt_config_dict["@timestamp"] = timestamp
        return json.dumps(hunt_config_dict, indent=4)

    def path_to_category(self, path: Path) -> str:
        """
        Convert a file path to a category string.

        The category is the directory that contains the ``queries`` folder,
        e.g. ``hunting/windows/queries/foo.toml`` -> ``windows``.

        Args:
            path (Path): The file path.

        Returns:
            str: The category string derived from the file path.
        """
        parts = path.parts
        if "queries" in parts:
            queries_index = parts.index("queries")
            # The component just before "queries" is the category folder.
            if queries_index > 0:
                return parts[queries_index - 1]

        # Default fallback: return the parent directory name.
        return path.parent.name

    @staticmethod
    def extract_indices_from_esql(esql_query):
        """
        Extract indices from an ESQL query.

        Args:
            esql_query (str): The ESQL query.

        Returns:
            list: A list of indices found in the query; empty if none found.
        """
        # Handle SELECT statements that start with SELECT instead of FROM.
        if esql_query.strip().upper().startswith('SELECT'):
            match = re.search(r'FROM\s+([^\s|,;\n]+)', esql_query, re.IGNORECASE)
            if match:
                return [match.group(1).strip()]

        # For queries that start with FROM directly: collapse all whitespace
        # (including newlines) to single spaces so positions are predictable.
        normalized_query = ' '.join(esql_query.split())

        if not normalized_query.lower().startswith('from '):
            return []

        # Drop any trailing // line comment, then take the text between the
        # "from " prefix and the first delimiter.
        cleaned_query = re.sub(r'//.*$', '', normalized_query, flags=re.MULTILINE)
        from_part = cleaned_query[5:]  # Skip the "from " prefix

        # Find the earliest delimiter: pipe or WHERE (case insensitive).
        # Note: no newline search is needed — normalization removed them.
        pipe_pos = from_part.find('|')
        where_match = re.search(r'WHERE', from_part, re.IGNORECASE)
        where_pos = where_match.start() if where_match else -1

        positions = [pos for pos in (pipe_pos, where_pos) if pos >= 0]
        end_pos = min(positions) if positions else len(from_part)

        from_part = from_part[:end_pos].strip()

        # Split by commas if multiple indices are provided.
        return [index.strip() for index in from_part.split(',')]

    def remove_comments_and_blank_lines(self, esql_query):
        """
        Remove comments and blank lines from an ESQL query.

        Args:
            esql_query (str): The ESQL query.

        Returns:
            str: The cleaned ESQL query.
        """
        # Remove block comments (/* ... */), including multi-line ones.
        cleaned_query = re.sub(r'/\*.*?\*/', '', esql_query, flags=re.DOTALL)

        # Drop whole-line // comments and blank lines; keep everything else.
        result = []
        for line in cleaned_query.splitlines():
            if not line.strip().startswith("//") and line.strip():
                result.append(line)

        return "\n".join(result)

    def format_queries(self, queries: list[str]) -> list[dict]:
        """
        Format the queries for JSON output.

        Args:
            queries (list[str]): List of ESQL queries.

        Returns:
            list[dict]: One dict per query with the raw query, the indices it
            reads from, and a comment-free copy of the query.
        """
        formatted_queries = []

        for query in queries:
            formatted_queries.append({
                "query": query,
                "indices": self.extract_indices_from_esql(query),
                "cleaned_query": self.remove_comments_and_blank_lines(query)
            })

        return formatted_queries

    def save_json(self, json_path: Path, content: str) -> None:
        """Save the JSON content to a file."""
        with open(json_path, 'w', encoding='utf-8') as f:
            f.write(content)
        click.echo(f"JSON generated: {json_path}")

    def create_json_folder(self, file_path: Path) -> Path:
        """Create the sibling ``json`` folder if it doesn't exist and return its path."""
        # queries/foo.toml -> <category>/json/
        json_folder = file_path.parent.parent / "json"
        json_folder.mkdir(parents=True, exist_ok=True)
        return json_folder
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ from logs-okta.system*
// Count the number of reset password attempts for each user
| stats
user_count = count_distinct(user.target.full_name),
reset_counts = by okta.actor.alternate_id, source.user.full_name, okta.debug_context.debug_data.dt_hash
reset_counts = count(*) by okta.actor.alternate_id, source.user.full_name, okta.debug_context.debug_data.dt_hash

// Filter for more than 10 unique users and more than 15 reset password attempts by the source
| where user_count > 10 and reset_counts > 15
Expand Down
Loading
Loading