diff --git a/.gitignore b/.gitignore
index 2a9bf333790..fe3182a519b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -115,3 +115,7 @@ exports/
 ML-models/
 surveys/
 machine-learning/
+
+
+# hunting json output
+hunting/*/json/*.json
\ No newline at end of file
diff --git a/hunting/__main__.py b/hunting/__main__.py
index 4ce320566a0..46354adcffa 100644
--- a/hunting/__main__.py
+++ b/hunting/__main__.py
@@ -14,8 +14,10 @@ from detection_rules.misc import parse_user_config
+from .upload import upload_data
 from .definitions import HUNTING_DIR
 from .markdown import MarkdownGenerator
+from .json import JSONGenerator
 from .run import QueryRunner
 from .search import QueryIndex
 from .utils import (filter_elasticsearch_params, get_hunt_path, load_all_toml,
@@ -27,6 +29,12 @@ def hunting():
     """Commands for managing hunting queries and converting TOML to Markdown."""
     pass
 
+@hunting.command('upload')
+def upload():
+    """Upload hunting queries to Elasticsearch."""
+    # Delegates to hunting/upload.py, which bulk-indexes the generated JSON files.
+    upload_data()
+
 
 @hunting.command('generate-markdown')
 @click.argument('path', required=False)
@@ -51,6 +59,26 @@ def generate_markdown(path: Path = None):
     # After processing, update the index
     markdown_generator.update_index_md()
 
+@hunting.command('generate-json')
+@click.argument('path', required=False)
+def generate_json(path: Path = None):
+    """Convert TOML hunting queries to JSON format."""
+    json_generator = JSONGenerator(HUNTING_DIR)
+
+    if path:
+        path = Path(path)
+        if path.is_file() and path.suffix == '.toml':
+            click.echo(f"Generating JSON for single file: {path}")
+            json_generator.process_file(path)
+        elif (HUNTING_DIR / path).is_dir():
+            click.echo(f"Generating JSON for folder: {path}")
+            json_generator.process_folder(path)
+        else:
+            raise ValueError(f"Invalid path provided: {path}")
+    else:
+        click.echo("Generating JSON for all files.")
+        json_generator.process_all_files()
+
 
 @hunting.command('refresh-index')
 def refresh_index():
diff --git a/hunting/json.py b/hunting/json.py
new file mode 100644
index 00000000000..ae125be9c4a
--- /dev/null
+++ b/hunting/json.py
@@ -0,0 +1,195 @@
+# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+# or more contributor license agreements. Licensed under the Elastic License
+# 2.0; you may not use this file except in compliance with the Elastic License
+# 2.0.
+
+from dataclasses import asdict
+import datetime
+import json
+from pathlib import Path
+import click
+from .definitions import Hunt
+from .utils import load_index_file, load_toml
+import re
+
+now = datetime.datetime.now()
+timestamp = now.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
+
+
+class JSONGenerator:
+    """Class to generate or update JSON documentation from TOML files."""
+
+    def __init__(self, base_path: Path):
+        """Initialize with the base path and load the hunting index."""
+        self.base_path = base_path
+        self.hunting_index = load_index_file()
+
+    def process_file(self, file_path: Path) -> None:
+        """Process a single TOML file and generate its JSON representation."""
+        if not file_path.is_file() or file_path.suffix != '.toml':
+            raise ValueError(f"The provided path is not a valid TOML file: {file_path}")
+
+        click.echo(f"Processing specific TOML file: {file_path}")
+        hunt_config = load_toml(file_path)
+        json_content = self.convert_toml_to_json(hunt_config, file_path)
+
+        json_folder = self.create_json_folder(file_path)
+        json_path = json_folder / f"{file_path.stem}.json"
+        self.save_json(json_path, json_content)
+
+    def process_folder(self, folder: str) -> None:
+        """Process all TOML files in a specified folder and generate their JSON representations."""
+        folder_path = self.base_path / folder / "queries"
+        docs_folder = self.base_path / folder / "docs"
+
+        if not folder_path.is_dir() or not docs_folder.is_dir():
+            raise ValueError(f"Queries folder {folder_path} or docs folder {docs_folder} does not exist.")
+
+        click.echo(f"Processing all TOML files in folder: {folder_path}")
+        toml_files = folder_path.rglob("*.toml")
+
+        for toml_file in toml_files:
+            self.process_file(toml_file)
+
+    def process_all_files(self) -> None:
+        """Process all TOML files in the base directory and subfolders."""
+        click.echo("Processing all TOML files in the base directory and subfolders.")
+        toml_files = self.base_path.rglob("queries/*.toml")
+
+        for toml_file in toml_files:
+            self.process_file(toml_file)
+
+    def convert_toml_to_json(self, hunt_config: Hunt, path: Path) -> str:
+        """Convert a Hunt configuration to JSON format."""
+        hunt_config_dict = asdict(hunt_config)
+        hunt_config_dict["queries"] = self.format_queries(hunt_config_dict["query"])
+        hunt_config_dict.pop("query")
+        hunt_config_dict["category"] = self.path_to_category(path)
+        hunt_config_dict["@timestamp"] = timestamp
+        return json.dumps(hunt_config_dict, indent=4)
+
+    def path_to_category(self, path: Path) -> str:
+        """
+        Convert a file path to a category string.
+
+        Args:
+            path (Path): The file path.
+
+        Returns:
+            str: The category string derived from the file path.
+        """
+        # The category is the directory the queries live in,
+        # e.g. "hunting/windows/queries" -> "windows"
+
+        # Get the path parts
+        parts = path.parts
+        # Check if "queries" appears in the path
+        if "queries" in parts:
+            # Get the index of "queries" in the path
+            queries_index = parts.index("queries")
+            # If there's a part before "queries", return it as the category
+            if queries_index > 0:
+                return parts[queries_index - 1]
+
+        # Default fallback: return the parent directory name
+        return path.parent.name
+
+    @staticmethod
+    def extract_indices_from_esql(esql_query):
+        """
+        Extract indices from an ES|QL query.
+
+        Args:
+            esql_query (str): The ES|QL query.
+
+        Returns:
+            list: A list of indices found in the query.
+ """ + # Handle SELECT statements that start with SELECT instead of FROM + if esql_query.strip().upper().startswith('SELECT'): + # Find the FROM keyword after SELECT + match = re.search(r'FROM\s+([^\s|,;\n]+)', esql_query, re.IGNORECASE) + if match: + return [match.group(1).strip()] + + # For queries that start with FROM directly + # Normalize whitespace by removing extra spaces and newlines + normalized_query = ' '.join(esql_query.split()) + + # Check if the query starts with "from" + if not normalized_query.lower().startswith('from '): + return [] + + # Extract the part after "from" and before the first pipe (|) + # First remove any inline comments with // + cleaned_query = re.sub(r'//.*$', '', normalized_query, flags=re.MULTILINE) + # Extract text after "from" keyword, then split by pipe, newline, or WHERE + from_part = cleaned_query[5:] # Skip the "from" prefix + # Find the first occurrence of pipe, newline, or "WHERE" (case insensitive) + pipe_pos = from_part.find('|') + newline_pos = from_part.find('\n') + where_pos = re.search(r'WHERE', from_part, re.IGNORECASE) + where_pos = where_pos.start() if where_pos else -1 + + # Find the earliest delimiter (pipe, newline, or WHERE) + positions = [pos for pos in [pipe_pos, newline_pos, where_pos] if pos >= 0] + end_pos = min(positions) if positions else len(from_part) + + from_part = from_part[:end_pos].strip() + + # Split by commas if multiple indices are provided + indices = [index.strip() for index in from_part.split(',')] + + return indices + + def remove_comments_and_blank_lines(self, esql_query): + """ + Remove comments and blank lines from an ESQL query. + + Args: + esql_query (str): The ESQL query. + + Returns: + str: The cleaned ESQL query. + """ + # Remove block comments (/* ... */) + cleaned_query = re.sub(r'/\*.*?\*/', '', esql_query, flags=re.DOTALL) + + # Remove line comments and blank lines + result = [] + for line in cleaned_query.splitlines(): + # Skip comment lines and blank lines + if not line.strip().startswith("//") and line.strip(): + result.append(line) + + return "\n".join(result) + + def format_queries(self, queries: list[str]) -> list[dict]: + """ + Format the queries for JSON output. + + Args: + queries (list[str]): List of ESQL queries. + Returns: + list[dict]: List of dictionaries containing the query and its indices. 
+ """ + formatted_queries = [] + + for query in queries: + formatted_queries.append({ + "query": query, + "indices": self.extract_indices_from_esql(query), + "cleaned_query": self.remove_comments_and_blank_lines(query) + }) + + return formatted_queries + + def save_json(self, json_path: Path, content: str) -> None: + """Save the JSON content to a file.""" + with open(json_path, 'w', encoding='utf-8') as f: + f.write(content) + click.echo(f"JSON generated: {json_path}") + + def create_json_folder(self, file_path: Path) -> Path: + """Create the docs folder if it doesn't exist and return the path.""" + json_folder = file_path.parent.parent / "json" + json_folder.mkdir(parents=True, exist_ok=True) + return json_folder diff --git a/hunting/okta/queries/credential_access_rapid_reset_password_requests_for_different_users.toml b/hunting/okta/queries/credential_access_rapid_reset_password_requests_for_different_users.toml index f74cf1453ac..96f65e452d4 100644 --- a/hunting/okta/queries/credential_access_rapid_reset_password_requests_for_different_users.toml +++ b/hunting/okta/queries/credential_access_rapid_reset_password_requests_for_different_users.toml @@ -27,7 +27,7 @@ from logs-okta.system* // Count the number of reset password attempts for each user | stats user_count = count_distinct(user.target.full_name), - reset_counts = by okta.actor.alternate_id, source.user.full_name, okta.debug_context.debug_data.dt_hash + reset_counts = count(*) by okta.actor.alternate_id, source.user.full_name, okta.debug_context.debug_data.dt_hash // Filter for more than 10 unique users and more than 15 reset password attempts by the source | where user_count > 10 and reset_counts > 15 diff --git a/hunting/upload.py b/hunting/upload.py new file mode 100644 index 00000000000..de787a53e70 --- /dev/null +++ b/hunting/upload.py @@ -0,0 +1,266 @@ +import os +import json +import sys +from elasticsearch import Elasticsearch, helpers +from urllib.parse import urlparse + +# Configuration variables (modify these as needed) +ELASTICSEARCH_URL = "http://localhost:9200" # Your Elasticsearch URL +ELASTICSEARCH_USERNAME = "elastic" # Your Elasticsearch username +ELASTICSEARCH_PASSWORD = "changeme" # Your Elasticsearch password +ELASTICSEARCH_INDEX = "threat-hunting-queries" # Target index name +# Directory containing JSON files +DIRECTORY_PATH = "/Users/mark/dev/detection-rules/hunting" +MAPPING = { + "mappings": { + "properties": { + "@timestamp": { + "type": "date" + }, + "category": { + "type": "keyword" + }, + "author": { + "type": "keyword" + }, + "description": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "integration": { + "type": "keyword" + }, + "uuid": { + "type": "keyword" + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "language": { + "type": "keyword" + }, + "license": { + "type": "keyword" + }, + "notes": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "mitre": { + "type": "keyword" + }, + "references": { + "type": "keyword" + }, + "queries": { + "properties": { + "query": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 8192 + } + } + }, + "indices": { + "type": "keyword" + }, + "cleaned_query": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 8192 + } + } + }, + } + } + } + } +} + + +def find_json_files(directory): + """Recursively find all JSON 
+    """Recursively find all JSON files in the directory."""
+    json_files = []
+    for root, _, files in os.walk(directory):
+        for file in files:
+            if file.lower().endswith('.json'):
+                json_files.append(os.path.join(root, file))
+    return json_files
+
+
+def read_json_file(file_path):
+    """Read a JSON file and return its contents."""
+    try:
+        with open(file_path, 'r', encoding='utf-8') as file:
+            return json.load(file)
+    except Exception as e:
+        print(f"Error reading file {file_path}: {e}")
+        return None
+
+
+def validate_language(item):
+    """Return True if the document is a dict whose language entries (if any) are all ES|QL."""
+    if not isinstance(item, dict):
+        return False
+    if "language" not in item:
+        return True  # No language field to validate
+    languages = item["language"]
+    if not isinstance(languages, list):
+        return False
+
+    for lang in languages:
+        if lang.lower() != "es|ql":
+            return False
+    return True
+
+
+def generate_actions(json_files, index_name):
+    """Generate actions for the bulk API."""
+    for file_path in json_files:
+        data = read_json_file(file_path)
+        if data:
+            # Handle both single documents and arrays of documents
+            if isinstance(data, list):
+                for item in data:
+                    # Validate the language field before indexing
+                    if not validate_language(item):
+                        print(f"Invalid language field in file: {file_path}")
+                        continue
+                    yield {
+                        "_index": index_name,
+                        "_source": item
+                    }
+            elif isinstance(data, dict):
+                if not validate_language(data):
+                    print(f"Invalid language field in file: {file_path}")
+                    continue
+                yield {
+                    "_index": index_name,
+                    "_source": data
+                }
+
+
+def create_index_with_mapping(es_client):
+    """
+    Create an Elasticsearch index with the specified mapping.
+    If the index already exists, it is left in place and reused.
+
+    Args:
+        es_client: Elasticsearch client instance
+
+    The index name and mapping are taken from the module-level
+    ELASTICSEARCH_INDEX and MAPPING constants; adjust those to customize.
+
+    Returns:
+        bool: True if the index exists or was created successfully, False otherwise
+    """
+    try:
+        if es_client.indices.exists(index=ELASTICSEARCH_INDEX):
+            print(f"Index '{ELASTICSEARCH_INDEX}' already exists.")
+            return True
+
+        # Create the index with the mapping
+        print(f"Creating index '{ELASTICSEARCH_INDEX}' with custom mapping...")
+        es_client.indices.create(index=ELASTICSEARCH_INDEX, body=MAPPING)
+        print(f"Index '{ELASTICSEARCH_INDEX}' created successfully.")
+        return True
+
+    except Exception as e:
+        print(f"Error creating index with mapping: {e}")
+        return False
+
+
+def upload_data():
+    """Find the generated JSON files and bulk-index them into Elasticsearch."""
+    # Validate configuration
+    if not os.path.isdir(DIRECTORY_PATH):
+        print(f"Error: Directory '{DIRECTORY_PATH}' does not exist.")
+        sys.exit(1)
+
+    # Parse URL to ensure it's valid
+    try:
+        parsed_url = urlparse(ELASTICSEARCH_URL)
+        if not parsed_url.scheme or not parsed_url.netloc:
+            raise ValueError("Invalid URL format")
+    except Exception as e:
+        print(f"Error: Invalid Elasticsearch URL: {e}")
+        sys.exit(1)
+
+    # Find all JSON files
+    json_files = find_json_files(DIRECTORY_PATH)
+    if not json_files:
+        print(f"No JSON files found in '{DIRECTORY_PATH}'.")
+        sys.exit(0)
+
+    print(f"Found {len(json_files)} JSON files to upload.")
+
+    # Connect to Elasticsearch
+    try:
+        es = Elasticsearch(
+            ELASTICSEARCH_URL,
+            basic_auth=(ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD)
+        )
+
+        # Check if Elasticsearch is available
+        if not es.ping():
+            raise ConnectionError("Could not connect to Elasticsearch")
+
+    except Exception as e:
+        print(f"Error connecting to Elasticsearch: {e}")
+        sys.exit(1)
+
+    # Create the index with the mapping
+    try:
+        create_index_with_mapping(es)
+    except Exception as e:
+        print(f"Error creating index with mapping: {e}")
+        sys.exit(1)
+
+    # Upload documents using the bulk API
+    try:
+        success, failed = 0, 0
+        actions = generate_actions(json_files, ELASTICSEARCH_INDEX)
+
+        for ok, result in helpers.streaming_bulk(
+            es,
+            actions,
+            max_retries=3,
+            yield_ok=True
+        ):
+            if ok:
+                success += 1
+            else:
+                print(f"Error: {result['index']['error']}")
+                failed += 1
+
+            # Print progress every 100 documents
+            if (success + failed) % 100 == 0:
+                print(f"Progress: {success} succeeded, {failed} failed")
+
+        print(
+            f"Upload complete: {success} documents uploaded successfully, {failed} documents failed.")
+
+    except Exception as e:
+        print(f"Error during bulk upload: {e}")
+        sys.exit(1)
diff --git a/pyproject.toml b/pyproject.toml
index 86441ff8818..49958ed78fa 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "detection_rules"
-version = "1.0.5"
+version = "1.0.6"
 description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
 readme = "README.md"
 requires-python = ">=3.12"
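
Usage note (outside the diff): with these changes applied, the new Click commands should be runnable through the existing module entry point, e.g. python -m hunting generate-json followed by python -m hunting upload; the upload step reads the hard-coded constants at the top of hunting/upload.py (URL, credentials, index name, DIRECTORY_PATH), which will almost certainly need local adjustment. The conversion can also be driven programmatically; a minimal sketch, assuming only the classes and constants introduced above:

    from hunting.definitions import HUNTING_DIR
    from hunting.json import JSONGenerator

    # Convert one integration's queries/ folder, then every hunt under HUNTING_DIR.
    generator = JSONGenerator(HUNTING_DIR)
    generator.process_folder("okta")
    generator.process_all_files()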