diff --git a/composer/workflows/airflow_db_cleanup.py b/composer/workflows/airflow_db_cleanup.py
index 5eb84f95183..d277d5ec378 100644
--- a/composer/workflows/airflow_db_cleanup.py
+++ b/composer/workflows/airflow_db_cleanup.py
@@ -12,9 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# Note: This sample is designed for Airflow 1 and 2.
+
 # [START composer_metadb_cleanup]
-"""
-A maintenance workflow that you can deploy into Airflow to periodically clean
+"""A maintenance workflow that you can deploy into Airflow to periodically clean
 out the DagRun, TaskInstance, Log, XCom, Job DB and SlaMiss entries to avoid
 having too much data in your Airflow MetaStore.
 
@@ -68,33 +69,60 @@
 from sqlalchemy import desc, sql, text
 from sqlalchemy.exc import ProgrammingError
 
+
+def parse_airflow_version(version: str) -> tuple[int, ...]:
+    # TODO(developer): Update this function if you are using a version
+    # with non-numerical characters such as "2.9.3rc1".
+    COMPOSER_SUFFIX = "+composer"
+    if version.endswith(COMPOSER_SUFFIX):
+        airflow_version_without_suffix = version[:-len(COMPOSER_SUFFIX)]
+    else:
+        airflow_version_without_suffix = version
+    airflow_version_parts = airflow_version_without_suffix.split(".")
+
+    return tuple(int(s) for s in airflow_version_parts)
+
+
 now = timezone.utcnow
 
 # airflow-db-cleanup
 DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
+
 START_DATE = airflow.utils.dates.days_ago(1)
-# How often to Run. @daily - Once a day at Midnight (UTC)
+
+# How often to run. @daily - once a day at midnight (UTC).
 SCHEDULE_INTERVAL = "@daily"
-# Who is listed as the owner of this DAG in the Airflow Web Server
+
+# Who is listed as the owner of this DAG in the Airflow Web Server.
 DAG_OWNER_NAME = "operations"
-# List of email address to send email alerts to if this job fails
+
+# List of email addresses to send email alerts to if this job fails.
 ALERT_EMAIL_ADDRESSES = []
-# Airflow version used by the environment in list form, value stored in
-# airflow_version is in format e.g "2.3.4+composer"
-AIRFLOW_VERSION = airflow_version[: -len("+composer")].split(".")
-# Length to retain the log files if not already provided in the conf. If this
-# is set to 30, the job will remove those files that arE 30 days old or older.
+
+# Airflow version used by the environment as a tuple of integers.
+# For example: (2, 9, 2)
+#
+# The value in `airflow_version` has a format like "2.9.2+composer".
+# It's converted to a tuple to facilitate version comparison.
+AIRFLOW_VERSION = parse_airflow_version(airflow_version)
+
+# Length to retain the log files if not already provided in the configuration.
+# If this is set to 30, the job will remove those files
+# that are 30 days old or older.
 DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(
     Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30)
 )
-# Prints the database entries which will be getting deleted; set to False
-# to avoid printing large lists and slowdown process
+
+# Prints the database entries that will be deleted;
+# set to False to avoid printing large lists and slowing down the process.
 PRINT_DELETES = False
-# Whether the job should delete the db entries or not. Included if you want to
-# temporarily avoid deleting the db entries.
+
+# Whether the job should delete the DB entries or not.
+# Included if you want to temporarily avoid deleting the DB entries.
 ENABLE_DELETE = True
-# List of all the objects that will be deleted. Comment out the DB objects you
-# want to skip.
+
+# List of all the objects that will be deleted.
+# Comment out the DB objects you want to skip.
 DATABASE_OBJECTS = [
     {
         "airflow_db_model": DagRun,
@@ -105,9 +133,7 @@
     },
     {
         "airflow_db_model": TaskInstance,
-        "age_check_column": TaskInstance.start_date
-        if AIRFLOW_VERSION < ["2", "2", "0"]
-        else TaskInstance.start_date,
+        "age_check_column": TaskInstance.start_date,
         "keep_last": False,
         "keep_last_filters": None,
         "keep_last_group_by": None,
@@ -122,7 +148,7 @@
     {
         "airflow_db_model": XCom,
         "age_check_column": XCom.execution_date
-        if AIRFLOW_VERSION < ["2", "2", "5"]
+        if AIRFLOW_VERSION < (2, 2, 5)
         else XCom.timestamp,
         "keep_last": False,
         "keep_last_filters": None,
@@ -144,7 +170,7 @@
     },
 ]
 
-# Check for TaskReschedule model
+# Check for TaskReschedule model.
 try:
     from airflow.models import TaskReschedule
 
@@ -152,7 +178,7 @@
         {
             "airflow_db_model": TaskReschedule,
             "age_check_column": TaskReschedule.execution_date
-            if AIRFLOW_VERSION < ["2", "2", "0"]
+            if AIRFLOW_VERSION < (2, 2, 0)
             else TaskReschedule.start_date,
             "keep_last": False,
             "keep_last_filters": None,
@@ -163,7 +189,7 @@
 except Exception as e:
     logging.error(e)
 
-# Check for TaskFail model
+# Check for TaskFail model.
 try:
     from airflow.models import TaskFail
 
@@ -180,8 +206,8 @@
 except Exception as e:
     logging.error(e)
 
-# Check for RenderedTaskInstanceFields model
-if AIRFLOW_VERSION < ["2", "4", "0"]:
+# Check for RenderedTaskInstanceFields model.
+if AIRFLOW_VERSION < (2, 4, 0):
     try:
         from airflow.models import RenderedTaskInstanceFields
 
@@ -198,7 +224,7 @@
     except Exception as e:
         logging.error(e)
 
-# Check for ImportError model
+# Check for ImportError model.
 try:
     from airflow.models import ImportError
 
@@ -216,7 +242,7 @@
 except Exception as e:
     logging.error(e)
 
-if AIRFLOW_VERSION < ["2", "6", "0"]:
+if AIRFLOW_VERSION < (2, 6, 0):
     try:
         from airflow.jobs.base_job import BaseJob
 
@@ -530,5 +556,4 @@ def analyze_db():
 
     print_configuration.set_downstream(cleanup_op)
     cleanup_op.set_downstream(analyze_op)
-
 # [END composer_metadb_cleanup]
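[Reviewer note, not part of the patch] The switch from string lists to integer tuples is the substance of this change: Python compares sequences element by element, and string elements compare lexicographically, so "10" sorts before "9" and versions like 2.10.x were mis-ordered by the old check. A minimal standalone sketch of the failure mode, assuming the helper behaves as added above:

    # Old style: string components compare lexicographically.
    old_style = "2.10.5+composer"[: -len("+composer")].split(".")
    assert old_style == ["2", "10", "5"]
    assert old_style < ["2", "9", "0"]  # wrongly claims 2.10.5 < 2.9.0

    # New style: integer tuples compare numerically.
    new_style = tuple(int(s) for s in "2.10.5".split("."))
    assert new_style > (2, 9, 0)  # correct ordering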
diff --git a/composer/workflows/airflow_db_cleanup_test.py b/composer/workflows/airflow_db_cleanup_test.py
index 52154ea4f69..6b6cd91b411 100644
--- a/composer/workflows/airflow_db_cleanup_test.py
+++ b/composer/workflows/airflow_db_cleanup_test.py
@@ -15,8 +15,23 @@
 
 import internal_unit_testing
 
+from . import airflow_db_cleanup
 
-def test_dag_import(airflow_database):
+
+def test_version_comparison():
+    # b/408307862 - Validate version check logic used in the sample.
+    AIRFLOW_VERSION = airflow_db_cleanup.parse_airflow_version("2.10.5+composer")
+
+    assert AIRFLOW_VERSION == (2, 10, 5)
+    assert AIRFLOW_VERSION > (2, 9, 1)
+
+    AIRFLOW_VERSION = airflow_db_cleanup.parse_airflow_version("2.9.2")
+
+    assert AIRFLOW_VERSION == (2, 9, 2)
+    assert AIRFLOW_VERSION < (2, 9, 3)
+
+
+def test_dag_import():
     """Test that the DAG file can be successfully imported.
 
     This tests that the DAG can be parsed, but does not run it in an Airflow
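[Reviewer note, not part of the patch] As the TODO in parse_airflow_version warns, the helper assumes purely numeric version components, so pre-release strings will raise. A sketch of the edge case, re-declaring the helper here only for illustration:

    def parse_airflow_version(version: str) -> tuple[int, ...]:
        # Mirrors the helper added in airflow_db_cleanup.py.
        suffix = "+composer"
        if version.endswith(suffix):
            version = version[: -len(suffix)]
        return tuple(int(s) for s in version.split("."))

    parse_airflow_version("2.9.2+composer")  # returns (2, 9, 2)
    parse_airflow_version("2.9.3rc1")  # raises ValueError: invalid literal for int() with base 10: '3rc1'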
diff --git a/composer/workflows/noxfile_config.py b/composer/workflows/noxfile_config.py
index 67060d87051..7eeb5bb5817 100644
--- a/composer/workflows/noxfile_config.py
+++ b/composer/workflows/noxfile_config.py
@@ -39,7 +39,7 @@
         "3.10",
         "3.12",
         "3.13",
-    ],  # Composer w/ Airflow 2 only supports Python 3.8
+    ],
     # Old samples are opted out of enforcing Python type hints
     # All new samples should feature them
     "enforce_type_hints": False,
diff --git a/composer/workflows/requirements.txt b/composer/workflows/requirements.txt
index 9aae4038150..cb473b0dfc4 100644
--- a/composer/workflows/requirements.txt
+++ b/composer/workflows/requirements.txt
@@ -5,5 +5,5 @@
 # https://github.com/apache/airflow/blob/main/pyproject.toml
 apache-airflow[amazon,apache.beam,cncf.kubernetes,google,microsoft.azure,openlineage,postgres]==2.9.2
 
-google-cloud-dataform==0.5.9  # used in Dataform operators
+google-cloud-dataform==0.5.9  # Used in Dataform operators
 scipy==1.14.1
\ No newline at end of file
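[Reviewer note, not part of the patch] For the Airflow version pinned in requirements.txt (2.9.2), every version gate in the DAG resolves the same way under the new tuple comparison; a quick sanity sketch:

    AIRFLOW_VERSION = (2, 9, 2)  # parse_airflow_version("2.9.2+composer")

    assert not AIRFLOW_VERSION < (2, 2, 0)  # TaskReschedule uses start_date
    assert not AIRFLOW_VERSION < (2, 2, 5)  # XCom uses XCom.timestamp
    assert not AIRFLOW_VERSION < (2, 4, 0)  # RenderedTaskInstanceFields block is skipped
    assert not AIRFLOW_VERSION < (2, 6, 0)  # BaseJob import block is skipped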