diff --git a/detection_rules/eswrap.py b/detection_rules/eswrap.py index fcaa4ea7b29..242a995f6c6 100644 --- a/detection_rules/eswrap.py +++ b/detection_rules/eswrap.py @@ -12,7 +12,7 @@ from kibana import Kibana, RuleResource from .main import root -from .misc import set_param_values +from .misc import getdefault from .utils import normalize_timing_and_sort, unix_time_to_formatted, get_path from .rule_loader import get_rule, rta_mappings, load_rule_files, load_rules @@ -179,10 +179,10 @@ def run(self, agent_hostname, indexes, verbose=True, **match): @es_group.command('collect-events') @click.argument('agent-hostname') -@click.option('--elasticsearch-url', '-u', callback=set_param_values, expose_value=True) -@click.option('--cloud-id', callback=set_param_values, expose_value=True) -@click.option('--user', '-u', callback=set_param_values, expose_value=True, hide_input=False) -@click.option('--password', '-p', callback=set_param_values, expose_value=True, hide_input=True) +@click.option('--elasticsearch-url', '-u', default=getdefault("elasticsearch_url")) +@click.option('--cloud-id', default=getdefault("cloud_id")) +@click.option('--user', '-u', default=getdefault("user")) +@click.option('--password', '-p', default=getdefault("password")) @click.option('--index', '-i', multiple=True, help='Index(es) to search against (default: all indexes)') @click.option('--agent-type', '-a', help='Restrict results to a source type (agent.type) ex: auditbeat') @click.option('--rta-name', '-r', help='Name of RTA in order to save events directly to unit tests data directory') @@ -193,6 +193,13 @@ def collect_events(agent_hostname, elasticsearch_url, cloud_id, user, password, """Collect events from Elasticsearch.""" match = {'agent.type': agent_type} if agent_type else {} + if not cloud_id or elasticsearch_url: + raise click.ClickException("Missing required --cloud-id or --elasticsearch-url") + + # don't prompt for these until there's a cloud id or elasticsearch URL + user = user or click.prompt("user") + password = password or click.prompt("password", hide_input=True) + try: client = get_es_client(elasticsearch_url=elasticsearch_url, use_ssl=True, cloud_id=cloud_id, user=user, password=password) @@ -229,16 +236,23 @@ def normalize_file(events_file): @root.command("kibana-upload") @click.argument("toml-files", nargs=-1, required=True) -@click.option('--kibana-url', '-u', callback=set_param_values, expose_value=True) -@click.option('--cloud-id', callback=set_param_values, expose_value=True) -@click.option('--user', '-u', callback=set_param_values, expose_value=True, hide_input=False) -@click.option('--password', '-p', callback=set_param_values, expose_value=True, hide_input=True) +@click.option('--kibana-url', '-u', default=getdefault("kibana_url")) +@click.option('--cloud-id', default=getdefault("cloud_id")) +@click.option('--user', '-u', default=getdefault("user")) +@click.option('--password', '-p', default=getdefault("password")) def kibana_upload(toml_files, kibana_url, cloud_id, user, password): """Upload a list of rule .toml files to Kibana.""" from uuid import uuid4 from .packaging import manage_versions from .schemas import downgrade + if not cloud_id or kibana_url: + raise click.ClickException("Missing required --cloud-id or --kibana-url") + + # don't prompt for these until there's a cloud id or kibana URL + user = user or click.prompt("user") + password = password or click.prompt("password", hide_input=True) + with Kibana(cloud_id=cloud_id, url=kibana_url) as kibana: kibana.login(user, password) diff --git a/detection_rules/misc.py b/detection_rules/misc.py index d3a7f3af901..7afe3474991 100644 --- a/detection_rules/misc.py +++ b/detection_rules/misc.py @@ -12,7 +12,7 @@ import click import requests -from .utils import ROOT_DIR +from .utils import cached, get_path _CONFIG = {} @@ -200,35 +200,23 @@ def download_worker(rule_info): return kibana_rules +@cached def parse_config(): """Parse a default config file.""" - global _CONFIG + config_file = get_path('.detection-rules-cfg.json') + config = {} - if not _CONFIG: - config_file = os.path.join(ROOT_DIR, '.detection-rules-cfg.json') + if os.path.exists(config_file): + with open(config_file) as f: + config = json.load(f) - if os.path.exists(config_file): - with open(config_file) as f: - _CONFIG = json.load(f) + click.secho('Loaded config file: {}'.format(config_file), fg='yellow') - click.secho('Loaded config file: {}'.format(config_file), fg='yellow') + return config - return _CONFIG - -def set_param_values(ctx, param, value): - """Get value for defined key.""" - key = param.name +def getdefault(name): + """Callback function for `default` to get an environment variable.""" + envvar = f"DR_{name.upper()}" config = parse_config() - env_key = 'DR_' + key.upper() - prompt = True if param.hide_input is not False else False - - if value: - return value - elif os.environ.get(env_key): - return os.environ[env_key] - elif config.get(key) is not None: - return config[key] - elif prompt: - return click.prompt(key, default=param.default if not param.default else None, hide_input=param.hide_input, - show_default=True if param.default else False) + return lambda: os.environ.get(envvar, config.get(name))