diff --git a/test_elasticsearch/utils.py b/test_elasticsearch/utils.py index 8a13ff62f..021deb76e 100644 --- a/test_elasticsearch/utils.py +++ b/test_elasticsearch/utils.py @@ -22,10 +22,8 @@ from typing import Optional, Tuple from elasticsearch import ( - AuthorizationException, ConnectionError, Elasticsearch, - NotFoundError, ) SOURCE_DIR = Path(__file__).absolute().parent.parent @@ -118,40 +116,15 @@ def wipe_cluster(client): except ImportError: pass - is_xpack = True - if is_xpack: - wipe_rollup_jobs(client) - wait_for_pending_tasks(client, filter="xpack/rollup/job") - wipe_slm_policies(client) - - # Searchable snapshot indices start in 7.8+ - if es_version(client) >= (7, 8): - wipe_searchable_snapshot_indices(client) - wipe_snapshots(client) - if is_xpack: - wipe_data_streams(client) + wipe_data_streams(client) wipe_indices(client) - if is_xpack: - wipe_xpack_templates(client) - else: - client.indices.delete_template(name="*") - client.indices.delete_index_template(name="*") - client.cluster.delete_component_template(name="*") + client.indices.delete_template(name="*") + client.indices.delete_index_template(name="*") wipe_cluster_settings(client) - if is_xpack: - wipe_ilm_policies(client) - wipe_auto_follow_patterns(client) - wipe_tasks(client) - wipe_node_shutdown_metadata(client) - wait_for_pending_datafeeds_and_jobs(client) - wipe_calendars(client) - wipe_filters(client) - wipe_transforms(client) - wait_for_cluster_state_updates_to_finish(client) if close_after_wipe: client.close() @@ -169,16 +142,6 @@ def wipe_cluster_settings(client): client.cluster.put_settings(body=new_settings) -def wipe_rollup_jobs(client): - rollup_jobs = client.rollup.get_jobs(id="_all").get("jobs", ()) - for job in rollup_jobs: - job_id = job["config"]["id"] - client.options(ignore_status=404).rollup.stop_job( - id=job_id, wait_for_completion=True - ) - client.options(ignore_status=404).rollup.delete_job(id=job_id) - - def wipe_snapshots(client): """Deletes all the snapshots and repositories from the cluster""" in_progress_snapshots = [] @@ -223,259 +186,8 @@ def wipe_indices(client): ) -def wipe_searchable_snapshot_indices(client): - cluster_metadata = client.cluster.state( - metric="metadata", - filter_path="metadata.indices.*.settings.index.store.snapshot", - ) - if cluster_metadata: - for index in cluster_metadata["metadata"]["indices"].keys(): - client.indices.delete(index=index) - - -def wipe_xpack_templates(client): - # Delete index templates (including legacy) - templates = [ - x.strip() for x in client.cat.templates(h="name").split("\n") if x.strip() - ] - for template in templates: - if is_xpack_template(template): - continue - try: - client.indices.delete_template(name=template) - except NotFoundError as e: - if f"index_template [{template}] missing" in str(e): - client.indices.delete_index_template(name=template) - - # Delete component templates - templates = client.cluster.get_component_template()["component_templates"] - templates_to_delete = [ - template["name"] - for template in templates - if not is_xpack_template(template["name"]) - ] - if templates_to_delete: - client.cluster.delete_component_template(name=",".join(templates_to_delete)) - - -def wipe_ilm_policies(client): - for policy in client.ilm.get_lifecycle(): - if ( - policy - not in { - "ilm-history-ilm-policy", - "slm-history-ilm-policy", - "watch-history-ilm-policy", - "watch-history-ilm-policy-16", - "ml-size-based-ilm-policy", - "logs", - "metrics", - "synthetics", - "7-days-default", - "30-days-default", - "90-days-default", - "180-days-default", - "365-days-default", - ".fleet-actions-results-ilm-policy", - ".deprecation-indexing-ilm-policy", - ".monitoring-8-ilm-policy", - } - and "-history-ilm-polcy" not in policy - and "-meta-ilm-policy" not in policy - and "-data-ilm-policy" not in policy - and "@lifecycle" not in policy - ): - client.ilm.delete_lifecycle(name=policy) - - -def wipe_slm_policies(client): - policies = client.slm.get_lifecycle() - for policy in policies: - if policy not in {"cloud-snapshot-policy"}: - client.slm.delete_lifecycle(policy_id=policy) - - -def wipe_auto_follow_patterns(client): - for pattern in client.ccr.get_auto_follow_pattern()["patterns"]: - client.ccr.delete_auto_follow_pattern(name=pattern["name"]) - - -def wipe_node_shutdown_metadata(client): - try: - shutdown_status = client.shutdown.get_node() - # If response contains these two keys the feature flag isn't enabled - # on this cluster so skip this step now. - if "_nodes" in shutdown_status and "cluster_name" in shutdown_status: - return - - for shutdown_node in shutdown_status.get("nodes", []): - node_id = shutdown_node["node_id"] - client.shutdown.delete_node(node_id=node_id) - - # Elastic Cloud doesn't allow this so we skip. - except AuthorizationException: - pass - - -def wipe_tasks(client): - tasks = client.tasks.list() - for node_name, node in tasks.get("node", {}).items(): - for task_id in node.get("tasks", ()): - client.tasks.cancel(task_id=task_id, wait_for_completion=True) - - -def wait_for_pending_tasks(client, filter, timeout=30): - end_time = time.time() + timeout - while time.time() < end_time: - tasks = client.cat.tasks(detailed=True).split("\n") - if not any(filter in task for task in tasks): - break - - -def wait_for_pending_datafeeds_and_jobs(client: Elasticsearch, timeout=30): - end_time = time.time() + timeout - while time.time() < end_time: - resp = client.ml.get_datafeeds(datafeed_id="*", allow_no_match=True) - if resp["count"] == 0: - break - for datafeed in resp["datafeeds"]: - client.options(ignore_status=404).ml.delete_datafeed( - datafeed_id=datafeed["datafeed_id"] - ) - - end_time = time.time() + timeout - while time.time() < end_time: - resp = client.ml.get_jobs(job_id="*", allow_no_match=True) - if resp["count"] == 0: - break - for job in resp["jobs"]: - client.options(ignore_status=404).ml.close_job(job_id=job["job_id"]) - client.options(ignore_status=404).ml.delete_job(job_id=job["job_id"]) - - end_time = time.time() + timeout - while time.time() < end_time: - resp = client.ml.get_data_frame_analytics(id="*") - if resp["count"] == 0: - break - for job in resp["data_frame_analytics"]: - client.options(ignore_status=404).ml.stop_data_frame_analytics(id=job["id"]) - client.options(ignore_status=404).ml.delete_data_frame_analytics( - id=job["id"] - ) - - -def wipe_filters(client: Elasticsearch, timeout=30): - end_time = time.time() + timeout - while time.time() < end_time: - resp = client.ml.get_filters(filter_id="*") - if resp["count"] == 0: - break - for filter in resp["filters"]: - client.options(ignore_status=404).ml.delete_filter( - filter_id=filter["filter_id"] - ) - - -def wipe_calendars(client: Elasticsearch, timeout=30): - end_time = time.time() + timeout - while time.time() < end_time: - resp = client.ml.get_calendars(calendar_id="*") - if resp["count"] == 0: - break - for calendar in resp["calendars"]: - client.options(ignore_status=404).ml.delete_calendar( - calendar_id=calendar["calendar_id"] - ) - - -def wipe_transforms(client: Elasticsearch, timeout=30): - end_time = time.time() + timeout - while time.time() < end_time: - resp = client.transform.get_transform(transform_id="*") - if resp["count"] == 0: - break - for trasnform in resp["transforms"]: - client.options(ignore_status=404).transform.stop_transform( - transform_id=trasnform["id"] - ) - client.options(ignore_status=404).transform.delete_transform( - transform_id=trasnform["id"] - ) - - def wait_for_cluster_state_updates_to_finish(client, timeout=30): end_time = time.time() + timeout while time.time() < end_time: if not client.cluster.pending_tasks().get("tasks", ()): break - - -def is_xpack_template(name): - if name.startswith("."): - return True - elif name.startswith("behavioral_analytics-events"): - return True - elif name.startswith("elastic-connectors-"): - return True - elif name.startswith("entities_v1_"): - return True - elif name.endswith("@ilm"): - return True - elif name.endswith("@template"): - return True - - return name in { - "agentless", - "agentless@mappings", - "agentless@settings", - "apm-10d@lifecycle", - "apm-180d@lifecycle", - "apm-390d@lifecycle", - "apm-90d@lifecycle", - "apm@mappings", - "apm@settings", - "data-streams-mappings", - "data-streams@mappings", - "elastic-connectors", - "ecs@dynamic_templates", - "ecs@mappings", - "ilm-history-7", - "kibana-reporting@settings", - "logs", - "logs-apm.error@mappings", - "logs-apm@settings", - "logs-mappings", - "logs@mappings", - "logs-settings", - "logs@settings", - "metrics", - "metrics-apm@mappings", - "metrics-apm.service_destination@mappings", - "metrics-apm.service_summary@mappings", - "metrics-apm.service_transaction@mappings", - "metrics-apm@settings", - "metrics-apm.transaction@mappings", - "metrics-mappings", - "metrics@mappings", - "metrics-settings", - "metrics@settings", - "metrics-tsdb-settings", - "metrics@tsdb-settings", - "search-acl-filter", - "synthetics", - "synthetics-mappings", - "synthetics@mappings", - "synthetics-settings", - "synthetics@settings", - "traces-apm@mappings", - "traces-apm.rum@mappings", - "traces@mappings", - "traces@settings", - # otel - "metrics-otel@mappings", - "semconv-resource-to-ecs@mappings", - "traces-otel@mappings", - "ecs-tsdb@mappings", - "logs-otel@mappings", - "otel@mappings", - }